2021-02-01 20:50:40 +05:00
|
|
|
package gwp
|
|
|
|
|
|
|
|
import (
|
2022-06-25 17:53:32 +05:00
|
|
|
"bytes"
|
2021-02-01 20:50:40 +05:00
|
|
|
"fmt"
|
|
|
|
"os"
|
|
|
|
"runtime"
|
|
|
|
"sync"
|
|
|
|
"time"
|
2022-08-06 22:13:22 +05:00
|
|
|
|
|
|
|
"github.com/nxshock/go-eta"
|
2021-02-01 20:50:40 +05:00
|
|
|
)
|
|
|
|
|
|
|
|
// WorkerPool represents pool of workers.
|
|
|
|
type WorkerPool struct {
|
2021-03-27 10:52:23 +05:00
|
|
|
jobChan chan func() error
|
|
|
|
resultChan chan error
|
|
|
|
stopChan chan struct{}
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
estimateCount int
|
2021-03-27 10:52:23 +05:00
|
|
|
ShowProgress bool
|
2022-03-08 17:58:03 +05:00
|
|
|
ShowSpeed bool
|
2021-02-04 21:23:43 +05:00
|
|
|
|
|
|
|
processedCount int // processed jobs count
|
|
|
|
errorCount int // processed jobs count that returned error
|
|
|
|
currentSpeed float64 // speed calculated for last minute
|
2022-06-25 17:53:32 +05:00
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
eta *eta.Calculator
|
|
|
|
|
2022-06-25 17:53:32 +05:00
|
|
|
lastProgressMessage string
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
|
|
|
|
// New creates new pool of workers with specified goroutine count.
|
2022-06-24 21:05:30 +05:00
|
|
|
// If specified number of workers less than 1, runtime.NumCPU() is used.
|
2021-02-01 20:50:40 +05:00
|
|
|
func New(threadCount int) *WorkerPool {
|
|
|
|
if threadCount <= 0 {
|
|
|
|
threadCount = runtime.NumCPU()
|
|
|
|
}
|
|
|
|
|
|
|
|
workerPool := &WorkerPool{
|
2021-02-04 21:24:40 +05:00
|
|
|
jobChan: make(chan func() error),
|
|
|
|
resultChan: make(chan error),
|
2022-08-06 22:13:22 +05:00
|
|
|
stopChan: make(chan struct{}),
|
2022-08-20 11:55:26 +05:00
|
|
|
eta: eta.New(0)}
|
2021-02-01 20:50:40 +05:00
|
|
|
|
|
|
|
workerPool.wg.Add(threadCount)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
var prevPos int
|
|
|
|
prevTime := time.Now()
|
|
|
|
|
2021-02-03 20:57:44 +05:00
|
|
|
tickerUpdateText := time.NewTicker(defaultProgressUpdatePeriod)
|
|
|
|
tickerCalculateEta := time.NewTicker(defaultCalculateEtaPeriod)
|
2021-02-01 20:50:40 +05:00
|
|
|
defer func() {
|
|
|
|
tickerUpdateText.Stop()
|
|
|
|
tickerCalculateEta.Stop()
|
|
|
|
}()
|
|
|
|
|
2022-03-07 16:10:43 +05:00
|
|
|
newLined := false
|
|
|
|
|
2021-02-01 20:50:40 +05:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-tickerUpdateText.C:
|
2022-03-07 16:10:43 +05:00
|
|
|
if !newLined {
|
2022-08-06 22:13:22 +05:00
|
|
|
fmt.Fprint(os.Stderr, endLine)
|
2022-03-07 16:10:43 +05:00
|
|
|
newLined = true
|
|
|
|
}
|
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
_ = workerPool.printProgress()
|
2021-02-01 20:50:40 +05:00
|
|
|
case <-tickerCalculateEta.C:
|
2022-08-06 22:13:22 +05:00
|
|
|
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Since(prevTime))
|
2021-02-04 21:23:43 +05:00
|
|
|
prevPos = workerPool.processedCount
|
2021-02-01 20:50:40 +05:00
|
|
|
prevTime = time.Now()
|
2021-02-04 21:24:40 +05:00
|
|
|
case err := <-workerPool.resultChan:
|
2021-02-03 00:30:47 +05:00
|
|
|
if err != nil {
|
2021-02-04 21:23:43 +05:00
|
|
|
workerPool.errorCount++
|
2021-02-03 00:30:47 +05:00
|
|
|
}
|
2021-02-04 21:23:43 +05:00
|
|
|
workerPool.processedCount++
|
2022-08-06 22:13:22 +05:00
|
|
|
workerPool.eta.Increment(1)
|
2021-02-01 20:50:40 +05:00
|
|
|
case <-workerPool.stopChan:
|
2021-02-04 21:23:43 +05:00
|
|
|
return
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for i := 0; i < threadCount; i++ {
|
|
|
|
go func() {
|
|
|
|
defer workerPool.wg.Done()
|
|
|
|
|
2021-02-04 21:24:40 +05:00
|
|
|
for f := range workerPool.jobChan {
|
|
|
|
workerPool.resultChan <- f()
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
return workerPool
|
|
|
|
}
|
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
func (workerPool *WorkerPool) SetEstimateCount(n int) {
|
|
|
|
workerPool.estimateCount = n
|
|
|
|
workerPool.eta.TotalCount = n
|
|
|
|
}
|
|
|
|
|
|
|
|
func (workerPool *WorkerPool) printProgress() error {
|
2021-03-27 10:52:23 +05:00
|
|
|
if !workerPool.ShowProgress {
|
2022-08-06 22:13:22 +05:00
|
|
|
return nil
|
2021-02-04 21:23:43 +05:00
|
|
|
}
|
|
|
|
|
2022-06-25 17:53:32 +05:00
|
|
|
buf := new(bytes.Buffer)
|
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
fmt.Fprint(buf, newLine)
|
2021-11-04 19:01:40 +05:00
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
if workerPool.estimateCount == 0 {
|
2022-06-25 17:53:32 +05:00
|
|
|
fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount)
|
2021-11-04 19:01:40 +05:00
|
|
|
} else {
|
2022-06-25 17:53:32 +05:00
|
|
|
fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)",
|
2022-08-06 22:13:22 +05:00
|
|
|
float64(workerPool.processedCount*100)/float64(workerPool.estimateCount), workerPool.processedCount, workerPool.estimateCount)
|
2021-11-04 19:01:40 +05:00
|
|
|
}
|
2021-02-04 21:23:43 +05:00
|
|
|
|
|
|
|
if workerPool.errorCount > 0 {
|
2022-06-25 17:53:32 +05:00
|
|
|
fmt.Fprintf(buf, " Errors: %d (%.1f%%)",
|
2022-08-06 22:13:22 +05:00
|
|
|
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.estimateCount))
|
2021-02-04 21:23:43 +05:00
|
|
|
}
|
2022-03-08 17:58:03 +05:00
|
|
|
|
2022-08-07 10:15:27 +05:00
|
|
|
if workerPool.estimateCount > 0 {
|
|
|
|
fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Until(workerPool.eta.Average())))
|
|
|
|
}
|
2022-03-08 17:58:03 +05:00
|
|
|
|
2022-08-07 10:15:27 +05:00
|
|
|
if workerPool.currentSpeed > 0 && workerPool.ShowSpeed {
|
|
|
|
fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed)
|
2021-02-04 21:23:43 +05:00
|
|
|
}
|
2022-03-08 17:58:03 +05:00
|
|
|
|
2022-06-25 17:53:32 +05:00
|
|
|
fmt.Fprint(buf, endLine)
|
|
|
|
|
|
|
|
if buf.String() == workerPool.lastProgressMessage {
|
2022-08-06 22:13:22 +05:00
|
|
|
return nil
|
2022-06-25 17:53:32 +05:00
|
|
|
}
|
|
|
|
|
|
|
|
workerPool.lastProgressMessage = buf.String()
|
2022-06-26 18:24:12 +05:00
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
_, err := buf.WriteTo(os.Stderr)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2021-02-04 21:23:43 +05:00
|
|
|
}
|
|
|
|
|
2021-02-01 20:50:40 +05:00
|
|
|
// Add sends specified task for execution.
|
2021-02-03 00:30:47 +05:00
|
|
|
func (workerPool *WorkerPool) Add(f func() error) {
|
2021-02-04 21:24:40 +05:00
|
|
|
workerPool.jobChan <- f
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
|
|
|
|
// CloseAndWait stops accepting tasks and waits for all tasks to complete.
|
|
|
|
func (workerPool *WorkerPool) CloseAndWait() {
|
2021-02-04 21:24:40 +05:00
|
|
|
close(workerPool.jobChan)
|
2021-02-01 20:50:40 +05:00
|
|
|
workerPool.wg.Wait()
|
|
|
|
workerPool.stopChan <- struct{}{}
|
2021-02-04 21:24:40 +05:00
|
|
|
close(workerPool.resultChan)
|
2021-02-04 21:23:43 +05:00
|
|
|
|
2022-08-06 22:13:22 +05:00
|
|
|
_ = workerPool.printProgress()
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
2021-03-27 11:19:44 +05:00
|
|
|
|
2022-01-08 18:40:23 +05:00
|
|
|
// ErrorCount returns total error count.
|
2021-03-27 11:19:44 +05:00
|
|
|
func (workerPool *WorkerPool) ErrorCount() int {
|
|
|
|
return workerPool.errorCount
|
|
|
|
}
|