From 01600e9439319531c77402c9083bef53a3395cb5 Mon Sep 17 00:00:00 2001 From: nxshock Date: Mon, 1 Feb 2021 20:50:40 +0500 Subject: [PATCH] Upload code --- README.md | 5 ++- consts_linux.go | 6 +++ consts_windows.go | 6 +++ gwp.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 consts_linux.go create mode 100644 consts_windows.go create mode 100644 gwp.go diff --git a/README.md b/README.md index 0b95a11..c738b9e 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # gwp - Simple goroutine worker pool written in Go + +Simple goroutine worker pool written in Go. + +Status: Work in progress. diff --git a/consts_linux.go b/consts_linux.go new file mode 100644 index 0000000..b0021ae --- /dev/null +++ b/consts_linux.go @@ -0,0 +1,6 @@ +package gwp + +const ( + newLine = "\033[1A\033[2K" + endLine = "\n" +) diff --git a/consts_windows.go b/consts_windows.go new file mode 100644 index 0000000..5bc5245 --- /dev/null +++ b/consts_windows.go @@ -0,0 +1,6 @@ +package gwp + +const ( + newLine = "\r" + endLine = "" +) diff --git a/gwp.go b/gwp.go new file mode 100644 index 0000000..4acc7fc --- /dev/null +++ b/gwp.go @@ -0,0 +1,95 @@ +package gwp + +import ( + "fmt" + "os" + "runtime" + "sync" + "time" +) + +// WorkerPool represents pool of workers. +type WorkerPool struct { + f chan func() + r chan struct{} + stopChan chan struct{} + wg sync.WaitGroup + EstimateCount int +} + +// New creates new pool of workers with specified goroutine count. +func New(threadCount int) *WorkerPool { + if threadCount <= 0 { + threadCount = runtime.NumCPU() + } + + workerPool := &WorkerPool{ + f: make(chan func()), + r: make(chan struct{}), + stopChan: make(chan struct{})} + + workerPool.wg.Add(threadCount) + + go func() { + var counter int + var prevPos int + prevTime := time.Now() + + const calculateEtaPeriod = time.Minute + + tickerUpdateText := time.NewTicker(time.Second) + tickerCalculateEta := time.NewTicker(calculateEtaPeriod) + defer func() { + tickerUpdateText.Stop() + tickerCalculateEta.Stop() + }() + + var currentSpeed float64 // items per sec + + fmt.Fprintf(os.Stderr, endLine) + for { + select { + case <-tickerUpdateText.C: + if workerPool.EstimateCount > 0 { + fmt.Fprintf(os.Stderr, newLine) + fmt.Fprintf(os.Stderr, "%.1f%% (%d / %d) ETA: %s at %.2f speed"+endLine, float64(counter*100)/float64(workerPool.EstimateCount), counter, workerPool.EstimateCount, + time.Second*time.Duration(float64(workerPool.EstimateCount-counter)/currentSpeed), currentSpeed) + } + case <-tickerCalculateEta.C: + currentSpeed = float64(counter-prevPos) * float64(time.Second) / float64(time.Now().Sub(prevTime)) + prevPos = counter + prevTime = time.Now() + case <-workerPool.r: + counter++ + case <-workerPool.stopChan: + break + } + } + }() + + for i := 0; i < threadCount; i++ { + go func() { + defer workerPool.wg.Done() + + for f := range workerPool.f { + f() + workerPool.r <- struct{}{} + } + }() + } + + return workerPool +} + +// Add sends specified task for execution. +func (workerPool *WorkerPool) Add(f func()) { + workerPool.f <- f +} + +// CloseAndWait stops accepting tasks and waits for all tasks to complete. +func (workerPool *WorkerPool) CloseAndWait() { + close(workerPool.f) + workerPool.wg.Wait() + workerPool.stopChan <- struct{}{} + close(workerPool.r) +}