🚀 Concurrency Made Simple and Safe in Go with go-pool

-concurrency-made-simple-and-safe-in-go-with-go-pool

Go-Pool

Go-Pool is a lightweight, type-safe, and high-performance worker pool for Go. It simplifies managing concurrent workloads, providing deterministic cleanup, optional retries, and efficient result collection—without the complexity of channels or errgroup.

Go’s concurrency model is powerful, but building high-throughput, leak-free, and memory-efficient concurrent systems can be tricky. Go-Pool abstracts these challenges, giving developers a clean, minimal API for safe and efficient task execution.

Features

  • Efficient worker pool for streamlined concurrent task execution.
  • Type-safe generic Drainer (Drainer[T]) for collecting results safely and efficiently.
  • Retryable tasks with exponential backoff and jitter for transient failures.
  • Context-aware execution for safe cancellation.
  • Deterministic shutdown—no goroutine leaks.
  • Minimal allocations and lock-free designs where possible.
  • Fluent functional composition (WithRetry) for flexible task definitions.
  • High throughput: optimized for millions of tasks with microsecond-friendly operations.

Installation

go get github.com/rubengp99/go-pool

Core Concepts

Concept Description
Task A unit of work (func() error) to be executed concurrently.
Worker Interface representing executable tasks, optionally retryable.
Pool Manages concurrent execution using WaitGroup and semaphores.
Drainer[T] Type-safe collector for task results, optimized for high throughput.
Retryable Wraps tasks with automatic retries, including exponential backoff + jitter.

How it works:

  1. Each Worker runs in its own goroutine, managed by a WaitGroup.
  2. Concurrency is controlled via a semaphore.
  3. Shared context handles cancellation propagation.
  4. Drainer[T] safely collects results in a lock-free or low-allocation linked list.
  5. Resources and channels are deterministically cleaned up upon completion.

Usage Examples

1. Running Simple Concurrent Tasks

package main

import (
    "fmt"
    "sync/atomic"
    gopool "github.com/rubengp99/go-pool"
)

func main() {
    var numInvocations uint32
    task := gopool.NewTask(func() error {
        atomic.AddUint32(&numInvocations, 1)
        fmt.Println("Task executed")
        return nil
    })

    pool := gopool.NewPool().WithLimit(2)
    err := pool.Go(task, task, task).Wait()

    fmt.Println("Tasks completed:", numInvocations) // 3
    fmt.Println("Error:", err)                       // nil
}

Explanation:
Runs multiple tasks concurrently with a pool of 2 workers. Simple demonstration of basic concurrency without retries or result collection.

2. Retryable Tasks

task := gopool.NewTask(func() error {
    return fmt.Errorf("temporary error")
}).WithRetry(3, 100*time.Millisecond)

pool := gopool.NewPool()
pool.Go(task).Wait()

Explanation:
Wraps a task with retries using WithRetry. The task will attempt up to 3 times with exponential backoff (100 ms base) for transient errors.

3. Collecting Results with Drainer

output := gopool.NewDrainer[string]()

tasks := gopool.Workers{
    gopool.NewTask(func() error { output.Send("result1"); return nil }),
    gopool.NewTask(func() error { output.Send("result2"); return nil }),
}

pool := gopool.NewPool().WithLimit(2)
pool.Go(tasks...).Wait()

results := output.Drain()
fmt.Println("Collected results:", results) // ["result1", "result2"]

Explanation:
Demonstrates type-safe result collection using a Drainer. Tasks send their results to the drainer, which is drained after execution.

4. Multiple Task Types

type typeA struct{ value string }
type typeB struct{ value float32 }

outputA := gopool.NewDrainer[typeA]()
outputB := gopool.NewDrainer[typeB]()

tasks := gopool.Workers{
    gopool.NewTask(func() error { outputA.Send(typeA{"hello"}); return nil }),
    gopool.NewTask(func() error { outputB.Send(typeB{42.5}); return nil }),
}

pool := gopool.NewPool().WithLimit(2)
pool.Go(tasks...).Wait()

Explanation:
Illustrates concurrent execution of tasks producing different result types, each collected by a separate type-safe drainer.

5. Complete Example: Retries + Drainers

package main

import (
    "fmt"
    "sync/atomic"
    "time"
    gopool "github.com/rubengp99/go-pool"
)

type typeA struct{ value string }
type typeB struct{ value float32 }

func main() {
    var totalInvocations uint32

    outputA := gopool.NewDrainer[typeA]()
    outputB := gopool.NewDrainer[typeB]()

    tasks := gopool.Workers{
        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            fmt.Println("Running simple task")
            return nil
        }),

        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            if atomic.LoadUint32(&totalInvocations)%2 == 0 {
                fmt.Println("Task succeeded after retry")
                return nil
            }
            fmt.Println("Task failed, retrying...")
            return fmt.Errorf("temporary error")
        }).WithRetry(3, 100*time.Millisecond),

        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            outputA.Send(typeA{value: "Hello from typeA!"})
            return nil
        }),

        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            outputB.Send(typeB{value: 99.99})
            return nil
        }),
    }

    pool := gopool.NewPool().WithLimit(2)
    if err := pool.Go(tasks...).Wait(); err != nil {
        fmt.Println("Pool error:", err)
    }

    resultsA := outputA.Drain()
    resultsB := outputB.Drain()

    fmt.Printf("nTotal tasks executed: %dn", totalInvocations)
    fmt.Println("Results of typeA:", resultsA)
    fmt.Println("Results of typeB:", resultsB)
}

Explanation:
Full-featured example combining simple tasks, retryable tasks, and multiple drainers. Demonstrates heterogeneous workloads, retries, and type-safe result collection in one pool.

Benchmarks

Name Iterations ns/op B/op allocs/op
ErrGroup 6,203,892 183.5 24 1
GoPool 6,145,203 192.0 32 1
GoPoolWithDrainer 5,508,412 205.4 90 2
ChannelsWithOutputAndErrChannel 4,461,849 262.0 72 2
ChannelsWithWaitGroup 4,431,901 271.8 80 2
ChannelsWithErrGroup 4,459,243 274.8 80 2
MutexWithErrGroup 2,896,214 378.3 135 2

Benchmark Comparison

Analysis:
Go-Pool adds only ~8.5 ns per operation versus ErrGroup, while offering:

  • Type safety with generic Drainer[T]
  • Automatic retries with backoff
  • Deterministic cleanup
  • Concurrent-safe result draining
    All with minimal memory overhead, ideal for high-throughput Go applications.

Repository

Go-Pool on GitHub

Total
0
Shares
Leave a Reply

Your email address will not be published. Required fields are marked *

Previous Post
how-i-used-vibe-coding-to-build-custom-seo-tools-(without-writing-code!)

How I Used Vibe Coding to Build Custom SEO Tools (Without Writing Code!)

Related Posts