1
0
mirror of https://github.com/nxshock/gwp.git synced 2024-11-27 03:31:02 +05:00
gwp/gwp.go

155 lines
3.5 KiB
Go
Raw Normal View History

2021-02-01 20:50:40 +05:00
package gwp
import (
"bytes"
2021-02-01 20:50:40 +05:00
"fmt"
"os"
"runtime"
"sync"
"time"
)
// WorkerPool represents pool of workers.
//
// The exported fields configure progress reporting. They are read by the
// pool's background goroutine without synchronization, so set them right
// after New and before jobs are added.
type WorkerPool struct {
	jobChan    chan func() error // jobs handed from Add to the worker goroutines
	resultChan chan error        // return value of every executed job
	stopChan   chan struct{}     // tells the bookkeeping goroutine to exit
	wg         sync.WaitGroup    // tracks the running worker goroutines

	// EstimateCount is the expected total number of jobs. When it is zero
	// the progress line shows only the processed count, without percentage
	// or ETA.
	EstimateCount int

	// ShowProgress enables writing the progress line to os.Stderr.
	ShowProgress bool

	// ShowSpeed appends the current speed to the progress line.
	ShowSpeed bool

	processedCount      int     // processed jobs count
	errorCount          int     // processed jobs count that returned error
	currentSpeed        float64 // jobs per second, measured between the two most recent speed-calculation ticks
	lastProgressMessage string  // last progress line written, used to skip identical repeats
}
// New creates new pool of workers with specified goroutine count.
2022-06-24 21:05:30 +05:00
// If specified number of workers less than 1, runtime.NumCPU() is used.
2021-02-01 20:50:40 +05:00
func New(threadCount int) *WorkerPool {
if threadCount <= 0 {
threadCount = runtime.NumCPU()
}
workerPool := &WorkerPool{
2021-02-04 21:24:40 +05:00
jobChan: make(chan func() error),
resultChan: make(chan error),
stopChan: make(chan struct{})}
2021-02-01 20:50:40 +05:00
workerPool.wg.Add(threadCount)
go func() {
var prevPos int
prevTime := time.Now()
2021-02-03 20:57:44 +05:00
tickerUpdateText := time.NewTicker(defaultProgressUpdatePeriod)
tickerCalculateEta := time.NewTicker(defaultCalculateEtaPeriod)
2021-02-01 20:50:40 +05:00
defer func() {
tickerUpdateText.Stop()
tickerCalculateEta.Stop()
}()
newLined := false
2021-02-01 20:50:40 +05:00
for {
select {
case <-tickerUpdateText.C:
if !newLined {
fmt.Fprintf(os.Stderr, endLine)
newLined = true
}
2021-02-04 21:23:43 +05:00
workerPool.printProgress()
2021-02-01 20:50:40 +05:00
case <-tickerCalculateEta.C:
2021-02-05 17:55:21 +05:00
workerPool.currentSpeed = float64(workerPool.processedCount-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime))
2021-02-04 21:23:43 +05:00
prevPos = workerPool.processedCount
2021-02-01 20:50:40 +05:00
prevTime = time.Now()
2021-02-04 21:24:40 +05:00
case err := <-workerPool.resultChan:
if err != nil {
2021-02-04 21:23:43 +05:00
workerPool.errorCount++
}
2021-02-04 21:23:43 +05:00
workerPool.processedCount++
2021-02-01 20:50:40 +05:00
case <-workerPool.stopChan:
2021-02-04 21:23:43 +05:00
return
2021-02-01 20:50:40 +05:00
}
}
}()
for i := 0; i < threadCount; i++ {
go func() {
defer workerPool.wg.Done()
2021-02-04 21:24:40 +05:00
for f := range workerPool.jobChan {
workerPool.resultChan <- f()
2021-02-01 20:50:40 +05:00
}
}()
}
return workerPool
}
2021-02-04 21:23:43 +05:00
func (workerPool *WorkerPool) printProgress() {
2021-03-27 10:52:23 +05:00
if !workerPool.ShowProgress {
2021-02-04 21:23:43 +05:00
return
}
buf := new(bytes.Buffer)
fmt.Fprintf(buf, newLine)
2021-11-04 19:01:40 +05:00
if workerPool.EstimateCount == 0 {
fmt.Fprintf(buf, "Progress: %d", workerPool.processedCount)
2021-11-04 19:01:40 +05:00
} else {
fmt.Fprintf(buf, "Progress: %.1f%% (%d / %d)",
2021-11-04 19:01:40 +05:00
float64(workerPool.processedCount*100)/float64(workerPool.EstimateCount), workerPool.processedCount, workerPool.EstimateCount)
}
2021-02-04 21:23:43 +05:00
if workerPool.errorCount > 0 {
fmt.Fprintf(buf, " Errors: %d (%.1f%%)",
2021-02-04 21:23:43 +05:00
workerPool.errorCount, float64(workerPool.errorCount*100)/float64(workerPool.EstimateCount))
}
2022-03-08 17:58:03 +05:00
if workerPool.currentSpeed > 0 {
if workerPool.EstimateCount > 0 {
fmt.Fprintf(buf, " ETA: %s", fmtDuration(time.Second*time.Duration(float64(workerPool.EstimateCount-workerPool.processedCount)/workerPool.currentSpeed)))
2022-03-08 17:58:03 +05:00
}
if workerPool.ShowSpeed {
fmt.Fprintf(buf, " Speed: %.2f rps", workerPool.currentSpeed)
2022-03-08 17:58:03 +05:00
}
2021-02-04 21:23:43 +05:00
}
2022-03-08 17:58:03 +05:00
fmt.Fprint(buf, endLine)
if buf.String() == workerPool.lastProgressMessage {
return
}
buf.WriteTo(os.Stderr)
workerPool.lastProgressMessage = buf.String()
2021-02-04 21:23:43 +05:00
}
2021-02-01 20:50:40 +05:00
// Add sends specified task for execution.
func (workerPool *WorkerPool) Add(f func() error) {
2021-02-04 21:24:40 +05:00
workerPool.jobChan <- f
2021-02-01 20:50:40 +05:00
}
// CloseAndWait stops accepting tasks and waits for all tasks to complete.
func (workerPool *WorkerPool) CloseAndWait() {
2021-02-04 21:24:40 +05:00
close(workerPool.jobChan)
2021-02-01 20:50:40 +05:00
workerPool.wg.Wait()
workerPool.stopChan <- struct{}{}
2021-02-04 21:24:40 +05:00
close(workerPool.resultChan)
2021-02-04 21:23:43 +05:00
workerPool.printProgress()
2021-02-01 20:50:40 +05:00
}
2021-03-27 11:19:44 +05:00
2022-01-08 18:40:23 +05:00
// ErrorCount returns total error count.
2021-03-27 11:19:44 +05:00
func (workerPool *WorkerPool) ErrorCount() int {
return workerPool.errorCount
}