diff --git a/go.mod b/go.mod index df5d8e4..1b44d85 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.3.1 github.com/stretchr/testify v1.8.4 + golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 golang.org/x/net v0.14.0 golang.org/x/text v0.12.0 ) diff --git a/go.sum b/go.sum index 8865099..2659528 100644 --- a/go.sum +++ b/go.sum @@ -21,8 +21,8 @@ github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -67,6 +67,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ= +golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -99,7 +101,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/syncx/batcher.go b/syncx/batcher.go index 698a1b3..57cb5c5 100644 --- a/syncx/batcher.go +++ b/syncx/batcher.go @@ -3,25 +3,30 @@ package syncx import ( "sync" "time" + + "golang.org/x/exp/constraints" ) // Batcher allows values to be queued and processed in a background thread. type Batcher[T any] struct { - process func(batch []T) - timeout time.Duration - wg *sync.WaitGroup - buffer chan T - stop chan bool + process func(batch []T) + maxItems int + maxAge time.Duration + wg *sync.WaitGroup + buffer chan T + stop chan bool + batch []T } // NewBatcher creates a new batcher. -func NewBatcher[T any](process func(batch []T), timeout time.Duration, capacity int, wg *sync.WaitGroup) *Batcher[T] { +func NewBatcher[T any](process func(batch []T), maxItems int, maxAge time.Duration, capacity int, wg *sync.WaitGroup) *Batcher[T] { return &Batcher[T]{ - process: process, - timeout: timeout, - wg: wg, - buffer: make(chan T, capacity), - stop: make(chan bool), + process: process, + maxItems: maxItems, + maxAge: maxAge, + wg: wg, + buffer: make(chan T, capacity), + stop: make(chan bool), } } @@ -34,14 +39,28 @@ func (b *Batcher[T]) Start() { for { select { - case <-b.stop: - for len(b.buffer) > 0 { + case v := <-b.buffer: + b.batch = append(b.batch, v) + if len(b.batch) == b.maxItems { b.flush() } - return - case <-time.After(b.timeout): + case <-time.After(b.maxAge): b.flush() + + case <-b.stop: + for len(b.buffer) > 0 || len(b.batch) > 0 { + buffSize := len(b.buffer) + canRead := min(b.maxItems-len(b.batch), buffSize) + + for i := 0; i < canRead; i++ { + v := <-b.buffer + b.batch = append(b.batch, v) + } + + b.flush() + } + return } } }() @@ -59,18 +78,18 @@ func (b *Batcher[T]) Stop() { close(b.stop) } -// processes all values currently in the buffer +// flushes whatever has been batched func (b *Batcher[T]) flush() { - count := len(b.buffer) - if count <= 0 { - return + if len(b.batch) > 0 { + b.process(b.batch) + b.batch = make([]T, 0, b.maxItems) } +} - batch := make([]T, count) - for i := 0; i < count; i++ { - v := <-b.buffer - batch[i] = v +// TODO delete when on go 1.21 and this is builtin +func min[T constraints.Ordered](x T, y T) T { + if x < y { + return x } - - b.process(batch) + return y } diff --git a/syncx/batcher_test.go b/syncx/batcher_test.go index 6533b23..4cad1a2 100644 --- a/syncx/batcher_test.go +++ b/syncx/batcher_test.go @@ -10,23 +10,45 @@ import ( ) func TestBatcher(t *testing.T) { - batches := make([][]int, 0, 5) + batches := make([][]int, 0) wg := &sync.WaitGroup{} b := syncx.NewBatcher(func(batch []int) { batches = append(batches, batch) - }, time.Second, 3, wg) + }, 2, time.Second, 3, wg) b.Start() - assert.Equal(t, 2, b.Queue(1)) - assert.Equal(t, 1, b.Queue(2)) - assert.Equal(t, 0, b.Queue(3)) - assert.Equal(t, 2, b.Queue(4)) // blocks until 1,2,3 processed - assert.Equal(t, 1, b.Queue(5)) + b.Queue(1) // won't trigger a batch + + time.Sleep(time.Millisecond * 100) + assert.Equal(t, [][]int{}, batches) + + b.Queue(2) // 2 items triggers a batch + + time.Sleep(time.Millisecond * 100) + assert.Equal(t, [][]int{{1, 2}}, batches) + + b.Queue(3) + b.Queue(4) + + time.Sleep(time.Millisecond * 100) + assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches) + + b.Queue(5) + + time.Sleep(time.Millisecond * 100) // won't trigger a batch + assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches) + + time.Sleep(time.Millisecond * 1100) // batch forced because of age + assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}}, batches) + + b.Queue(6) + b.Queue(7) + b.Queue(8) b.Stop() wg.Wait() - assert.Equal(t, [][]int{{1, 2, 3}, {4, 5}}, batches) + assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}, {6, 7}, {8}}, batches) }