diff --git a/README.md b/README.md index fb72ae3..550c92f 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,9 @@ func f(i int) error { func main() { - worker := gwp.New(4) // Create pool with specified number of workers - - worker.ShowProgress = true // Enable progress indicator - worker.ShowSpeed = true // Show processing speed in progress indicator - worker.SetEstimateCount(100) // Set total number on jobs to calculate ETA + worker := gwp.New(4) // Create pool with specified number of workers + worker.ShowProgress = true // Enable progress indicator + worker.EstimateCount = 100 // Set total number on jobs to calculate ETA for i := 0; i < 100; i++ { n := i diff --git a/durationformat.go b/durationformat.go deleted file mode 100644 index ac3ccd7..0000000 --- a/durationformat.go +++ /dev/null @@ -1,46 +0,0 @@ -package gwp - -import ( - "fmt" - "time" -) - -func fmtDuration(d time.Duration) string { - d = d.Round(time.Second) - - days := d / time.Hour / 24 - d -= days * 24 * time.Hour - - hours := d / time.Hour - d -= hours * time.Hour - - minites := d / time.Minute - d -= minites * time.Minute - - seconds := d / time.Second - d -= seconds * time.Second - - var resultStr string - - if days > 0 { - resultStr += fmt.Sprintf("%3dd", days) - } else { - resultStr += " " - } - - if hours > 0 { - resultStr += fmt.Sprintf(" %2dh", hours) - } else { - resultStr += " " - } - - if minites > 0 { - resultStr += fmt.Sprintf(" %2dm", minites) - } else { - resultStr += " " - } - - resultStr += fmt.Sprintf(" %2ds", seconds) - - return resultStr -} diff --git a/durationformat_test.go b/durationformat_test.go deleted file mode 100644 index ef16c1e..0000000 --- a/durationformat_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package gwp - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestFmtDuration(t *testing.T) { - assert.Equal(t, " 1s", fmtDuration(time.Second)) - assert.Equal(t, " 1m 0s", fmtDuration(time.Minute)) - assert.Equal(t, " 1h 0s", fmtDuration(time.Hour)) - assert.Equal(t, " 1d 0s", fmtDuration(time.Hour*24)) - assert.Equal(t, "365d 0s", fmtDuration(time.Hour*24*365)) -} diff --git a/go.mod b/go.mod index 01724de..484e9ac 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,3 @@ module github.com/nxshock/gwp go 1.16 - -require ( - github.com/nxshock/go-eta v0.1.1 - github.com/stretchr/testify v1.8.0 -) diff --git a/go.sum b/go.sum deleted file mode 100644 index c701620..0000000 --- a/go.sum +++ /dev/null @@ -1,18 +0,0 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/nxshock/go-eta v0.1.0 h1:rcy8rNPaVeL3f3c5rJRLvho0FErUJThD0+vihxA7t9c= -github.com/nxshock/go-eta v0.1.0/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA= -github.com/nxshock/go-eta v0.1.1 h1:N4wi3wjrXJGiheYF4zBIjOwugM+++NWsB1fpeRZaHS4= -github.com/nxshock/go-eta v0.1.1/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gwp.go b/gwp.go index 4119cce..617c98b 100644 --- a/gwp.go +++ b/gwp.go @@ -1,14 +1,11 @@ package gwp import ( - "bytes" "fmt" "os" "runtime" "sync" "time" - - "github.com/nxshock/go-eta" ) // WorkerPool represents pool of workers. @@ -18,21 +15,16 @@ type WorkerPool struct { stopChan chan struct{} wg sync.WaitGroup - estimateCount int + EstimateCount int ShowProgress bool - ShowSpeed bool processedCount int // processed jobs count errorCount int // processed jobs count that returned error currentSpeed float64 // speed calculated for last minute - - eta *eta.Calculator - - lastProgressMessage string } // New creates new pool of workers with specified goroutine count. -// If specified number of workers less than 1, runtime.NumCPU() is used. +// If specified number of workers less than 1, runtume.NumCPU() is used. func New(threadCount int) *WorkerPool { if threadCount <= 0 { threadCount = runtime.NumCPU() @@ -41,8 +33,7 @@ func New(threadCount int) *WorkerPool { workerPool := &WorkerPool{ jobChan: make(chan func() error), resultChan: make(chan error), - stopChan: make(chan struct{}), - eta: eta.New(0)} + stopChan: make(chan struct{})} workerPool.wg.Add(threadCount) @@ -57,19 +48,13 @@ func New(threadCount int) *WorkerPool { tickerCalculateEta.Stop() }() - newLined := false - + fmt.Fprintf(os.Stderr, endLine) for { select { case <-tickerUpdateText.C: - if !newLined { - fmt.Fprint(os.Stderr, endLine) - newLined = true - } - - _ = workerPool.printProgress() + workerPool.printProgress() case <-tickerCalculateEta.C: - workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Since(prevTime)) + workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) prevPos = workerPool.processedCount prevTime = time.Now() case err := <-workerPool.resultChan: @@ -77,7 +62,6 @@ func New(threadCount int) *WorkerPool { workerPool.errorCount++ } workerPool.processedCount++ - workerPool.eta.Increment(1) case <-workerPool.stopChan: return } @@ -97,54 +81,29 @@ func New(threadCount int) *WorkerPool { return workerPool } -func (workerPool *WorkerPool) SetEstimateCount(n int) { - workerPool.estimateCount = n - workerPool.eta.TotalCount = n -} - -func (workerPool *WorkerPool) printProgress() error { +func (workerPool *WorkerPool) printProgress() { if !workerPool.ShowProgress { - return nil + return } - buf := new(bytes.Buffer) + fmt.Fprintf(os.Stderr, newLine) - fmt.Fprint(buf, newLine) - - if workerPool.estimateCount == 0 { - fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount) + if workerPool.EstimateCount == 0 { + fmt.Fprintf(os.Stderr, "Progress: %d", workerPool.processedCount) } else { - fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)", - float64(workerPool.processedCount*100)/float64(workerPool.estimateCount), workerPool.processedCount, workerPool.estimateCount) + fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)", + float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount) } if workerPool.errorCount > 0 { - fmt.Fprintf(buf, " Errors: %d (%.1f%%)", - workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.estimateCount)) + fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", + workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount)) } - - if workerPool.estimateCount > 0 { - fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average()))) + if workerPool.EstimateCount > 0 && workerPool.currentSpeed > 0 { + fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps", + time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed), workerPool.currentSpeed) } - - if workerPool.currentSpeed > 0 && workerPool.ShowSpeed { - fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed) - } - - fmt.Fprint(buf, endLine) - - if buf.String() == workerPool.lastProgressMessage { - return nil - } - - workerPool.lastProgressMessage = buf.String() - - _, err := buf.WriteTo(os.Stderr) - if err != nil { - return err - } - - return nil + fmt.Fprint(os.Stderr, endLine) } // Add sends specified task for execution. @@ -159,10 +118,10 @@ func (workerPool *WorkerPool) CloseAndWait() { workerPool.stopChan <- struct{}{} close(workerPool.resultChan) - _ = workerPool.printProgress() + workerPool.printProgress() } -// ErrorCount returns total error count. +// ErrorCount returns total error count func (workerPool *WorkerPool) ErrorCount() int { return workerPool.errorCount }