1
0
mirror of https://github.com/nxshock/gwp.git synced 2025-01-18 11:21:10 +05:00

Always update progress text at finish

This commit is contained in:
nxshock 2021-02-04 21:23:43 +05:00
parent ed1db97396
commit 267c3604e6
2 changed files with 56 additions and 26 deletions

58
gwp.go
View File

@ -15,6 +15,10 @@ type WorkerPool struct {
stopChan chan struct{}
wg sync.WaitGroup
EstimateCount int
processedCount int // processed jobs count
errorCount int // processed jobs count that returned error
currentSpeed float64 // speed calculated for last minute
}
// New creates new pool of workers with specified goroutine count.
@ -32,8 +36,6 @@ func New(threadCount int) *WorkerPool {
workerPool.wg.Add(threadCount)
go func() {
var processedCount int
var errorCount int
var prevPos int
prevTime := time.Now()
@ -44,40 +46,22 @@ func New(threadCount int) *WorkerPool {
tickerCalculateEta.Stop()
}()
var currentSpeed float64 // jobs per sec
fmt.Fprintf(os.Stderr, endLine)
for {
select {
case <-tickerUpdateText.C:
if workerPool.EstimateCount == 0 {
continue
}
fmt.Fprintf(os.Stderr, newLine)
fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)",
float64(processedCount*100)/float64(workerPool.EstimateCount), processedCount, workerPool.EstimateCount)
if errorCount > 0 {
fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)",
errorCount, float64(errorCount*100)/float64(workerPool.EstimateCount))
}
if currentSpeed > 0 {
fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps",
time.Second*time.Duration(float64(workerPool.EstimateCount-processedCount)/currentSpeed), currentSpeed)
}
fmt.Fprint(os.Stderr, endLine)
workerPool.printProgress()
case <-tickerCalculateEta.C:
currentSpeed = float64(processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime))
prevPos = processedCount
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime))
prevPos = workerPool.processedCount
prevTime = time.Now()
case err := <-workerPool.r:
if err != nil {
errorCount++
workerPool.errorCount++
}
processedCount++
workerPool.processedCount++
case <-workerPool.stopChan:
break
return
}
}
}()
@ -95,6 +79,26 @@ func New(threadCount int) *WorkerPool {
return workerPool
}
func (workerPool *WorkerPool) printProgress() {
if workerPool.EstimateCount == 0 {
return
}
fmt.Fprintf(os.Stderr, newLine)
fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)",
float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount)
if workerPool.errorCount > 0 {
fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)",
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount))
}
if workerPool.currentSpeed > 0 {
fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps",
time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed), workerPool.currentSpeed)
}
fmt.Fprint(os.Stderr, endLine)
}
// Add sends specified task for execution.
func (workerPool *WorkerPool) Add(f func() error) {
workerPool.f <- f
@ -106,4 +110,6 @@ func (workerPool *WorkerPool) CloseAndWait() {
workerPool.wg.Wait()
workerPool.stopChan <- struct{}{}
close(workerPool.r)
workerPool.printProgress()
}

View File

@ -1,6 +1,7 @@
package gwp
import (
"errors"
"sync/atomic"
"testing"
@ -23,5 +24,28 @@ func TestBasic(t *testing.T) {
wp.CloseAndWait()
assert.EqualValues(t, 100, *count)
assert.EqualValues(t, 100, wp.processedCount)
}
}
func TestErrorCounter(t *testing.T) {
for i := 0; i < 10; i++ {
wp := New(i)
for j := 0; j < 100; j++ {
n := j
wp.Add(func() error {
if n%2 == 0 {
return errors.New("error")
}
return nil
})
}
wp.CloseAndWait()
assert.EqualValues(t, 100, wp.processedCount)
assert.EqualValues(t, 50, wp.errorCount)
}
}