Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework syncx.Batcher so that it flushes a batch without waiting if it has enough items #95

Merged
merged 4 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/net v0.14.0
golang.org/x/text v0.12.0
)
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
Expand Down Expand Up @@ -67,6 +67,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down Expand Up @@ -99,7 +101,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
Expand Down
71 changes: 50 additions & 21 deletions syncx/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,34 @@
import (
"sync"
"time"

"golang.org/x/exp/constraints"
)

// Batcher allows values to be queued and processed in a background thread.
type Batcher[T any] struct {
process func(batch []T)
timeout time.Duration
process func(batch []T)
maxItems int
maxAge time.Duration

wg *sync.WaitGroup
buffer chan T
stop chan bool
batch []T
timeout <-chan time.Time
}

// NewBatcher creates a new batcher.
func NewBatcher[T any](process func(batch []T), timeout time.Duration, capacity int, wg *sync.WaitGroup) *Batcher[T] {
func NewBatcher[T any](process func(batch []T), maxItems int, maxAge time.Duration, capacity int, wg *sync.WaitGroup) *Batcher[T] {
return &Batcher[T]{
process: process,
timeout: timeout,
wg: wg,
buffer: make(chan T, capacity),
stop: make(chan bool),
process: process,
maxItems: maxItems,
maxAge: maxAge,
wg: wg,
buffer: make(chan T, capacity),
stop: make(chan bool),
batch: make([]T, 0, maxItems),
timeout: time.After(maxAge),
}
}

Expand All @@ -34,14 +43,19 @@

for {
select {
case <-b.stop:
for len(b.buffer) > 0 {
case v := <-b.buffer:
b.batch = append(b.batch, v)
if len(b.batch) == b.maxItems {
b.flush()
}
return

case <-time.After(b.timeout):
case <-b.timeout:
b.flush()

case <-b.stop:
b.drain()
close(b.buffer)
return
}
}
}()
Expand All @@ -59,18 +73,33 @@
close(b.stop)
}

// processes all values currently in the buffer
// flushes whatever has been batched
func (b *Batcher[T]) flush() {
count := len(b.buffer)
if count <= 0 {
return
if len(b.batch) > 0 {
b.process(b.batch)
b.batch = make([]T, 0, b.maxItems)
b.timeout = time.After(b.maxAge)
}
}

func (b *Batcher[T]) drain() {
for len(b.buffer) > 0 || len(b.batch) > 0 {
buffSize := len(b.buffer)
canRead := min(b.maxItems-len(b.batch), buffSize)

for i := 0; i < canRead; i++ {
v := <-b.buffer
b.batch = append(b.batch, v)
}

batch := make([]T, count)
for i := 0; i < count; i++ {
v := <-b.buffer
batch[i] = v
b.flush()
}
}

b.process(batch)
// TODO delete when on go 1.21 and this is builtin
func min[T constraints.Ordered](x T, y T) T {
if x < y {
return x
}

Check warning on line 103 in syncx/batcher.go

View check run for this annotation

Codecov / codecov/patch

syncx/batcher.go#L102-L103

Added lines #L102 - L103 were not covered by tests
return y
}
46 changes: 38 additions & 8 deletions syncx/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,53 @@ import (
)

func TestBatcher(t *testing.T) {
batches := make([][]int, 0, 5)
batches := make([][]int, 0)

wg := &sync.WaitGroup{}
b := syncx.NewBatcher(func(batch []int) {
batches = append(batches, batch)
}, time.Second, 3, wg)
}, 2, time.Second, 3, wg)

b.Start()

assert.Equal(t, 2, b.Queue(1))
assert.Equal(t, 1, b.Queue(2))
assert.Equal(t, 0, b.Queue(3))
assert.Equal(t, 2, b.Queue(4)) // blocks until 1,2,3 processed
assert.Equal(t, 1, b.Queue(5))
b.Queue(1) // won't trigger a batch

time.Sleep(time.Millisecond * 100)
assert.Equal(t, [][]int{}, batches)

b.Queue(2) // 2 items triggers a batch

time.Sleep(time.Millisecond * 100)
assert.Equal(t, [][]int{{1, 2}}, batches)

b.Queue(3)
b.Queue(4)

time.Sleep(time.Millisecond * 100)
assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches)

b.Queue(5)

time.Sleep(time.Millisecond * 100) // won't trigger a batch
assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches)

time.Sleep(time.Millisecond * 1100) // batch forced because of age
assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}}, batches)

time.Sleep(time.Millisecond * 1100) // empty batches never triggered
assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}}, batches)

b.Queue(6)
b.Queue(7)
b.Queue(8)

b.Stop()
wg.Wait()

assert.Equal(t, [][]int{{1, 2, 3}, {4, 5}}, batches)
assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}, {6, 7}, {8}}, batches)

// panic if you try to queue to a stopped batcher
assert.Panics(t, func() {
b.Queue(9)
})
}
Loading