Skip to content

Commit

Permalink
Tweak batcher timeout so it starts from when something has been queued
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Aug 30, 2023
1 parent 6f51a5d commit f34ace1
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions syncx/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewBatcher[T any](process func(batch []T), maxItems int, maxAge time.Durati
buffer: make(chan T, capacity),
stop: make(chan bool),
batch: make([]T, 0, maxItems),
timeout: time.After(maxAge),
timeout: nil,
}
}

Expand All @@ -45,11 +45,19 @@ func (b *Batcher[T]) Start() {
select {
case v := <-b.buffer:
b.batch = append(b.batch, v)

// if this is the first item in the batch we need to restart the age timeout
if b.timeout == nil {
b.timeout = time.After(b.maxAge)
}

// if we have a full batch, flush it
if len(b.batch) == b.maxItems {
b.flush()
}

case <-b.timeout:
// flush whatever we have
b.flush()

case <-b.stop:
Expand Down Expand Up @@ -78,7 +86,7 @@ func (b *Batcher[T]) flush() {
if len(b.batch) > 0 {
b.process(b.batch)
b.batch = make([]T, 0, b.maxItems)
b.timeout = time.After(b.maxAge)
b.timeout = nil
}
}

Expand Down

0 comments on commit f34ace1

Please sign in to comment.