diff --git a/gwp.go b/gwp.go index ebf8e27..6a1a27c 100644 --- a/gwp.go +++ b/gwp.go @@ -10,28 +10,30 @@ import ( // WorkerPool represents pool of workers. type WorkerPool struct { - f chan func() - r chan struct{} + f chan func() error + r chan error stopChan chan struct{} wg sync.WaitGroup EstimateCount int } // New creates new pool of workers with specified goroutine count. +// If specified number of workers less than 1, runtume.NumCPU() is used. func New(threadCount int) *WorkerPool { if threadCount <= 0 { threadCount = runtime.NumCPU() } workerPool := &WorkerPool{ - f: make(chan func()), - r: make(chan struct{}), + f: make(chan func() error), + r: make(chan error), stopChan: make(chan struct{})} workerPool.wg.Add(threadCount) go func() { - var counter int + var processedCount int + var errorCount int var prevPos int prevTime := time.Now() @@ -50,18 +52,32 @@ func New(threadCount int) *WorkerPool { for { select { case <-tickerUpdateText.C: - if workerPool.EstimateCount > 0 { - fmt.Fprintf(os.Stderr, newLine) - fmt.Fprintf(os.Stderr, "%.1f%% (%d / %d) ETA: %s at %.2f rps"+endLine, - float64(counter*100)/float64(workerPool.EstimateCount), counter, workerPool.EstimateCount, - time.Second*time.Duration(float64(workerPool.EstimateCount-counter)/currentSpeed), currentSpeed) + if workerPool.EstimateCount == 0 { + continue } + + fmt.Fprintf(os.Stderr, newLine) + fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)", + float64(processedCount*100)/float64(workerPool.EstimateCount), processedCount, workerPool.EstimateCount) + + if errorCount > 0 { + fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", + errorCount, float64(errorCount*100)/float64(workerPool.EstimateCount)) + } + if currentSpeed > 0 { + fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps", + time.Second*time.Duration(float64(workerPool.EstimateCount-processedCount)/currentSpeed), currentSpeed) + } + fmt.Fprint(os.Stderr, endLine) case <-tickerCalculateEta.C: - currentSpeed = float64(counter-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) - prevPos = counter + currentSpeed = float64(processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) + prevPos = processedCount prevTime = time.Now() - case <-workerPool.r: - counter++ + case err := <-workerPool.r: + if err != nil { + errorCount++ + } + processedCount++ case <-workerPool.stopChan: break } @@ -73,8 +89,7 @@ func New(threadCount int) *WorkerPool { defer workerPool.wg.Done() for f := range workerPool.f { - f() - workerPool.r <- struct{}{} + workerPool.r <- f() } }() } @@ -83,7 +98,7 @@ func New(threadCount int) *WorkerPool { } // Add sends specified task for execution. -func (workerPool *WorkerPool) Add(f func()) { +func (workerPool *WorkerPool) Add(f func() error) { workerPool.f <- f }