1
0
mirror of https://github.com/nxshock/gwp.git synced 2025-04-20 01:21:52 +05:00

Compare commits

...

13 Commits
v0.1.1 ... main

Author SHA1 Message Date
161075bb83 Update deps 2022-08-20 11:55:26 +05:00
0e63308e92 Show ETA for first period 2022-08-07 10:15:27 +05:00
a43e9e76b2 Update README forworker.SetEstimateCount() 2022-08-06 22:17:35 +05:00
aba812a470 Some chages
* Use go-eta library from calculating ETA
* Fix linter issues
* Add testify library for tests
2022-08-06 22:13:22 +05:00
0d6ffec530 Fix buffer usage order 2022-06-26 18:24:12 +05:00
a2986a9485 Do not update progress message without changes 2022-06-25 17:53:32 +05:00
54dac42925
Fix comment 2022-06-24 21:05:30 +05:00
928cd97576 Add space before speed field 2022-03-08 18:30:44 +05:00
bd74140174 Fix speed filed name 2022-03-08 18:00:09 +05:00
21c13e0864 Separate speed field 2022-03-08 17:58:03 +05:00
5b2fc6d56d Do now print new line if no progress requested 2022-03-07 16:10:43 +05:00
35416cd502 Add custom duration formatter 2022-01-08 18:40:23 +05:00
2a342398b9 Do not show zero estimate count 2021-11-04 19:01:40 +05:00
6 changed files with 155 additions and 22 deletions

View File

