1
0
mirror of https://github.com/nxshock/gwp.git synced 2025-01-18 11:21:10 +05:00

Upload code

This commit is contained in:
nxshock 2021-02-01 20:50:40 +05:00
parent 093e7af9ed
commit 01600e9439
4 changed files with 111 additions and 1 deletions

View File

@ -1,2 +1,5 @@
# gwp
Simple goroutine worker pool written in Go
Simple goroutine worker pool written in Go.
Status: Work in progress.

6
consts_linux.go Normal file
View File

@ -0,0 +1,6 @@
package gwp
const (
newLine = "\033[1A\033[2K"
endLine = "\n"
)

6
consts_windows.go Normal file
View File

@ -0,0 +1,6 @@
package gwp
const (
newLine = "\r"
endLine = ""
)

95
gwp.go Normal file
View File

@ -0,0 +1,95 @@
package gwp
import (
"fmt"
"os"
"runtime"
"sync"
"time"
)
// WorkerPool represents pool of workers.
type WorkerPool struct {
f chan func()
r chan struct{}
stopChan chan struct{}
wg sync.WaitGroup
EstimateCount int
}
// New creates new pool of workers with specified goroutine count.
func New(threadCount int) *WorkerPool {
if threadCount <= 0 {
threadCount = runtime.NumCPU()
}
workerPool := &WorkerPool{
f: make(chan func()),
r: make(chan struct{}),
stopChan: make(chan struct{})}
workerPool.wg.Add(threadCount)
go func() {
var counter int
var prevPos int
prevTime := time.Now()
const calculateEtaPeriod = time.Minute
tickerUpdateText := time.NewTicker(time.Second)
tickerCalculateEta := time.NewTicker(calculateEtaPeriod)
defer func() {
tickerUpdateText.Stop()
tickerCalculateEta.Stop()
}()
var currentSpeed float64 // items per sec
fmt.Fprintf(os.Stderr, endLine)
for {
select {
case <-tickerUpdateText.C:
if workerPool.EstimateCount > 0 {
fmt.Fprintf(os.Stderr, newLine)
fmt.Fprintf(os.Stderr, "%.1f%% (%d / %d) ETA: %s at %.2f speed"+endLine, float64(counter*100)/float64(workerPool.EstimateCount), counter, workerPool.EstimateCount,
time.Second*time.Duration(float64(workerPool.EstimateCount-counter)/currentSpeed), currentSpeed)
}
case <-tickerCalculateEta.C:
currentSpeed = float64(counter-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime))
prevPos = counter
prevTime = time.Now()
case <-workerPool.r:
counter++
case <-workerPool.stopChan:
break
}
}
}()
for i := 0; i < threadCount; i++ {
go func() {
defer workerPool.wg.Done()
for f := range workerPool.f {
f()
workerPool.r <- struct{}{}
}
}()
}
return workerPool
}
// Add sends specified task for execution.
func (workerPool *WorkerPool) Add(f func()) {
workerPool.f <- f
}
// CloseAndWait stops accepting tasks and waits for all tasks to complete.
func (workerPool *WorkerPool) CloseAndWait() {
close(workerPool.f)
workerPool.wg.Wait()
workerPool.stopChan <- struct{}{}
close(workerPool.r)
}