diff --git a/freezer/freezer.go b/freezer/freezer.go index d95c373..0c70193 100644 --- a/freezer/freezer.go +++ b/freezer/freezer.go @@ -11,13 +11,18 @@ import ( ) var ( + defaultMaxUnflushedCount = 1024 + defaultMaxUnflushedPeriod = time.Second + _ substrate.AsyncMessageSink = (*asyncMessageSink)(nil) _ substrate.AsyncMessageSource = (*asyncMessageSource)(nil) ) type AsyncMessageSinkConfig struct { - StreamStore straw.StreamStore - FreezerConfig freezer.MessageSinkConfig + StreamStore straw.StreamStore + FreezerConfig freezer.MessageSinkConfig + MaxUnflushedCount int // maximum number of unflushed messages + MaxUnflushedPeriod time.Duration // maximum period before flushing } func NewAsyncMessageSink(config AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error) { @@ -25,36 +30,45 @@ func NewAsyncMessageSink(config AsyncMessageSinkConfig) (substrate.AsyncMessageS if err != nil { return nil, err } - return &asyncMessageSink{fms}, nil + if config.MaxUnflushedCount <= 0 { + config.MaxUnflushedCount = defaultMaxUnflushedCount + } + if config.MaxUnflushedPeriod <= 0 { + config.MaxUnflushedPeriod = defaultMaxUnflushedPeriod + } + + return &asyncMessageSink{ + fms: fms, + maxUnflushedCount: config.MaxUnflushedCount, + maxUnflushedPeriod: config.MaxUnflushedPeriod, + }, nil } type asyncMessageSink struct { - fms *freezer.MessageSink + fms *freezer.MessageSink + maxUnflushedCount int + maxUnflushedPeriod time.Duration } func (ams *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) (rerr error) { var toAck []substrate.Message - t := time.NewTimer(0) - if !t.Stop() { - <-t.C - } + + t := time.NewTicker(ams.maxUnflushedPeriod) + defer t.Stop() + for { select { case <-ctx.Done(): return nil case m := <-messages: - if !t.Stop() { - select { - case <-t.C: - default: - } - } - t.Reset(1000 * time.Millisecond) if err := ams.fms.PutMessage(m.Data()); err != nil { return err } toAck = append(toAck, m) case <-t.C: + if len(toAck) == 0 { + continue + } if err := ams.fms.Flush(); err != nil { return err } @@ -67,19 +81,18 @@ func (ams *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- su } toAck = toAck[0:0] } - if len(toAck) > 1024 { + if len(toAck) >= ams.maxUnflushedCount { if err := ams.fms.Flush(); err != nil { return err } for _, m := range toAck { select { case <-ctx.Done(): - return ctx.Err() + return nil case acks <- m: } } toAck = toAck[0:0] - t.Stop() } } } diff --git a/freezer/freezer_url.go b/freezer/freezer_url.go index 2ba3e83..8a0d360 100644 --- a/freezer/freezer_url.go +++ b/freezer/freezer_url.go @@ -3,8 +3,10 @@ package freezer import ( "fmt" "net/url" + "strconv" "time" + "github.com/pkg/errors" "github.com/uw-labs/freezer" "github.com/uw-labs/straw" "github.com/uw-labs/substrate" @@ -18,10 +20,27 @@ func init() { suburl.RegisterSource("freezer+s3", newFreezerSource) } -func newFreezerSink(u *url.URL) (substrate.AsyncMessageSink, error) { +func newFreezerSink(u *url.URL) (sink substrate.AsyncMessageSink, err error) { q := u.Query() + var ( + maxUnflushedCount = defaultMaxUnflushedCount + maxUnflushedPeriod = defaultMaxUnflushedPeriod + ) + if count := q.Get("max-unflushed-count"); count != "" { + maxUnflushedCount, err = strconv.Atoi(count) + if err != nil { + return nil, errors.Wrap(err, "failed to parse max-unflushed-count") + } + } + if period := q.Get("max-unflushed-period"); period != "" { + maxUnflushedPeriod, err = time.ParseDuration(period) + if err != nil { + return nil, errors.Wrap(err, "failed to parse max-unflushed-period") + } + } + cts := q.Get("compression") ct := freezer.CompressionTypeNone switch cts { @@ -44,6 +63,7 @@ func newFreezerSink(u *url.URL) (substrate.AsyncMessageSink, error) { sse := q.Get("sse") switch sse { + case "": case "aes256": enc = straw.S3ServerSideEncoding(straw.ServerSideEncryptionTypeAES256) default: @@ -67,6 +87,8 @@ func newFreezerSink(u *url.URL) (substrate.AsyncMessageSink, error) { Path: u.Path, CompressionType: ct, }, + MaxUnflushedCount: maxUnflushedCount, + MaxUnflushedPeriod: maxUnflushedPeriod, } return sinker(conf) } diff --git a/freezer/freezer_url_test.go b/freezer/freezer_url_test.go index 1b28897..a855922 100644 --- a/freezer/freezer_url_test.go +++ b/freezer/freezer_url_test.go @@ -28,19 +28,23 @@ func TestFreezerSink(t *testing.T) { CompressionType: freezer.CompressionTypeNone, Path: "/foo/1", }, - StreamStore: &straw.OsStreamStore{}, + StreamStore: &straw.OsStreamStore{}, + MaxUnflushedCount: defaultMaxUnflushedCount, + MaxUnflushedPeriod: defaultMaxUnflushedPeriod, }, expectedErr: nil, }, { name: "everything-dir", - input: "freezer+dir:///foo/bar2/baz/?compression=snappy", + input: "freezer+dir:///foo/bar2/baz/?compression=snappy&max-unflushed-count=20&max-unflushed-period=10s", expected: AsyncMessageSinkConfig{ FreezerConfig: freezer.MessageSinkConfig{ CompressionType: freezer.CompressionTypeSnappy, Path: "/foo/bar2/baz/", }, - StreamStore: &straw.OsStreamStore{}, + StreamStore: &straw.OsStreamStore{}, + MaxUnflushedCount: 20, + MaxUnflushedPeriod: time.Second * 10, }, expectedErr: nil, },