From 5b2fc6d56d5ad7688c3f39dc1524243225ae0908 Mon Sep 17 00:00:00 2001 From: nxshock Date: Mon, 7 Mar 2022 16:10:43 +0500 Subject: [PATCH 01/11] Do now print new line if no progress requested --- gwp.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/gwp.go b/gwp.go index a9aae4b..ab960eb 100644 --- a/gwp.go +++ b/gwp.go @@ -48,10 +48,16 @@ func New(threadCount int) *WorkerPool { tickerCalculateEta.Stop() }() - fmt.Fprintf(os.Stderr, endLine) + newLined := false + for { select { case <-tickerUpdateText.C: + if !newLined { + fmt.Fprintf(os.Stderr, endLine) + newLined = true + } + workerPool.printProgress() case <-tickerCalculateEta.C: workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) From 21c13e086468758fcae50ee0c8076c2f596bd901 Mon Sep 17 00:00:00 2001 From: nxshock Date: Tue, 8 Mar 2022 17:58:03 +0500 Subject: [PATCH 02/11] Separate speed field --- README.md | 1 + gwp.go | 14 +++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 550c92f..9336f4d 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ func main() { worker := gwp.New(4) // Create pool with specified number of workers worker.ShowProgress = true // Enable progress indicator + worker.ShowProgress = true // Show processing speed in progress indicator worker.EstimateCount = 100 // Set total number on jobs to calculate ETA for i := 0; i < 100; i++ { diff --git a/gwp.go b/gwp.go index ab960eb..38c0225 100644 --- a/gwp.go +++ b/gwp.go @@ -17,6 +17,7 @@ type WorkerPool struct { EstimateCount int ShowProgress bool + ShowSpeed bool processedCount int // processed jobs count errorCount int // processed jobs count that returned error @@ -105,10 +106,17 @@ func (workerPool *WorkerPool) printProgress() { fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount)) } - if workerPool.EstimateCount > 0 && workerPool.currentSpeed > 0 { - fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps", - fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed)), workerPool.currentSpeed) + + if workerPool.currentSpeed > 0 { + if workerPool.EstimateCount > 0 { + fmt.Fprintf(os.Stderr, " ETA: %s", fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed))) + } + + if workerPool.ShowSpeed { + fmt.Fprintf(os.Stderr, "Speed: %.2f rps", workerPool.currentSpeed) + } } + fmt.Fprint(os.Stderr, endLine) } From bd74140174597824729d66203558e10eef5b222e Mon Sep 17 00:00:00 2001 From: nxshock Date: Tue, 8 Mar 2022 18:00:09 +0500 Subject: [PATCH 03/11] Fix speed filed name --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9336f4d..fc5b387 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,9 @@ func f(i int) error { func main() { worker := gwp.New(4) // Create pool with specified number of workers + worker.ShowProgress = true // Enable progress indicator - worker.ShowProgress = true // Show processing speed in progress indicator + worker.ShowSpeed = true // Show processing speed in progress indicator worker.EstimateCount = 100 // Set total number on jobs to calculate ETA for i := 0; i < 100; i++ { From 928cd975766755129d52b4bf7c1f32da32d32993 Mon Sep 17 00:00:00 2001 From: nxshock Date: Tue, 8 Mar 2022 18:30:44 +0500 Subject: [PATCH 04/11] Add space before speed field --- gwp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gwp.go b/gwp.go index 38c0225..30539f2 100644 --- a/gwp.go +++ b/gwp.go @@ -113,7 +113,7 @@ func (workerPool *WorkerPool) printProgress() { } if workerPool.ShowSpeed { - fmt.Fprintf(os.Stderr, "Speed: %.2f rps", workerPool.currentSpeed) + fmt.Fprintf(os.Stderr, " Speed: %.2f rps", workerPool.currentSpeed) } } From 54dac429258e5f426478fd1f0a6450217be5f800 Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 24 Jun 2022 21:05:30 +0500 Subject: [PATCH 05/11] Fix comment --- gwp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gwp.go b/gwp.go index 30539f2..3553500 100644 --- a/gwp.go +++ b/gwp.go @@ -25,7 +25,7 @@ type WorkerPool struct { } // New creates new pool of workers with specified goroutine count. -// If specified number of workers less than 1, runtume.NumCPU() is used. +// If specified number of workers less than 1, runtime.NumCPU() is used. func New(threadCount int) *WorkerPool { if threadCount <= 0 { threadCount = runtime.NumCPU() From a2986a948528cf7b37d610b200ce52574987149d Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 25 Jun 2022 17:53:32 +0500 Subject: [PATCH 06/11] Do not update progress message without changes --- gwp.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/gwp.go b/gwp.go index 3553500..49ade98 100644 --- a/gwp.go +++ b/gwp.go @@ -1,6 +1,7 @@ package gwp import ( + "bytes" "fmt" "os" "runtime" @@ -22,6 +23,8 @@ type WorkerPool struct { processedCount int // processed jobs count errorCount int // processed jobs count that returned error currentSpeed float64 // speed calculated for last minute + + lastProgressMessage string } // New creates new pool of workers with specified goroutine count. @@ -93,31 +96,41 @@ func (workerPool *WorkerPool) printProgress() { return } - fmt.Fprintf(os.Stderr, newLine) + buf := new(bytes.Buffer) + + fmt.Fprintf(buf, newLine) if workerPool.EstimateCount == 0 { - fmt.Fprintf(os.Stderr, "Progress: %d", workerPool.processedCount) + fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount) } else { - fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)", + fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)", float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount) } if workerPool.errorCount > 0 { - fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", + fmt.Fprintf(buf, " Errors: %d (%.1f%%)", workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount)) } if workerPool.currentSpeed > 0 { if workerPool.EstimateCount > 0 { - fmt.Fprintf(os.Stderr, " ETA: %s", fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed))) + fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed))) } if workerPool.ShowSpeed { - fmt.Fprintf(os.Stderr, " Speed: %.2f rps", workerPool.currentSpeed) + fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed) } } - fmt.Fprint(os.Stderr, endLine) + fmt.Fprint(buf, endLine) + + if buf.String() == workerPool.lastProgressMessage { + return + } + + buf.WriteTo(os.Stderr) + + workerPool.lastProgressMessage = buf.String() } // Add sends specified task for execution. From 0d6ffec53030d0a360d3fc820eafeaca0b736347 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 26 Jun 2022 18:24:12 +0500 Subject: [PATCH 07/11] Fix buffer usage order --- gwp.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gwp.go b/gwp.go index 49ade98..f241aca 100644 --- a/gwp.go +++ b/gwp.go @@ -128,9 +128,9 @@ func (workerPool *WorkerPool) printProgress() { return } - buf.WriteTo(os.Stderr) - workerPool.lastProgressMessage = buf.String() + + buf.WriteTo(os.Stderr) } // Add sends specified task for execution. From aba812a470712c81a5396ec123bb5a075e742f59 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 6 Aug 2022 22:13:22 +0500 Subject: [PATCH 08/11] Some chages * Use go-eta library from calculating ETA * Fix linter issues * Add testify library for tests --- go.mod | 5 +++++ go.sum | 16 ++++++++++++++++ gwp.go | 48 ++++++++++++++++++++++++++++++++---------------- 3 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 go.sum diff --git a/go.mod b/go.mod index 484e9ac..7618791 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module github.com/nxshock/gwp go 1.16 + +require ( + github.com/nxshock/go-eta v0.1.0 + github.com/stretchr/testify v1.8.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d88a774 --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/nxshock/go-eta v0.1.0 h1:rcy8rNPaVeL3f3c5rJRLvho0FErUJThD0+vihxA7t9c= +github.com/nxshock/go-eta v0.1.0/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gwp.go b/gwp.go index f241aca..e906130 100644 --- a/gwp.go +++ b/gwp.go @@ -7,6 +7,8 @@ import ( "runtime" "sync" "time" + + "github.com/nxshock/go-eta" ) // WorkerPool represents pool of workers. @@ -16,7 +18,7 @@ type WorkerPool struct { stopChan chan struct{} wg sync.WaitGroup - EstimateCount int + estimateCount int ShowProgress bool ShowSpeed bool @@ -24,6 +26,8 @@ type WorkerPool struct { errorCount int // processed jobs count that returned error currentSpeed float64 // speed calculated for last minute + eta *eta.Calculator + lastProgressMessage string } @@ -37,7 +41,8 @@ func New(threadCount int) *WorkerPool { workerPool := &WorkerPool{ jobChan: make(chan func() error), resultChan: make(chan error), - stopChan: make(chan struct{})} + stopChan: make(chan struct{}), + eta: eta.New(time.Minute, 0)} workerPool.wg.Add(threadCount) @@ -58,13 +63,13 @@ func New(threadCount int) *WorkerPool { select { case <-tickerUpdateText.C: if !newLined { - fmt.Fprintf(os.Stderr, endLine) + fmt.Fprint(os.Stderr, endLine) newLined = true } - workerPool.printProgress() + _ = workerPool.printProgress() case <-tickerCalculateEta.C: - workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) + workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Since(prevTime)) prevPos = workerPool.processedCount prevTime = time.Now() case err := <-workerPool.resultChan: @@ -72,6 +77,7 @@ func New(threadCount int) *WorkerPool { workerPool.errorCount++ } workerPool.processedCount++ + workerPool.eta.Increment(1) case <-workerPool.stopChan: return } @@ -91,30 +97,35 @@ func New(threadCount int) *WorkerPool { return workerPool } -func (workerPool *WorkerPool) printProgress() { +func (workerPool *WorkerPool) SetEstimateCount(n int) { + workerPool.estimateCount = n + workerPool.eta.TotalCount = n +} + +func (workerPool *WorkerPool) printProgress() error { if !workerPool.ShowProgress { - return + return nil } buf := new(bytes.Buffer) - fmt.Fprintf(buf, newLine) + fmt.Fprint(buf, newLine) - if workerPool.EstimateCount == 0 { + if workerPool.estimateCount == 0 { fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount) } else { fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)", - float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount) + float64(workerPool.processedCount*100)/float64(workerPool.estimateCount), workerPool.processedCount, workerPool.estimateCount) } if workerPool.errorCount > 0 { fmt.Fprintf(buf, " Errors: %d (%.1f%%)", - workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount)) + workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.estimateCount)) } if workerPool.currentSpeed > 0 { - if workerPool.EstimateCount > 0 { - fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed))) + if workerPool.estimateCount > 0 { + fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average()))) } if workerPool.ShowSpeed { @@ -125,12 +136,17 @@ func (workerPool *WorkerPool) printProgress() { fmt.Fprint(buf, endLine) if buf.String() == workerPool.lastProgressMessage { - return + return nil } workerPool.lastProgressMessage = buf.String() - buf.WriteTo(os.Stderr) + _, err := buf.WriteTo(os.Stderr) + if err != nil { + return err + } + + return nil } // Add sends specified task for execution. @@ -145,7 +161,7 @@ func (workerPool *WorkerPool) CloseAndWait() { workerPool.stopChan <- struct{}{} close(workerPool.resultChan) - workerPool.printProgress() + _ = workerPool.printProgress() } // ErrorCount returns total error count. From a43e9e76b2a77bd42c3be2277d19c5a8352dd357 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 6 Aug 2022 22:17:35 +0500 Subject: [PATCH 09/11] Update README forworker.SetEstimateCount() --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fc5b387..fb72ae3 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,11 @@ func f(i int) error { func main() { - worker := gwp.New(4) // Create pool with specified number of workers + worker := gwp.New(4) // Create pool with specified number of workers - worker.ShowProgress = true // Enable progress indicator - worker.ShowSpeed = true // Show processing speed in progress indicator - worker.EstimateCount = 100 // Set total number on jobs to calculate ETA + worker.ShowProgress = true // Enable progress indicator + worker.ShowSpeed = true // Show processing speed in progress indicator + worker.SetEstimateCount(100) // Set total number on jobs to calculate ETA for i := 0; i < 100; i++ { n := i From 0e63308e921c02c9b896ec7b8a7c20d87c876850 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 7 Aug 2022 10:15:27 +0500 Subject: [PATCH 10/11] Show ETA for first period --- gwp.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/gwp.go b/gwp.go index e906130..fcd529a 100644 --- a/gwp.go +++ b/gwp.go @@ -123,14 +123,12 @@ func (workerPool *WorkerPool) printProgress() error { workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.estimateCount)) } - if workerPool.currentSpeed > 0 { - if workerPool.estimateCount > 0 { - fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average()))) - } + if workerPool.estimateCount > 0 { + fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average()))) + } - if workerPool.ShowSpeed { - fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed) - } + if workerPool.currentSpeed > 0 && workerPool.ShowSpeed { + fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed) } fmt.Fprint(buf, endLine) From 161075bb83854db48851265322767b114d2a9be0 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 20 Aug 2022 11:55:26 +0500 Subject: [PATCH 11/11] Update deps --- go.mod | 2 +- go.sum | 2 ++ gwp.go | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 7618791..01724de 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/nxshock/gwp go 1.16 require ( - github.com/nxshock/go-eta v0.1.0 + github.com/nxshock/go-eta v0.1.1 github.com/stretchr/testify v1.8.0 ) diff --git a/go.sum b/go.sum index d88a774..c701620 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/nxshock/go-eta v0.1.0 h1:rcy8rNPaVeL3f3c5rJRLvho0FErUJThD0+vihxA7t9c= github.com/nxshock/go-eta v0.1.0/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA= +github.com/nxshock/go-eta v0.1.1 h1:N4wi3wjrXJGiheYF4zBIjOwugM+++NWsB1fpeRZaHS4= +github.com/nxshock/go-eta v0.1.1/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/gwp.go b/gwp.go index fcd529a..4119cce 100644 --- a/gwp.go +++ b/gwp.go @@ -42,7 +42,7 @@ func New(threadCount int) *WorkerPool { jobChan: make(chan func() error), resultChan: make(chan error), stopChan: make(chan struct{}), - eta: eta.New(time.Minute, 0)} + eta: eta.New(0)} workerPool.wg.Add(threadCount)