2021-02-01 20:50:40 +05:00
|
|
|
package gwp
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"os"
|
|
|
|
"runtime"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// WorkerPool is a pool of worker goroutines that execute submitted jobs and,
// when enabled, report progress to os.Stderr.
type WorkerPool struct {
	// EstimateCount is the expected total number of jobs. When it is zero,
	// progress is printed as a plain counter without a percentage.
	EstimateCount int
	// ShowProgress enables printing of the progress line.
	ShowProgress bool
	// ShowSpeed adds the current speed to the progress line.
	ShowSpeed bool

	jobChan    chan func() error // jobs waiting to be picked up by a worker
	resultChan chan error        // results of finished jobs
	stopChan   chan struct{}     // tells the monitor goroutine to exit
	wg         sync.WaitGroup    // tracks running workers

	processedCount int     // number of finished jobs
	errorCount     int     // number of finished jobs that returned an error
	currentSpeed   float64 // jobs per second over the latest measurement interval
}
|
|
|
|
|
|
|
|
// New creates new pool of workers with specified goroutine count.
|
2022-06-24 21:05:30 +05:00
|
|
|
// If specified number of workers less than 1, runtime.NumCPU() is used.
|
2021-02-01 20:50:40 +05:00
|
|
|
func New(threadCount int) *WorkerPool {
|
|
|
|
if threadCount <= 0 {
|
|
|
|
threadCount = runtime.NumCPU()
|
|
|
|
}
|
|
|
|
|
|
|
|
workerPool := &WorkerPool{
|
2021-02-04 21:24:40 +05:00
|
|
|
jobChan: make(chan func() error),
|
|
|
|
resultChan: make(chan error),
|
|
|
|
stopChan: make(chan struct{})}
|
2021-02-01 20:50:40 +05:00
|
|
|
|
|
|
|
workerPool.wg.Add(threadCount)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
var prevPos int
|
|
|
|
prevTime := time.Now()
|
|
|
|
|
2021-02-03 20:57:44 +05:00
|
|
|
tickerUpdateText := time.NewTicker(defaultProgressUpdatePeriod)
|
|
|
|
tickerCalculateEta := time.NewTicker(defaultCalculateEtaPeriod)
|
2021-02-01 20:50:40 +05:00
|
|
|
defer func() {
|
|
|
|
tickerUpdateText.Stop()
|
|
|
|
tickerCalculateEta.Stop()
|
|
|
|
}()
|
|
|
|
|
2022-03-07 16:10:43 +05:00
|
|
|
newLined := false
|
|
|
|
|
2021-02-01 20:50:40 +05:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-tickerUpdateText.C:
|
2022-03-07 16:10:43 +05:00
|
|
|
if !newLined {
|
|
|
|
fmt.Fprintf(os.Stderr, endLine)
|
|
|
|
newLined = true
|
|
|
|
}
|
|
|
|
|
2021-02-04 21:23:43 +05:00
|
|
|
workerPool.printProgress()
|
2021-02-01 20:50:40 +05:00
|
|
|
case <-tickerCalculateEta.C:
|
2021-02-05 17:55:21 +05:00
|
|
|
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime))
|
2021-02-04 21:23:43 +05:00
|
|
|
prevPos = workerPool.processedCount
|
2021-02-01 20:50:40 +05:00
|
|
|
prevTime = time.Now()
|
2021-02-04 21:24:40 +05:00
|
|
|
case err := <-workerPool.resultChan:
|
2021-02-03 00:30:47 +05:00
|
|
|
if err != nil {
|
2021-02-04 21:23:43 +05:00
|
|
|
workerPool.errorCount++
|
2021-02-03 00:30:47 +05:00
|
|
|
}
|
2021-02-04 21:23:43 +05:00
|
|
|
workerPool.processedCount++
|
2021-02-01 20:50:40 +05:00
|
|
|
case <-workerPool.stopChan:
|
2021-02-04 21:23:43 +05:00
|
|
|
return
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for i := 0; i < threadCount; i++ {
|
|
|
|
go func() {
|
|
|
|
defer workerPool.wg.Done()
|
|
|
|
|
2021-02-04 21:24:40 +05:00
|
|
|
for f := range workerPool.jobChan {
|
|
|
|
workerPool.resultChan <- f()
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
return workerPool
|
|
|
|
}
|
|
|
|
|
2021-02-04 21:23:43 +05:00
|
|
|
func (workerPool *WorkerPool) printProgress() {
|
2021-03-27 10:52:23 +05:00
|
|
|
if !workerPool.ShowProgress {
|
2021-02-04 21:23:43 +05:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
fmt.Fprintf(os.Stderr, newLine)
|
2021-11-04 19:01:40 +05:00
|
|
|
|
|
|
|
if workerPool.EstimateCount == 0 {
|
|
|
|
fmt.Fprintf(os.Stderr, "Progress: %d", workerPool.processedCount)
|
|
|
|
} else {
|
|
|
|
fmt.Fprintf(os.Stderr, "Progress: %.1f%% (%d / %d)",
|
|
|
|
float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount)
|
|
|
|
}
|
2021-02-04 21:23:43 +05:00
|
|
|
|
|
|
|
if workerPool.errorCount > 0 {
|
|
|
|
fmt.Fprintf(os.Stderr, " Errors: %d (%.1f%%)",
|
|
|
|
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount))
|
|
|
|
}
|
2022-03-08 17:58:03 +05:00
|
|
|
|
|
|
|
if workerPool.currentSpeed > 0 {
|
|
|
|
if workerPool.EstimateCount > 0 {
|
|
|
|
fmt.Fprintf(os.Stderr, " ETA: %s", fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed)))
|
|
|
|
}
|
|
|
|
|
|
|
|
if workerPool.ShowSpeed {
|
2022-03-08 18:30:44 +05:00
|
|
|
fmt.Fprintf(os.Stderr, " Speed: %.2f rps", workerPool.currentSpeed)
|
2022-03-08 17:58:03 +05:00
|
|
|
}
|
2021-02-04 21:23:43 +05:00
|
|
|
}
|
2022-03-08 17:58:03 +05:00
|
|
|
|
2021-02-04 21:23:43 +05:00
|
|
|
fmt.Fprint(os.Stderr, endLine)
|
|
|
|
}
|
|
|
|
|
2021-02-01 20:50:40 +05:00
|
|
|
// Add sends specified task for execution.
|
2021-02-03 00:30:47 +05:00
|
|
|
func (workerPool *WorkerPool) Add(f func() error) {
|
2021-02-04 21:24:40 +05:00
|
|
|
workerPool.jobChan <- f
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
|
|
|
|
|
|
|
// CloseAndWait stops accepting tasks and waits for all tasks to complete.
|
|
|
|
func (workerPool *WorkerPool) CloseAndWait() {
|
2021-02-04 21:24:40 +05:00
|
|
|
close(workerPool.jobChan)
|
2021-02-01 20:50:40 +05:00
|
|
|
workerPool.wg.Wait()
|
|
|
|
workerPool.stopChan <- struct{}{}
|
2021-02-04 21:24:40 +05:00
|
|
|
close(workerPool.resultChan)
|
2021-02-04 21:23:43 +05:00
|
|
|
|
|
|
|
workerPool.printProgress()
|
2021-02-01 20:50:40 +05:00
|
|
|
}
|
2021-03-27 11:19:44 +05:00
|
|
|
|
2022-01-08 18:40:23 +05:00
|
|
|
// ErrorCount returns total error count.
|
2021-03-27 11:19:44 +05:00
|
|
|
func (workerPool *WorkerPool) ErrorCount() int {
|
|
|
|
return workerPool.errorCount
|
|
|
|
}
|