Package pool implements a consumer goroutine pool for easier goroutine handling.
Features:
- Dead simple to use and makes no assumptions about how you will use it.
- Automatic recovery from consumer goroutines which returns an error to the results + automatically cancels the rest of the jobs.
Use go get.
go get gopkg.in/go-playground/pool.v1
or to update
go get -u gopkg.in/go-playground/pool.v1
Then import the pool package into your own code.
import "gopkg.in/go-playground/pool.v1"
Please see http://godoc.org/gopkg.in/go-playground/pool.v1 for detailed usage docs.
Struct return value
package main
import (
"errors"
"fmt"
"time"
"gopkg.in/go-playground/pool.v1"
)
type resultStruct struct {
i int
err error
}
func main() {
p := pool.NewPool(4, 16)
// can add a consumer hook for each consumer routine to get a value from
// such as a database connection which each job can reuse via job.HookParam()
// p.AddConsumerHook(func() interface{}{ return db connection or whatever})
fn := func(job *pool.Job) {
i := job.Params()[0].(int)
res := &resultStruct{
i: i,
}
// any condition that would cause an error
if i == 10 {
res.err = errors.New("Something bad happened, but don't need to cancel the rest of the jobs")
job.Return(res)
// or if you want to cancel run the line below
job.Cancel()
return
}
time.Sleep(time.Second * 1)
job.Return(res)
}
for i := 0; i < 4; i++ {
p.Queue(fn, i)
}
for result := range p.Results() {
err, ok := result.(*pool.ErrRecovery)
if ok {
// there was some sort of panic that
// was recovered, in this scenario
fmt.Println(err)
return
}
res := result.(*resultStruct)
if res.err != nil {
// do what you want with error or cancel the pool here p.Cancel()
fmt.Println(res.err)
}
// do what you want with result
fmt.Println(res.i)
}
}
Value return value
package main
import (
"errors"
"fmt"
"time"
"gopkg.in/go-playground/pool.v1"
)
func main() {
p := pool.NewPool(4, 16)
// can add a consumer hook for each consumer routine to get a value from
// such as a database connection which each job can reuse via job.HookParam()
// p.AddConsumerHook(func() interface{}{ return db connection or whatever})
fn := func(job *pool.Job) {
i := job.Params()[0].(int)
// any condition that would cause an error
if i == 10 {
job.Return(errors.New("Something bad happened, but don't need to cancel the rest of the jobs"))
// or if you want to cancel run the line below
job.Cancel()
return
}
time.Sleep(time.Second * 1)
job.Return(i)
}
for i := 0; i < 4; i++ {
p.Queue(fn, i)
}
for result := range p.Results() {
switch result.(type) {
case *pool.ErrRecovery:
err := result.(*pool.ErrRecovery)
// do what you want with error or cancel the pool here p.Cancel()
fmt.Println(err)
default:
j := result.(int)
// do what you want with result
fmt.Println(j)
}
}
}
Run on MacBook Pro (Retina, 15-inch, Late 2013) 2.6 GHz Intel Core i7 16 GB 1600 MHz DDR3 using Go 1.5.1
$ go test -cpu=4 -bench=. -benchmem=true
PASS
BenchmarkSmallRun-4 1 3000201819 ns/op 2272 B/op 58 allocs/op
BenchmarkSmallCancel-4 1 2002207036 ns/op 2928 B/op 79 allocs/op
BenchmarkLargeCancel-4 1 2000774880 ns/op 106656 B/op 3026 allocs/op
BenchmarkOverconsumeLargeRun-4 1 4003364358 ns/op 29872 B/op 557 allocs/op
To put these benchmarks in perspective:
- BenchmarkSmallRun-4 did 10 seconds worth of processing in 3 seconds
- BenchmarkSmallCancel-4 ran 20 jobs, cancelled on job 6 and and ran in 2 seconds
- BenchmarkLargeCancel-4 ran 1000 jobs, cancelled on job 6 and and ran in 2 seconds
- BenchmarkOverconsumeLargeRun-4 ran 100 jobs using 25 consumers in 4 seconds
There will always be a development branch for each version i.e. v1-development
. In order to contribute,
please make your pull requests against those branches.
If the changes being proposed or requested are breaking changes, please create an issue, for discussion or create a pull request against the highest development branch for example this package has a v1 and v1-development branch however, there will also be a v2-development branch even though v2 doesn't exist yet.
Distributed under MIT License, please see license file in code for more details.