diff --git a/fakes_test.go b/fakes_test.go index 5a30a5f..49bb72c 100644 --- a/fakes_test.go +++ b/fakes_test.go @@ -44,11 +44,7 @@ type fakeMetadata struct{} func (m *fakeMetadata) WithOrigin(streams.MetadataOrigin) { } -func (m *fakeMetadata) Update(streams.Metadata) streams.Metadata { - return m -} - -func (m *fakeMetadata) Merge(v streams.Metadata) streams.Metadata { +func (m *fakeMetadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata { return m } diff --git a/kafka/source.go b/kafka/source.go index 52a8b1c..32a9c22 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -80,8 +80,8 @@ func (m Metadata) WithOrigin(o streams.MetadataOrigin) { } } -// Update updates the given metadata with the contained metadata. -func (m Metadata) Update(v streams.Metadata) streams.Metadata { +// Merge merges the contained metadata into the given the metadata. +func (m Metadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata { if v == nil { return m } @@ -94,29 +94,17 @@ func (m Metadata) Update(v streams.Metadata) streams.Metadata { continue } - if newPos.Offset > oldPos.Offset { - metadata[i] = newPos + if newPos.Origin > oldPos.Origin { + continue } - } - - return metadata -} -// Merge merges the contained metadata into the given the metadata. -func (m Metadata) Merge(v streams.Metadata) streams.Metadata { - if v == nil { - return m - } - - metadata := v.(Metadata) - for _, newPos := range m { - i, oldPos := metadata.find(newPos.Topic, newPos.Partition) - if oldPos == nil { - metadata = append(metadata, newPos) - continue + if newPos.Origin < oldPos.Origin { + metadata[i] = newPos } - if (newPos.Origin == oldPos.Origin && newPos.Offset < oldPos.Offset) || (newPos.Origin < oldPos.Origin) { + // At this point origins are equal + if (s == streams.Lossless && newPos.Offset < oldPos.Offset) || + (s == streams.Dupless && newPos.Offset > oldPos.Offset) { metadata[i] = newPos } } diff --git a/kafka/source_test.go b/kafka/source_test.go index 60a9f00..5d43f67 100644 --- a/kafka/source_test.go +++ b/kafka/source_test.go @@ -92,76 +92,55 @@ func TestMetadata_WithOrigin(t *testing.T) { assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, meta) } -func TestMetadata_Update(t *testing.T) { +func TestMetadata_Merge(t *testing.T) { meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}} - meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2}} + meta2 := kafka.Metadata{{Topic: "foo", Partition: 1, Offset: 2}} - res := meta1.Update(meta2) + res := meta2.Merge(meta1, streams.Lossless) assert.IsType(t, kafka.Metadata{}, res) meta1 = res.(kafka.Metadata) - assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}, meta1) -} - -func TestMetadata_UpdatePicksHighest(t *testing.T) { - meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}} - meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}} - - res := meta1.Update(meta2) - - assert.IsType(t, kafka.Metadata{}, res) - merged := res.(kafka.Metadata) - assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}}, merged) -} - -func TestMetadata_UpdateNilMerged(t *testing.T) { - meta := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}} - - res := meta.Update(nil) - - assert.IsType(t, kafka.Metadata{}, res) - merged := res.(kafka.Metadata) - assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}, merged) + assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}, {Topic: "foo", Partition: 1, Offset: 2}}, meta1) } -func TestMetadata_UpdateNewPartition(t *testing.T) { - meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}} - meta2 := kafka.Metadata{{Topic: "foo", Partition: 1, Offset: 3}} +func TestMetadata_MergeTakesCommitterOverProcessorWhenCommitter(t *testing.T) { + meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}} + meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}} - res := meta2.Update(meta1) + res := meta2.Merge(meta1, streams.Lossless) assert.IsType(t, kafka.Metadata{}, res) - merged := res.(kafka.Metadata) - assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}, {Topic: "foo", Partition: 1, Offset: 3}}, merged) + resMeta := res.(kafka.Metadata) + assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, resMeta) } -func TestMetadata_Merge(t *testing.T) { - meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}} - meta2 := kafka.Metadata{{Topic: "foo", Partition: 1, Offset: 2}} +func TestMetadata_MergeTakesCommitterOverProcessorWhenProcessor(t *testing.T) { + meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}} + meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}} - res := meta2.Merge(meta1) + res := meta1.Merge(meta2, streams.Lossless) assert.IsType(t, kafka.Metadata{}, res) - meta1 = res.(kafka.Metadata) - assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}, {Topic: "foo", Partition: 1, Offset: 2}}, meta1) + resMeta := res.(kafka.Metadata) + assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, resMeta) } -func TestMetadata_MergeTakesCommitterOverProcessor(t *testing.T) { - meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}} - meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}} +func TestMetadata_MergeTakesHighestWhenTheSameOriginAndDupless(t *testing.T) { + meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.ProcessorOrigin}} + meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}} - res := meta2.Merge(meta1) + res := meta2.Merge(meta1, streams.Dupless) assert.IsType(t, kafka.Metadata{}, res) meta1 = res.(kafka.Metadata) - assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, meta1) + assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.ProcessorOrigin}}, meta1) } -func TestMetadata_MergeTakesLowestWhenTheSameOrigin(t *testing.T) { +func TestMetadata_MergeTakesLowestWhenTheSameOriginAndLossLess(t *testing.T) { meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.ProcessorOrigin}} meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}} - res := meta2.Merge(meta1) + res := meta2.Merge(meta1, streams.Lossless) assert.IsType(t, kafka.Metadata{}, res) meta1 = res.(kafka.Metadata) @@ -171,7 +150,7 @@ func TestMetadata_MergeTakesLowestWhenTheSameOrigin(t *testing.T) { func TestMetadata_MergeNilMerged(t *testing.T) { b := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}} - res := b.Merge(nil) + res := b.Merge(nil, streams.Lossless) assert.IsType(t, kafka.Metadata{}, res) a := res.(kafka.Metadata) @@ -186,7 +165,7 @@ func BenchmarkMetadata_Merge(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - meta = other.Update(meta) + meta = other.Merge(meta, streams.Lossless) } } diff --git a/message.go b/message.go index 89b58d6..1d70098 100644 --- a/message.go +++ b/message.go @@ -7,20 +7,27 @@ import ( // MetadataOrigin represents the metadata origin type. type MetadataOrigin uint8 -// MetadataOrigin types +// MetadataOrigin types. const ( CommitterOrigin MetadataOrigin = iota ProcessorOrigin ) +// MetadataStrategy represents the metadata merge strategy. +type MetadataStrategy uint8 + +// MetadataStrategy types. +const ( + Lossless MetadataStrategy = iota + Dupless +) + // Metadata represents metadata that can be merged. type Metadata interface { // WithOrigin sets the MetadataOrigin on the metadata. WithOrigin(MetadataOrigin) - // Update updates the given metadata with the contained metadata. - Update(Metadata) Metadata - // Merge merges the contained metadata into the given the metadata. - Merge(Metadata) Metadata + // Merge merges the contained metadata into the given the metadata with the given strategy. + Merge(Metadata, MetadataStrategy) Metadata } // Message represents data the flows through the stream. diff --git a/metastore.go b/metastore.go index 757428e..f507433 100644 --- a/metastore.go +++ b/metastore.go @@ -28,27 +28,14 @@ type Metaitem struct { // Metaitems represents a slice of Metaitem pointers. type Metaitems []*Metaitem -// Update combines contents of two Metaitems objects, updating the Metadata where necessary. -func (m Metaitems) Update(items Metaitems) Metaitems { - return m.join(items, func(old, new Metadata) Metadata { - return old.Update(new) - }) -} - // Merge combines contents of two Metaitems objects, merging the Metadata where necessary. -func (m Metaitems) Merge(items Metaitems) Metaitems { - return m.join(items, func(old, new Metadata) Metadata { - return old.Merge(new) - }) -} - -func (m Metaitems) join(items Metaitems, fn func(old, new Metadata) Metadata) Metaitems { +func (m Metaitems) Merge(items Metaitems, strategy MetadataStrategy) Metaitems { OUTER: for _, newItem := range items { for _, oldItem := range m { if oldItem.Source == newItem.Source { if oldItem.Metadata != nil { - oldItem.Metadata = fn(oldItem.Metadata, newItem.Metadata) + oldItem.Metadata = oldItem.Metadata.Merge(newItem.Metadata, strategy) } continue OUTER } @@ -133,7 +120,7 @@ func (s *metastore) Mark(p Processor, src Source, meta Metadata) error { for _, item := range items { if item.Source == src { - item.Metadata = meta.Update(item.Metadata) + item.Metadata = meta.Merge(item.Metadata, Dupless) return nil } } diff --git a/metastore_test.go b/metastore_test.go index 0e2df9e..a4912a1 100644 --- a/metastore_test.go +++ b/metastore_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/mock" ) -func TestMetaitems_Update(t *testing.T) { +func TestMetaitems_MergeDupless(t *testing.T) { src1 := new(MockSource) src2 := new(MockSource) src3 := new(MockSource) @@ -27,28 +27,19 @@ func TestMetaitems_Update(t *testing.T) { items := streams.Metaitems{item1, item2} other := streams.Metaitems{item3, item4} - meta2.On("Update", mock.Anything).Return(meta5) + meta2.On("Merge", mock.Anything, mock.Anything).Return(meta5) - joined := items.Update(other) + joined := items.Merge(other, streams.Dupless) assert.Len(t, joined, 3) assert.True(t, joined[0] == item1) assert.True(t, joined[1] == item2) assert.True(t, joined[2] == item3) assert.True(t, meta5 == item2.Metadata) - meta2.AssertCalled(t, "Update", meta4) + meta2.AssertCalled(t, "Merge", meta4, streams.Dupless) } -func TestMetaitems_UpdateHandlesNilSourceAndMetadata(t *testing.T) { - items := streams.Metaitems{{Source: nil, Metadata: nil}} - other := streams.Metaitems{{Source: nil, Metadata: nil}} - - joined := items.Update(other) - - assert.Len(t, joined, 1) -} - -func TestMetaitems_Merge(t *testing.T) { +func TestMetaitems_MergeLossless(t *testing.T) { src1 := new(MockSource) src2 := new(MockSource) src3 := new(MockSource) @@ -67,47 +58,27 @@ func TestMetaitems_Merge(t *testing.T) { items := streams.Metaitems{item1, item2} other := streams.Metaitems{item3, item4} - meta2.On("Merge", mock.Anything).Return(meta5) + meta2.On("Merge", mock.Anything, mock.Anything).Return(meta5) - merged := items.Merge(other) + merged := items.Merge(other, streams.Lossless) assert.Len(t, merged, 3) assert.True(t, merged[0] == item1) assert.True(t, merged[1] == item2) assert.True(t, merged[2] == item3) assert.True(t, meta5 == item2.Metadata) - meta2.AssertCalled(t, "Merge", meta4) + meta2.AssertCalled(t, "Merge", meta4, streams.Lossless) } func TestMetaitems_MergeHandlesNilSourceAndMetadata(t *testing.T) { items := streams.Metaitems{{Source: nil, Metadata: nil}} other := streams.Metaitems{{Source: nil, Metadata: nil}} - joined := items.Merge(other) + joined := items.Merge(other, streams.Lossless) assert.Len(t, joined, 1) } -func BenchmarkMetaitems_Update(b *testing.B) { - src1 := &fakeSource{} - src2 := &fakeSource{} - src3 := &fakeSource{} - - meta1 := &fakeMetadata{} - meta2 := &fakeMetadata{} - meta3 := &fakeMetadata{} - meta4 := &fakeMetadata{} - - items1 := streams.Metaitems{{Source: src1, Metadata: meta1}, {Source: src2, Metadata: meta2}} - items2 := streams.Metaitems{{Source: src3, Metadata: meta3}, {Source: src2, Metadata: meta4}} - - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - items1.Update(items2) - } -} - func BenchmarkMetaitems_Merge(b *testing.B) { src1 := &fakeSource{} src2 := &fakeSource{} @@ -124,7 +95,7 @@ func BenchmarkMetaitems_Merge(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - items1.Merge(items2) + items1.Merge(items2, streams.Lossless) } } @@ -165,7 +136,7 @@ func TestMetastore_MarkProcessor(t *testing.T) { newMeta := new(MockMetadata) meta2 := new(MockMetadata) meta2.On("WithOrigin", streams.ProcessorOrigin) - meta2.On("Update", meta1).Return(newMeta) + meta2.On("Merge", meta1, streams.Dupless).Return(newMeta) s := streams.NewMetastore() err := s.Mark(p, src, meta1) @@ -186,7 +157,7 @@ func TestMetastore_MarkCommitter(t *testing.T) { newMeta := new(MockMetadata) meta2 := new(MockMetadata) meta2.On("WithOrigin", streams.CommitterOrigin) - meta2.On("Update", meta1).Return(newMeta) + meta2.On("Merge", meta1, streams.Dupless).Return(newMeta) s := streams.NewMetastore() err := s.Mark(p, src, meta1) diff --git a/mocks_test.go b/mocks_test.go index 6b08053..0d32570 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -17,13 +17,8 @@ func (m *MockMetadata) WithOrigin(o streams.MetadataOrigin) { m.Called(o) } -func (m *MockMetadata) Update(v streams.Metadata) streams.Metadata { - args := m.Called(v) - return args.Get(0).(streams.Metadata) -} - -func (m *MockMetadata) Merge(v streams.Metadata) streams.Metadata { - args := m.Called(v) +func (m *MockMetadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata { + args := m.Called(v, s) return args.Get(0).(streams.Metadata) } diff --git a/nanotime.go b/nanotime.go index 4575df3..1f1d5f7 100644 --- a/nanotime.go +++ b/nanotime.go @@ -1,7 +1,7 @@ package streams import ( - _ "unsafe" + _ "unsafe" // Required in order to import nanotime ) //go:linkname nanotime runtime.nanotime diff --git a/pump.go b/pump.go index 77f4ec1..cbab0cd 100644 --- a/pump.go +++ b/pump.go @@ -78,6 +78,8 @@ func (p *processorPump) run() { s.Gauge("node.back-pressure", pressure(p.ch), 0.1, tags...) }) } + + // It is not safe to do anything after the loop } // Accept takes a message to be processed in the Pump. diff --git a/supervisor.go b/supervisor.go index a31c390..5af3db8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -57,7 +57,8 @@ type Supervisor interface { } type supervisor struct { - store Metastore + store Metastore + strategy MetadataStrategy pumps map[Processor]Pump @@ -65,9 +66,10 @@ type supervisor struct { } // NewSupervisor returns a new Supervisor instance. -func NewSupervisor(store Metastore) Supervisor { +func NewSupervisor(store Metastore, strategy MetadataStrategy) Supervisor { return &supervisor{ - store: store, + store: store, + strategy: strategy, } } @@ -112,10 +114,10 @@ func (s *supervisor) Commit(caller Processor) error { return err } - items = items.Update(newItems) + items = items.Merge(newItems, Dupless) } - metaItems = metaItems.Merge(items) + metaItems = metaItems.Merge(items, s.strategy) } for _, item := range metaItems { diff --git a/supervisor_test.go b/supervisor_test.go index 49542e5..589b260 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -11,7 +11,7 @@ import ( ) func TestNewSupervisor(t *testing.T) { - supervisor := streams.NewSupervisor(nil) + supervisor := streams.NewSupervisor(nil, streams.Lossless) assert.Implements(t, (*streams.Supervisor)(nil), supervisor) } @@ -45,7 +45,7 @@ func TestSupervisor_Commit(t *testing.T) { node(proc): pump2, } - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(nil) @@ -75,7 +75,7 @@ func TestSupervisor_Commit_WithCaller(t *testing.T) { pumps := map[streams.Node]streams.Pump{node(comm): pump} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(comm) @@ -99,7 +99,7 @@ func TestSupervisor_Commit_NullSource(t *testing.T) { pumps := map[streams.Node]streams.Pump{node(comm): pump} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(nil) @@ -113,7 +113,7 @@ func TestSupervisor_Commit_PullAllError(t *testing.T) { store := new(MockMetastore) store.On("PullAll").Return(nil, errors.New("error")) - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) err := supervisor.Commit(nil) @@ -135,7 +135,7 @@ func TestSupervisor_Commit_PullError(t *testing.T) { pumps := map[streams.Node]streams.Pump{node(comm): pump} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(nil) @@ -158,7 +158,7 @@ func TestSupervisor_Commit_UnknownPump(t *testing.T) { store := new(MockMetastore) store.On("PullAll").Return(meta, nil) - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(nil) @@ -181,7 +181,7 @@ func TestSupervisor_Commit_CommitterError(t *testing.T) { pumps := map[streams.Node]streams.Pump{node(comm): pump} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(nil) @@ -206,7 +206,7 @@ func TestSupervisor_Commit_SourceError(t *testing.T) { pumps := map[streams.Node]streams.Pump{node(comm): pump} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(pumps) err := supervisor.Commit(nil) @@ -223,7 +223,7 @@ func BenchmarkSupervisor_Commit(b *testing.B) { store := &fakeMetastore{Metadata: map[streams.Processor]streams.Metaitems{p: {{Source: src, Metadata: meta}}}} node := &fakeNode{Proc: p} pump := &fakePump{} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(map[streams.Node]streams.Pump{node: pump}) b.ReportAllocs() @@ -240,7 +240,7 @@ func BenchmarkSupervisor_CommitGlobal(b *testing.B) { store := &fakeMetastore{Metadata: map[streams.Processor]streams.Metaitems{p: {{Source: src, Metadata: meta}}}} node := &fakeNode{Proc: p} pump := &fakePump{} - supervisor := streams.NewSupervisor(store) + supervisor := streams.NewSupervisor(store, streams.Lossless) supervisor.WithPumps(map[streams.Node]streams.Pump{node: pump}) b.ReportAllocs() @@ -407,13 +407,13 @@ func TestTimedSupervisor_CommitSourceError(t *testing.T) { inner := new(MockSupervisor) inner.On("Start").Return(nil) inner.On("Commit", caller).Return(wantErr) + inner.On("Close").Return(nil) supervisor := streams.NewTimedSupervisor(inner, 1*time.Second, nil) - err := supervisor.Start() - - assert.NoError(t, err) + _ = supervisor.Start() + defer supervisor.Close() - err = supervisor.Commit(caller) + err := supervisor.Commit(caller) inner.AssertCalled(t, "Commit", caller) assert.Equal(t, wantErr, err) @@ -461,8 +461,7 @@ func pump() *MockPump { func metadata() *MockMetadata { meta := new(MockMetadata) - meta.On("Merge", mock.Anything).Return(meta) - meta.On("Update", mock.Anything).Return(meta) + meta.On("Merge", mock.Anything, mock.Anything).Return(meta) return meta } diff --git a/task.go b/task.go index 2aaf5ec..ddaca21 100644 --- a/task.go +++ b/task.go @@ -14,7 +14,14 @@ type TaskOptFunc func(t *streamTask) // WithCommitInterval defines an interval of automatic commits. func WithCommitInterval(d time.Duration) TaskOptFunc { return func(t *streamTask) { - t.supervisor = NewTimedSupervisor(t.supervisor, d, t.errorFn) + t.supervisorOpts.Interval = d + } +} + +// WithMetadataStrategy defines an strategy of metadata mergers. +func WithMetadataStrategy(strategy MetadataStrategy) TaskOptFunc { + return func(t *streamTask) { + t.supervisorOpts.Strategy = strategy } } @@ -28,16 +35,22 @@ type Task interface { Close() error } +type supervisorOpts struct { + Strategy MetadataStrategy + Interval time.Duration +} + type streamTask struct { topology *Topology running bool errorFn ErrorFunc - store Metastore - supervisor Supervisor - srcPumps SourcePumps - pumps map[Node]Pump + store Metastore + supervisorOpts supervisorOpts + supervisor Supervisor + srcPumps SourcePumps + pumps map[Node]Pump } // NewTask creates a new streams task. @@ -47,7 +60,10 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task { t := &streamTask{ topology: topology, store: store, - supervisor: NewSupervisor(store), + supervisorOpts: supervisorOpts{ + Strategy: Lossless, + Interval: 0, + }, srcPumps: SourcePumps{}, pumps: map[Node]Pump{}, } @@ -56,6 +72,11 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task { optFn(t) } + t.supervisor = NewSupervisor(t.store, t.supervisorOpts.Strategy) + if t.supervisorOpts.Interval > 0 { + t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.errorFn) + } + return t } diff --git a/task_internal_test.go b/task_internal_test.go index 6083809..c6c593a 100644 --- a/task_internal_test.go +++ b/task_internal_test.go @@ -13,6 +13,13 @@ func TestWithCommitInterval(t *testing.T) { assert.IsType(t, &timedSupervisor{}, task.(*streamTask).supervisor) } +func TestWithMetadataStrategy(t *testing.T) { + task := NewTask(&Topology{sources: map[Source]Node{}}, WithMetadataStrategy(Dupless)) + + assert.IsType(t, &supervisor{}, task.(*streamTask).supervisor) + assert.Equal(t, Dupless, task.(*streamTask).supervisor.(*supervisor).strategy) +} + func TestStreamTask_StartSupervisorStartError(t *testing.T) { task := &streamTask{ topology: &Topology{sources: map[Source]Node{}},