From 267c3604e68ae3c4af3e79e27a2c30e837d6e2b0 Mon Sep 17 00:00:00 2001 From: nxshock Date: Thu, 4 Feb 2021 21:23:43 +0500 Subject: [PATCH] Always update progress text at finish --- gwp.go | 58 +++++++++++++++++++++++++++++------------------------ gwp_test.go | 24 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 26 deletions(-) diff --git a/gwp.go b/gwp.go index c94e853..07805d9 100644 --- a/gwp.go +++ b/gwp.go @@ -15,6 +15,10 @@ type WorkerPool struct { stopChan chan struct{} wg sync.WaitGroup EstimateCount int + + processedCount int // processed jobs count + errorCount int // processed jobs count that returned error + currentSpeed float64 // speed calculated for last minute } // New creates new pool of workers with specified goroutine count. @@ -32,8 +36,6 @@ func New(threadCount int) *WorkerPool { workerPool.wg.Add(threadCount) go func() { - var processedCount int - var errorCount int var prevPos int prevTime := time.Now() @@ -44,40 +46,22 @@ func New(threadCount int) *WorkerPool { tickerCalculateEta.Stop() }() - var currentSpeed float64 // jobs per sec - fmt.Fprintf(os.Stderr, endLine) for { select { case <-tickerUpdateText.C: - if workerPool.EstimateCount == 0 { - continue - } - - fmt.Fprintf(os.Stderr, newLine) - fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)", - float64(processedCount*100)/float64(workerPool.EstimateCount), processedCount, workerPool.EstimateCount) - - if errorCount > 0 { - fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", - errorCount, float64(errorCount*100)/float64(workerPool.EstimateCount)) - } - if currentSpeed > 0 { - fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps", - time.Second*time.Duration(float64(workerPool.EstimateCount-processedCount)/currentSpeed), currentSpeed) - } - fmt.Fprint(os.Stderr, endLine) + workerPool.printProgress() case <-tickerCalculateEta.C: - currentSpeed = float64(processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) - prevPos = processedCount + workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) + prevPos = workerPool.processedCount prevTime = time.Now() case err := <-workerPool.r: if err != nil { - errorCount++ + workerPool.errorCount++ } - processedCount++ + workerPool.processedCount++ case <-workerPool.stopChan: - break + return } } }() @@ -95,6 +79,26 @@ func New(threadCount int) *WorkerPool { return workerPool } +func (workerPool *WorkerPool) printProgress() { + if workerPool.EstimateCount == 0 { + return + } + + fmt.Fprintf(os.Stderr, newLine) + fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)", + float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount) + + if workerPool.errorCount > 0 { + fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", + workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount)) + } + if workerPool.currentSpeed > 0 { + fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps", + time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed), workerPool.currentSpeed) + } + fmt.Fprint(os.Stderr, endLine) +} + // Add sends specified task for execution. func (workerPool *WorkerPool) Add(f func() error) { workerPool.f <- f @@ -106,4 +110,6 @@ func (workerPool *WorkerPool) CloseAndWait() { workerPool.wg.Wait() workerPool.stopChan <- struct{}{} close(workerPool.r) + + workerPool.printProgress() } diff --git a/gwp_test.go b/gwp_test.go index 84342ff..0f2ec94 100644 --- a/gwp_test.go +++ b/gwp_test.go @@ -1,6 +1,7 @@ package gwp import ( + "errors" "sync/atomic" "testing" @@ -23,5 +24,28 @@ func TestBasic(t *testing.T) { wp.CloseAndWait() assert.EqualValues(t, 100, *count) + assert.EqualValues(t, 100, wp.processedCount) + } +} + +func TestErrorCounter(t *testing.T) { + for i := 0; i < 10; i++ { + wp := New(i) + + for j := 0; j < 100; j++ { + n := j + wp.Add(func() error { + if n%2 == 0 { + return errors.New("error") + } + + return nil + }) + } + + wp.CloseAndWait() + + assert.EqualValues(t, 100, wp.processedCount) + assert.EqualValues(t, 50, wp.errorCount) } }