mirror of
https://github.com/nxshock/gwp.git
synced 2025-04-19 01:11:51 +05:00
Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
161075bb83 | |||
0e63308e92 | |||
a43e9e76b2 | |||
aba812a470 | |||
0d6ffec530 | |||
a2986a9485 | |||
54dac42925 | |||
928cd97576 | |||
bd74140174 | |||
21c13e0864 | |||
5b2fc6d56d |
@ -29,8 +29,10 @@ func f(i int) error {
|
||||
func main() {
|
||||
|
||||
worker := gwp.New(4) // Create pool with specified number of workers
|
||||
|
||||
worker.ShowProgress = true // Enable progress indicator
|
||||
worker.EstimateCount = 100 // Set total number on jobs to calculate ETA
|
||||
worker.ShowSpeed = true // Show processing speed in progress indicator
|
||||
worker.SetEstimateCount(100) // Set total number on jobs to calculate ETA
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
n := i
|
||||
|
5
go.mod
5
go.mod
@ -1,3 +1,8 @@
|
||||
module github.com/nxshock/gwp
|
||||
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/nxshock/go-eta v0.1.1
|
||||
github.com/stretchr/testify v1.8.0
|
||||
)
|
||||
|
18
go.sum
Normal file
18
go.sum
Normal file
@ -0,0 +1,18 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/nxshock/go-eta v0.1.0 h1:rcy8rNPaVeL3f3c5rJRLvho0FErUJThD0+vihxA7t9c=
|
||||
github.com/nxshock/go-eta v0.1.0/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA=
|
||||
github.com/nxshock/go-eta v0.1.1 h1:N4wi3wjrXJGiheYF4zBIjOwugM+++NWsB1fpeRZaHS4=
|
||||
github.com/nxshock/go-eta v0.1.1/go.mod h1:hZ59FHIJSpPeZk38pd/3zUkI4H31hAQ2oFw+NgdU+SA=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
81
gwp.go
81
gwp.go
@ -1,11 +1,14 @@
|
||||
package gwp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nxshock/go-eta"
|
||||
)
|
||||
|
||||
// WorkerPool represents pool of workers.
|
||||
@ -15,16 +18,21 @@ type WorkerPool struct {
|
||||
stopChan chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
EstimateCount int
|
||||
estimateCount int
|
||||
ShowProgress bool
|
||||
ShowSpeed bool
|
||||
|
||||
processedCount int // processed jobs count
|
||||
errorCount int // processed jobs count that returned error
|
||||
currentSpeed float64 // speed calculated for last minute
|
||||
|
||||
eta *eta.Calculator
|
||||
|
||||
lastProgressMessage string
|
||||
}
|
||||
|
||||
// New creates new pool of workers with specified goroutine count.
|
||||
// If specified number of workers less than 1, runtume.NumCPU() is used.
|
||||
// If specified number of workers less than 1, runtime.NumCPU() is used.
|
||||
func New(threadCount int) *WorkerPool {
|
||||
if threadCount <= 0 {
|
||||
threadCount = runtime.NumCPU()
|
||||
@ -33,7 +41,8 @@ func New(threadCount int) *WorkerPool {
|
||||
workerPool := &WorkerPool{
|
||||
jobChan: make(chan func() error),
|
||||
resultChan: make(chan error),
|
||||
stopChan: make(chan struct{})}
|
||||
stopChan: make(chan struct{}),
|
||||
eta: eta.New(0)}
|
||||
|
||||
workerPool.wg.Add(threadCount)
|
||||
|
||||
@ -48,13 +57,19 @@ func New(threadCount int) *WorkerPool {
|
||||
tickerCalculateEta.Stop()
|
||||
}()
|
||||
|
||||
fmt.Fprintf(os.Stderr, endLine)
|
||||
newLined := false
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tickerUpdateText.C:
|
||||
workerPool.printProgress()
|
||||
if !newLined {
|
||||
fmt.Fprint(os.Stderr, endLine)
|
||||
newLined = true
|
||||
}
|
||||
|
||||
_ = workerPool.printProgress()
|
||||
case <-tickerCalculateEta.C:
|
||||
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime))
|
||||
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Since(prevTime))
|
||||
prevPos = workerPool.processedCount
|
||||
prevTime = time.Now()
|
||||
case err := <-workerPool.resultChan:
|
||||
@ -62,6 +77,7 @@ func New(threadCount int) *WorkerPool {
|
||||
workerPool.errorCount++
|
||||
}
|
||||
workerPool.processedCount++
|
||||
workerPool.eta.Increment(1)
|
||||
case <-workerPool.stopChan:
|
||||
return
|
||||
}
|
||||
@ -81,29 +97,54 @@ func New(threadCount int) *WorkerPool {
|
||||
return workerPool
|
||||
}
|
||||
|
||||
func (workerPool *WorkerPool) printProgress() {
|
||||
func (workerPool *WorkerPool) SetEstimateCount(n int) {
|
||||
workerPool.estimateCount = n
|
||||
workerPool.eta.TotalCount = n
|
||||
}
|
||||
|
||||
func (workerPool *WorkerPool) printProgress() error {
|
||||
if !workerPool.ShowProgress {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, newLine)
|
||||
buf := new(bytes.Buffer)
|
||||
|
||||
if workerPool.EstimateCount == 0 {
|
||||
fmt.Fprintf(os.Stderr, "Progress: %d", workerPool.processedCount)
|
||||
fmt.Fprint(buf, newLine)
|
||||
|
||||
if workerPool.estimateCount == 0 {
|
||||
fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount)
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)",
|
||||
float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount)
|
||||
fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)",
|
||||
float64(workerPool.processedCount*100)/float64(workerPool.estimateCount), workerPool.processedCount, workerPool.estimateCount)
|
||||
}
|
||||
|
||||
if workerPool.errorCount > 0 {
|
||||
fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)",
|
||||
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount))
|
||||
fmt.Fprintf(buf, " Errors: %d (%.1f%%)",
|
||||
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.estimateCount))
|
||||
}
|
||||
if workerPool.EstimateCount > 0 && workerPool.currentSpeed > 0 {
|
||||
fmt.Fprintf(os.Stderr, " ETA: %s at %.2f rps",
|
||||
fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed)), workerPool.currentSpeed)
|
||||
|
||||
if workerPool.estimateCount > 0 {
|
||||
fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average())))
|
||||
}
|
||||
fmt.Fprint(os.Stderr, endLine)
|
||||
|
||||
if workerPool.currentSpeed > 0 && workerPool.ShowSpeed {
|
||||
fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed)
|
||||
}
|
||||
|
||||
fmt.Fprint(buf, endLine)
|
||||
|
||||
if buf.String() == workerPool.lastProgressMessage {
|
||||
return nil
|
||||
}
|
||||
|
||||
workerPool.lastProgressMessage = buf.String()
|
||||
|
||||
_, err := buf.WriteTo(os.Stderr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add sends specified task for execution.
|
||||
@ -118,7 +159,7 @@ func (workerPool *WorkerPool) CloseAndWait() {
|
||||
workerPool.stopChan <- struct{}{}
|
||||
close(workerPool.resultChan)
|
||||
|
||||
workerPool.printProgress()
|
||||
_ = workerPool.printProgress()
|
||||
}
|
||||
|
||||
// ErrorCount returns total error count.
|
||||
|
Loading…
x
Reference in New Issue
Block a user