diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index 255b49002..d5ab261e9 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -352,7 +352,6 @@ func (s *Service) buildSourceNodes( if err != nil { return nil, err } - nodes = append(nodes, dlqHandlerNode) for _, connID := range pl.ConnectorIDs { instance, err := connFetcher.Get(ctx, connID) @@ -391,6 +390,9 @@ func (s *Service) buildSourceNodes( nodes = append(nodes, procNodes...) } + if len(nodes) != 0 { + nodes = append(nodes, dlqHandlerNode) + } return nodes, nil } diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index 9cdc426ff..2298908bc 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -87,10 +87,10 @@ func TestServiceLifecycle_buildNodes(t *testing.T) { is.NoErr(err) want := []stream.Node{ - &stream.DLQHandlerNode{}, &stream.SourceNode{}, &stream.SourceAckerNode{}, &stream.MetricsNode{}, + &stream.DLQHandlerNode{}, &stream.FaninNode{}, &stream.FanoutNode{}, &stream.MetricsNode{}, @@ -125,6 +125,101 @@ func TestServiceLifecycle_buildNodes(t *testing.T) { } } +func TestService_buildNodes_NoSourceNode(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := NewService(logger, db) + + wantErr := "can't build pipeline without any source connectors" + + destination := dummyDestination(persister) + dlq := dummyDestination(persister) + pl := &Instance{ + ID: uuid.NewString(), + Config: Config{Name: "test-pipeline"}, + Status: StatusUserStopped, + DLQ: DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{destination.ID}, + } + + got, err := ps.buildNodes( + ctx, + testConnectorFetcher{ + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorFetcher{}, + testPluginFetcher{ + destination.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + pl, + ) + + is.True(err != nil) + is.Equal(err.Error(), wantErr) + is.Equal(got, nil) +} + +func TestService_buildNodes_NoDestinationNode(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := NewService(logger, db) + + wantErr := "can't build pipeline without any destination connectors" + + source := dummySource(persister) + dlq := dummyDestination(persister) + + pl := &Instance{ + ID: uuid.NewString(), + Config: Config{Name: "test-pipeline"}, + Status: StatusUserStopped, + DLQ: DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{source.ID}, + } + + got, err := ps.buildNodes( + ctx, + testConnectorFetcher{ + source.ID: source, + testDLQID: dlq, + }, + testProcessorFetcher{}, + testPluginFetcher{ + source.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + pl, + ) + + is.True(err != nil) + is.Equal(err.Error(), wantErr) + is.Equal(got, nil) +} + func TestServiceLifecycle_PipelineSuccess(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background())