@ -29,8 +29,10 @@ func f(i int) error {
func main() { func main() {
worker := gwp.New(4) // Create pool with specified number of workers worker := gwp.New(4) // Create pool with specified number of workers
worker.ShowProgress = true // Enable progress indicator worker.ShowProgress = true // Enable progress indicator
worker.EstimateCount = 100 // Set total number on jobs to calculate ETA worker.ShowSpeed = true // Show processing speed in progress indicator
worker.SetEstimateCount(100) // Set total number on jobs to calculate ETA
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
n := i n := i

46
durationformat.go Normal file
View File

@ -0,0 +1,46 @@
package gwp
import (
"fmt"
"time"
)
func fmtDuration(d time.Duration) string {
d = d.Round(time.Second)
days := d / time.Hour / 24
d -= days * 24 * time.Hour
hours := d / time.Hour
d -= hours * time.Hour
minites := d / time.Minute
d -= minites * time.Minute
seconds := d / time.Second
d -= seconds * time.Second
var resultStr string
if days > 0 {
resultStr += fmt.Sprintf("%3dd", days)
} else {
resultStr += " "
}
if hours > 0 {
resultStr += fmt.Sprintf(" %2dh", hours)
} else {
resultStr += " "
}
if minites > 0 {
resultStr += fmt.Sprintf(" %2dm", minites)
} else {
resultStr += " "
}
resultStr += fmt.Sprintf(" %2ds", seconds)
return resultStr
}

16
durationformat_test.go Normal file
View File

@ -0,0 +1,16 @@
package gwp
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestFmtDuration(t *testing.T) {
assert.Equal(t, " 1s", fmtDuration(time.Second))
assert.Equal(t, " 1m 0s", fmtDuration(time.Minute))
assert.Equal(t, " 1h 0s", fmtDuration(time.Hour))
assert.Equal(t, " 1d 0s", fmtDuration(time.Hour*24))
assert.Equal(t, "365d 0s", fmtDuration(time.Hour*24*365))
}

5
go.mod
View File

@ -1,3 +1,8 @@
module github.com/nxshock/gwp module github.com/nxshock/gwp
go 1.16 go 1.16
require (
github.com/nxshock/go-eta v0.1.1
github.com/stretchr/testify v1.8.0
)

18
go.sum Normal file
View File

@ -0,0 +1,18 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/nxshock/go-eta v0.1.0 h1:rcy8rNPaVeL3f3c5rJRLvho0FErUJThD0+vihxA7t9c=
github.com/nxshock/go-eta v0.1.0/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA=
github.com/nxshock/go-eta v0.1.1 h1:N4wi3wjrXJGiheYF4zBIjOwugM+++NWsB1fpeRZaHS4=
github.com/nxshock/go-eta v0.1.1/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

84
gwp.go
View File

@ -1,11 +1,14 @@
package gwp package gwp
import ( import (
"bytes"
"fmt" "fmt"
"os" "os"
"runtime" "runtime"
"sync" "sync"
"time" "time"
"github.com/nxshock/go-eta"
) )
// WorkerPool represents pool of workers. // WorkerPool represents pool of workers.
@ -15,16 +18,21 @@ type WorkerPool struct {
stopChan chan struct{} stopChan chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
EstimateCount int estimateCount int
ShowProgress bool ShowProgress bool
ShowSpeed bool
processedCount int // processed jobs count processedCount int // processed jobs count
errorCount int // processed jobs count that returned error errorCount int // processed jobs count that returned error
currentSpeed float64 // speed calculated for last minute currentSpeed float64 // speed calculated for last minute
eta *eta.Calculator
lastProgressMessage string
} }
// New creates new pool of workers with specified goroutine count. // New creates new pool of workers with specified goroutine count.
// If specified number of workers less than 1, runtume.NumCPU() is used. // If specified number of workers less than 1, runtime.NumCPU() is used.
func New(threadCount int) *WorkerPool { func New(threadCount int) *WorkerPool {
if threadCount <= 0 { if threadCount <= 0 {
threadCount = runtime.NumCPU() threadCount = runtime.NumCPU()
@ -33,7 +41,8 @@ func New(threadCount int) *WorkerPool {
workerPool := &WorkerPool{ workerPool := &WorkerPool{
jobChan: make(chan func() error), jobChan: make(chan func() error),
resultChan: make(chan error), resultChan: make(chan error),
stopChan: make(chan struct{})} stopChan: make(chan struct{}),
eta: eta.New(0)}
workerPool.wg.Add(threadCount) workerPool.wg.Add(threadCount)
@ -48,13 +57,19 @@ func New(threadCount int) *WorkerPool {
tickerCalculateEta.Stop() tickerCalculateEta.Stop()
}() }()
fmt.Fprintf(os.Stderr, endLine) newLined := false
for { for {
select { select {
case <-tickerUpdateText.C: case <-tickerUpdateText.C:
workerPool.printProgress() if !newLined {
fmt.Fprint(os.Stderr, endLine)
newLined = true
}
_ = workerPool.printProgress()
case <-tickerCalculateEta.C: case <-tickerCalculateEta.C:
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Since(prevTime))
prevPos = workerPool.processedCount prevPos = workerPool.processedCount
prevTime = time.Now() prevTime = time.Now()
case err := <-workerPool.resultChan: case err := <-workerPool.resultChan:
@ -62,6 +77,7 @@ func New(threadCount int) *WorkerPool {
workerPool.errorCount++ workerPool.errorCount++
} }
workerPool.processedCount++ workerPool.processedCount++
workerPool.eta.Increment(1)
case <-workerPool.stopChan: case <-workerPool.stopChan:
return return
} }
@ -81,24 +97,54 @@ func New(threadCount int) *WorkerPool {
return workerPool return workerPool
} }
func (workerPool *WorkerPool) printProgress() { func (workerPool *WorkerPool) SetEstimateCount(n int) {
workerPool.estimateCount = n
workerPool.eta.TotalCount = n
}
func (workerPool *WorkerPool) printProgress() error {
if !workerPool.ShowProgress { if !workerPool.ShowProgress {
return return nil
} }
fmt.Fprintf(os.Stderr, newLine) buf := new(bytes.Buffer)
fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)",
float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount) fmt.Fprint(buf, newLine)
if workerPool.estimateCount == 0 {
fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount)
} else {
fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)",
float64(workerPool.processedCount*100)/float64(workerPool.estimateCount), workerPool.processedCount, workerPool.estimateCount)
}
if workerPool.errorCount > 0 { if workerPool.errorCount > 0 {
fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)", fmt.Fprintf(buf, " Errors: %d (%.1f%%)",
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount)) workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.estimateCount))
} }
if workerPool.EstimateCount > 0 && workerPool.currentSpeed > 0 {
fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps", if workerPool.estimateCount > 0 {
time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed), workerPool.currentSpeed) fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average())))
} }
fmt.Fprint(os.Stderr, endLine)
if workerPool.currentSpeed > 0 && workerPool.ShowSpeed {
fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed)
}
fmt.Fprint(buf, endLine)
if buf.String() == workerPool.lastProgressMessage {
return nil
}
workerPool.lastProgressMessage = buf.String()
_, err := buf.WriteTo(os.Stderr)
if err != nil {
return err
}
return nil
} }
// Add sends specified task for execution. // Add sends specified task for execution.
@ -113,10 +159,10 @@ func (workerPool *WorkerPool) CloseAndWait() {
workerPool.stopChan <- struct{}{} workerPool.stopChan <- struct{}{}
close(workerPool.resultChan) close(workerPool.resultChan)
workerPool.printProgress() _ = workerPool.printProgress()
} }
// ErrorCount returns total error count // ErrorCount returns total error count.
func (workerPool *WorkerPool) ErrorCount() int { func (workerPool *WorkerPool) ErrorCount() int {
return workerPool.errorCount return workerPool.errorCount
} }