Go-Pool
Go-Pool is a lightweight, type-safe, high-performance worker pool for Go. It simplifies managing concurrent workloads by providing deterministic cleanup, optional retries, and efficient result collection, without the complexity of channels or errgroup.
Go’s concurrency model is powerful, but building high-throughput, leak-free, and memory-efficient concurrent systems can be tricky. Go-Pool abstracts these challenges, giving developers a clean, minimal API for safe and efficient task execution.
Features
- Efficient worker pool for streamlined concurrent task execution.
- Type-safe generic Drainer (Drainer[T]) for collecting results safely and efficiently.
- Retryable tasks with exponential backoff and jitter for transient failures.
- Context-aware execution for safe cancellation.
- Deterministic shutdown with no goroutine leaks.
- Minimal allocations and lock-free designs where possible.
- Fluent functional composition (WithRetry) for flexible task definitions.
- High throughput: optimized for millions of tasks with sub-microsecond per-task overhead (see Benchmarks).
Installation
go get github.com/rubengp99/go-pool
Core Concepts
Concept | Description |
---|---|
Task | A unit of work (func() error) to be executed concurrently. |
Worker | Interface representing executable tasks, optionally retryable. |
Pool | Manages concurrent execution using WaitGroup and semaphores. |
Drainer[T] | Type-safe collector for task results, optimized for high throughput. |
Retryable | Wraps tasks with automatic retries, including exponential backoff + jitter. |
How it works:
- Each Worker runs in its own goroutine, managed by a WaitGroup.
- Concurrency is controlled via a semaphore.
- A shared context handles cancellation propagation.
- Drainer[T] safely collects results in a lock-free or low-allocation linked list.
- Resources and channels are deterministically cleaned up upon completion.
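The steps above can be approximated with the standard library alone. The following is a minimal sketch, not Go-Pool's actual implementation: runLimited and runDemo are hypothetical names introduced here, and the buffered channel used as a semaphore is an assumption about one common way to cap concurrency.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited is a hypothetical helper, not part of Go-Pool's API.
// It starts one goroutine per task, lets at most limit tasks execute
// at once, and returns the first error encountered.
func runLimited(ctx context.Context, limit int, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)

	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()

			// Wait for a free slot, or stop waiting once the context is cancelled.
			select {
			case sem <- struct{}{}:
				defer func() { <-sem }()
			case <-ctx.Done():
				return
			}

			if err := task(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // propagate cancellation to the remaining tasks
				})
			}
		}(task)
	}

	wg.Wait() // every goroutine has returned by this point
	return firstErr
}

// runDemo runs n trivial tasks and reports how many ran and the highest
// number that were executing at the same moment.
func runDemo(limit, n int) (ran, peak int32) {
	var active int32
	tasks := make([]func(context.Context) error, n)
	for i := range tasks {
		tasks[i] = func(context.Context) error {
			cur := atomic.AddInt32(&active, 1)
			defer atomic.AddInt32(&active, -1)
			// Record the highest concurrency level seen so far.
			for {
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt32(&ran, 1)
			return nil
		}
	}
	_ = runLimited(context.Background(), limit, tasks...)
	return ran, peak
}

func main() {
	ran, peak := runDemo(2, 8)
	fmt.Println("tasks run:", ran, "peak within limit:", peak <= 2)
}
```

Because the function returns only after wg.Wait(), no goroutine outlives the call, which is the property the "deterministic cleanup" bullet describes.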
Usage Examples
1. Running Simple Concurrent Tasks
package main

import (
	"fmt"
	"sync/atomic"

	gopool "github.com/rubengp99/go-pool"
)

func main() {
	var numInvocations uint32

	// The same task value is submitted three times below.
	task := gopool.NewTask(func() error {
		atomic.AddUint32(&numInvocations, 1)
		fmt.Println("Task executed")
		return nil
	})

	// At most two tasks run at the same time.
	pool := gopool.NewPool().WithLimit(2)
	err := pool.Go(task, task, task).Wait()

	fmt.Println("Tasks completed:", numInvocations) // 3
	fmt.Println("Error:", err)                      // nil
}
Explanation:
Runs three tasks concurrently with at most two executing at a time (WithLimit(2)). A simple demonstration of basic concurrency without retries or result collection.
2. Retryable Tasks
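// Requires the "fmt" and "time" imports.
task := gopool.NewTask(func() error {
	return fmt.Errorf("temporary error") // always fails, so every retry is used
}).WithRetry(3, 100*time.Millisecond)

pool := gopool.NewPool()
err := pool.Go(task).Wait()
fmt.Println("Error after retries:", err)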
Explanation:
Wraps a task with retries using WithRetry. The task is attempted up to 3 times, with exponential backoff starting from a 100 ms base delay, to ride out transient errors.
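To make "exponential backoff with jitter" concrete, here is a minimal standard-library sketch. It is not Go-Pool's implementation: backoffDelay and retry are hypothetical names, and the doubling factor and the jitter range (up to half the delay) are assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

var errTemporary = errors.New("temporary error")

// backoffDelay returns the wait before the next try after a failed attempt
// (attempt is 0-based): base * 2^attempt plus a random jitter of up to half
// that value. Both the factor and the jitter range are assumptions.
func backoffDelay(base time.Duration, attempt int) time.Duration {
	d := base << uint(attempt) // base * 2^attempt
	jitter := time.Duration(rand.Int63n(int64(d)/2 + 1))
	return d + jitter
}

// retry calls fn up to maxAttempts times, sleeping between failed attempts.
// It returns the number of attempts made and the last error.
func retry(maxAttempts int, base time.Duration, fn func() error) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return attempt + 1, nil
		}
		if attempt < maxAttempts-1 {
			time.Sleep(backoffDelay(base, attempt))
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := retry(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTemporary // fail the first two attempts
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "error:", err) // attempts: 3 error: <nil>
}
```

The jitter spreads retries from many tasks over time, so tasks that failed together do not all retry at the same instant.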
3. Collecting Results with Drainer
output := gopool.NewDrainer[string]()

tasks := gopool.Workers{
	gopool.NewTask(func() error { output.Send("result1"); return nil }),
	gopool.NewTask(func() error { output.Send("result2"); return nil }),
}

pool := gopool.NewPool().WithLimit(2)
pool.Go(tasks...).Wait()

// Drain only after Wait returns, so every task has finished sending.
results := output.Drain()
fmt.Println("Collected results:", results) // both results; order may vary between runs
Explanation:
Demonstrates type-safe result collection using a Drainer. Tasks send their results to the drainer, which is drained after execution.
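The send-then-drain pattern can be illustrated with a small generic collector. This is a hedged sketch only: collector and collectSquares are hypothetical names, and the mutex-guarded slice is a deliberately simple substitute for the linked-list design described for Drainer[T] under "How it works".

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collector is a hypothetical stand-in for a Drainer[T]-style type.
// It guards a slice with a mutex, which may differ from Go-Pool's own design.
type collector[T any] struct {
	mu    sync.Mutex
	items []T
}

// Send appends one value; it is safe to call from many goroutines.
func (c *collector[T]) Send(v T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = append(c.items, v)
}

// Drain returns everything collected so far and resets the collector.
func (c *collector[T]) Drain() []T {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := c.items
	c.items = nil
	return out
}

// collectSquares sends i*i from n goroutines and returns the sorted results.
func collectSquares(n int) []int {
	var (
		c  collector[int]
		wg sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.Send(i * i)
		}(i)
	}
	wg.Wait() // drain only after every sender has finished

	got := c.Drain()
	sort.Ints(got) // arrival order is nondeterministic, so sort before use
	return got
}

func main() {
	fmt.Println(collectSquares(4)) // [0 1 4 9]
}
```

The type parameter is what gives compile-time safety: a collector[int] rejects a string at the call to Send rather than at run time.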
4. Multiple Task Types
type typeA struct{ value string }
type typeB struct{ value float32 }
outputA := gopool.NewDrainer[typeA]()
outputB := gopool.NewDrainer[typeB]()
tasks := gopool.Workers{
	gopool.NewTask(func() error { outputA.Send(typeA{"hello"}); return nil }),
	gopool.NewTask(func() error { outputB.Send(typeB{42.5}); return nil }),
}
pool := gopool.NewPool().WithLimit(2)
pool.Go(tasks...).Wait()
Explanation:
Illustrates concurrent execution of tasks producing different result types, each collected by a separate type-safe drainer.
5. Complete Example: Retries + Drainers
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	gopool "github.com/rubengp99/go-pool"
)

type typeA struct{ value string }
type typeB struct{ value float32 }

func main() {
	var totalInvocations uint32

	outputA := gopool.NewDrainer[typeA]()
	outputB := gopool.NewDrainer[typeB]()

	tasks := gopool.Workers{
		// Plain task: no retries, no result.
		gopool.NewTask(func() error {
			atomic.AddUint32(&totalInvocations, 1)
			fmt.Println("Running simple task")
			return nil
		}),
		// Retryable task: succeeds only when the shared counter is even.
		gopool.NewTask(func() error {
			// Use the value returned by AddUint32 so the increment and the
			// check observe the same count.
			n := atomic.AddUint32(&totalInvocations, 1)
			if n%2 == 0 {
				fmt.Println("Task succeeded after retry")
				return nil
			}
			fmt.Println("Task failed, retrying...")
			return fmt.Errorf("temporary error")
		}).WithRetry(3, 100*time.Millisecond),
		// Tasks that send typed results to their own drainers.
		gopool.NewTask(func() error {
			atomic.AddUint32(&totalInvocations, 1)
			outputA.Send(typeA{value: "Hello from typeA!"})
			return nil
		}),
		gopool.NewTask(func() error {
			atomic.AddUint32(&totalInvocations, 1)
			outputB.Send(typeB{value: 99.99})
			return nil
		}),
	}

	pool := gopool.NewPool().WithLimit(2)
	if err := pool.Go(tasks...).Wait(); err != nil {
		fmt.Println("Pool error:", err)
	}

	resultsA := outputA.Drain()
	resultsB := outputB.Drain()

	fmt.Printf("\nTotal tasks executed: %d\n", totalInvocations)
	fmt.Println("Results of typeA:", resultsA)
	fmt.Println("Results of typeB:", resultsB)
}
Explanation:
Full-featured example combining simple tasks, retryable tasks, and multiple drainers. Demonstrates heterogeneous workloads, retries, and type-safe result collection in one pool.
Benchmarks
Name | Iterations | ns/op | B/op | allocs/op |
---|---|---|---|---|
ErrGroup | 6,203,892 | 183.5 | 24 | 1 |
GoPool | 6,145,203 | 192.0 | 32 | 1 |
GoPoolWithDrainer | 5,508,412 | 205.4 | 90 | 2 |
ChannelsWithOutputAndErrChannel | 4,461,849 | 262.0 | 72 | 2 |
ChannelsWithWaitGroup | 4,431,901 | 271.8 | 80 | 2 |
ChannelsWithErrGroup | 4,459,243 | 274.8 | 80 | 2 |
MutexWithErrGroup | 2,896,214 | 378.3 | 135 | 2 |
Analysis:
Go-Pool adds only ~8.5 ns per operation versus ErrGroup (192.0 vs. 183.5 ns/op), while offering:
- Type safety with generic Drainer[T]
- Automatic retries with backoff
- Deterministic cleanup
- Concurrent-safe result draining
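The memory cost is similarly small: 32 B/op versus 24 B/op for ErrGroup, with the same single allocation per operation, which makes Go-Pool a good fit for high-throughput Go applications.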
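For readers who want to see where columns such as ns/op, B/op, and allocs/op come from, the sketch below uses the standard testing.Benchmark function to time a bare WaitGroup baseline. The benchmark body and the name benchWaitGroup are hypothetical and are not among the benchmarks in the table; the numbers it prints depend on the machine.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// benchWaitGroup measures the cost of spawning one goroutine and waiting
// for it. It is a hypothetical baseline, not one of the table's benchmarks.
func benchWaitGroup() testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs() // enables the B/op and allocs/op figures
		for i := 0; i < b.N; i++ {
			var wg sync.WaitGroup
			wg.Add(1)
			go func() { defer wg.Done() }()
			wg.Wait()
		}
	})
}

func main() {
	r := benchWaitGroup()
	fmt.Printf("iterations=%d ns/op=%d B/op=%d allocs/op=%d\n",
		r.N, r.NsPerOp(), r.AllocedBytesPerOp(), r.AllocsPerOp())
}
```

In a real project the same loop would normally live in a _test.go file and run through go test -bench with -benchmem.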