Skip to content

Commit

Permalink
Rework syncx.Batcher so that it flushes a batch without waiting if it…
Browse files Browse the repository at this point in the history
… has enough items
  • Loading branch information
rowanseymour committed Aug 29, 2023
1 parent 64a5ea3 commit f22225a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 35 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/net v0.14.0
golang.org/x/text v0.12.0
)
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
Expand Down Expand Up @@ -67,6 +67,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down Expand Up @@ -99,7 +101,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
Expand Down
69 changes: 44 additions & 25 deletions syncx/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,30 @@ package syncx
import (
"sync"
"time"

"golang.org/x/exp/constraints"
)

// Batcher allows values to be queued and processed in a background thread.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// fields of the previous revision (process, timeout, wg, buffer, stop) appear
// to be followed directly by the fields of the reworked revision.
type Batcher[T any] struct {
// previous revision: function called with each batch
process func(batch []T)
// previous revision: wait passed to time.After between flushes
timeout time.Duration
wg *sync.WaitGroup
buffer chan T
stop chan bool
// called with each non-empty batch when it is flushed
process func(batch []T)
// a batch is flushed as soon as it holds exactly this many values
maxItems int
// wait passed to time.After in the Start loop; when it fires the current batch is flushed.
// NOTE(review): the timer is created anew on every pass through the select, so it restarts
// each time a value is received - it bounds the time since the last received value rather
// than the age of the oldest batched value. Confirm this is intended.
maxAge time.Duration
// wait group supplied by the caller of NewBatcher (its use inside Start is not visible here)
wg *sync.WaitGroup
// buffered channel of queued values, created with the capacity given to NewBatcher
buffer chan T
// closed to tell the Start loop to drain buffer and batch and then return
stop chan bool
// values received from buffer that have not yet been passed to process
batch []T
}

// NewBatcher creates a new batcher.
//
// process is called with each flushed batch. maxItems is the batch size that
// triggers an immediate flush, and maxAge is the wait after which whatever has
// been batched is flushed anyway. capacity is the size of the buffered channel
// that holds queued values. wg is stored on the batcher unchanged.
//
// NOTE(review): maxItems is not validated. With maxItems <= 0 the size check in
// the Start loop never matches, and the drain loop that runs on stop computes a
// non-positive read count, so it would spin for as long as the buffer is
// non-empty. A negative maxItems would also make the make call in flush panic.
//
// NOTE(review): this listing is a rendered diff without +/- markers; the
// signature taking timeout and the body lines setting timeout appear to belong
// to the previous revision.
func NewBatcher[T any](process func(batch []T), timeout time.Duration, capacity int, wg *sync.WaitGroup) *Batcher[T] {
func NewBatcher[T any](process func(batch []T), maxItems int, maxAge time.Duration, capacity int, wg *sync.WaitGroup) *Batcher[T] {
return &Batcher[T]{
process: process,
timeout: timeout,
wg: wg,
buffer: make(chan T, capacity),
stop: make(chan bool),
process: process,
maxItems: maxItems,
maxAge: maxAge,
wg: wg,
buffer: make(chan T, capacity),
stop: make(chan bool),
}
}

Expand All @@ -34,14 +39,28 @@ func (b *Batcher[T]) Start() {

for {
select {
case <-b.stop:
for len(b.buffer) > 0 {
case v := <-b.buffer:
b.batch = append(b.batch, v)
if len(b.batch) == b.maxItems {
b.flush()
}
return

case <-time.After(b.timeout):
case <-time.After(b.maxAge):
b.flush()

case <-b.stop:
for len(b.buffer) > 0 || len(b.batch) > 0 {
buffSize := len(b.buffer)
canRead := min(b.maxItems-len(b.batch), buffSize)

for i := 0; i < canRead; i++ {
v := <-b.buffer
b.batch = append(b.batch, v)
}

b.flush()
}
return
}
}
}()
Expand All @@ -59,18 +78,18 @@ func (b *Batcher[T]) Stop() {
close(b.stop)
}

// processes all values currently in the buffer
// flushes whatever has been batched
//
// If b.batch is non-empty it is passed to b.process, and b.batch is then
// replaced with a fresh empty slice of capacity b.maxItems, so the slice handed
// to process is never appended to again by the batcher. An empty batch is a
// no-op. process is called synchronously by whoever calls flush.
//
// NOTE(review): the lines reading len(b.buffer) into count appear to be the
// previous revision's body, shown here because this listing is a rendered diff.
func (b *Batcher[T]) flush() {
count := len(b.buffer)
if count <= 0 {
return
if len(b.batch) > 0 {
b.process(b.batch)
b.batch = make([]T, 0, b.maxItems)
}
}

batch := make([]T, count)
for i := 0; i < count; i++ {
v := <-b.buffer
batch[i] = v
// TODO delete when on go 1.21 and this is builtin
//
// min returns the smaller of x and y; when neither is less than the other, y
// is returned.
func min[T constraints.Ordered](x T, y T) T {
if x < y {
return x
}

b.process(batch)
return y
}
38 changes: 30 additions & 8 deletions syncx/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,45 @@ import (
)

// TestBatcher queues values one at a time and checks which batches have been
// processed after each step: a batch is expected once two values are queued,
// once a lone value has waited longer than the one-second max age, and when the
// batcher is stopped with values still pending.
//
// NOTE(review): batches is appended to by the process callback and read by the
// test body with only time.Sleep separating the two. Presumably the callback
// runs on the batcher's background goroutine, in which case these accesses are
// unsynchronized and would be reported by -race; the fixed sleeps also make the
// test timing-sensitive. Confirm and consider guarding batches with a mutex.
func TestBatcher(t *testing.T) {
batches := make([][]int, 0, 5)
batches := make([][]int, 0)

wg := &sync.WaitGroup{}
b := syncx.NewBatcher(func(batch []int) {
batches = append(batches, batch)
}, time.Second, 3, wg)
}, 2, time.Second, 3, wg)

b.Start()

assert.Equal(t, 2, b.Queue(1))
assert.Equal(t, 1, b.Queue(2))
assert.Equal(t, 0, b.Queue(3))
assert.Equal(t, 2, b.Queue(4)) // blocks until 1,2,3 processed
assert.Equal(t, 1, b.Queue(5))
b.Queue(1) // won't trigger a batch

time.Sleep(time.Millisecond * 100)
assert.Equal(t, [][]int{}, batches)

b.Queue(2) // 2 items triggers a batch

time.Sleep(time.Millisecond * 100)
assert.Equal(t, [][]int{{1, 2}}, batches)

b.Queue(3)
b.Queue(4)

time.Sleep(time.Millisecond * 100)
assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches)

b.Queue(5)

time.Sleep(time.Millisecond * 100) // won't trigger a batch
assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches)

time.Sleep(time.Millisecond * 1100) // batch forced because of age
assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}}, batches)

b.Queue(6)
b.Queue(7)
b.Queue(8)

b.Stop()
wg.Wait()

assert.Equal(t, [][]int{{1, 2, 3}, {4, 5}}, batches)
assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}, {6, 7}, {8}}, batches)
}

0 comments on commit f22225a

Please sign in to comment.