From 136f6a6d522e7e6b085c5f9d09a2ec41ae02314f Mon Sep 17 00:00:00 2001 From: Adam Haffar Date: Thu, 4 Jul 2024 16:58:09 +0100 Subject: [PATCH 1/4] move append for DLQhandler node to only occur if there are source nodes available --- pkg/pipeline/lifecycle.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index 716fd00a6..68f4ba82f 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -352,7 +352,6 @@ func (s *Service) buildSourceNodes( if err != nil { return nil, err } - nodes = append(nodes, dlqHandlerNode) for _, connID := range pl.ConnectorIDs { instance, err := connFetcher.Get(ctx, connID) @@ -391,6 +390,9 @@ func (s *Service) buildSourceNodes( nodes = append(nodes, procNodes...) } + if len(nodes) != 0 { + nodes = append(nodes, dlqHandlerNode) + } return nodes, nil } From 1b54d6e52c8ba50c8d047b6b8fd4ec6dea84ef2f Mon Sep 17 00:00:00 2001 From: Adam Haffar Date: Thu, 4 Jul 2024 17:30:14 +0100 Subject: [PATCH 2/4] unit test --- pkg/pipeline/lifecycle_test.go | 46 ++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index 8d659bd09..da87625d5 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -125,6 +125,52 @@ func TestServiceLifecycle_buildNodes(t *testing.T) { } } +func TestService_buildNodes_noSourceNode(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := NewService(logger, db) + + wantErr := "can't build pipeline without any source connectors" + + destination := dummyDestination(persister) + dlq := dummyDestination(persister) + pl := &Instance{ + ID: uuid.NewString(), + Config: Config{Name: "test-pipeline"}, + Status: StatusUserStopped, + DLQ: DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{destination.ID}, + } + + got, err := ps.buildNodes( + ctx, + testConnectorFetcher{ + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorFetcher{}, + testPluginFetcher{ + destination.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + pl, + ) + + fmt.Printf("got: %v, want: %v", err, wantErr) + is.Equal(got, nil) +} + func TestServiceLifecycle_PipelineSuccess(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) From 069519da00586f13dd681b2629193ea4a20dee7f Mon Sep 17 00:00:00 2001 From: Adam Haffar Date: Wed, 10 Jul 2024 18:18:06 +0100 Subject: [PATCH 3/4] fix test cases for unit test and add no destination test --- pkg/pipeline/lifecycle_test.go | 48 +++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index da87625d5..f21659857 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -166,8 +166,54 @@ func TestService_buildNodes_noSourceNode(t *testing.T) { }, pl, ) + is.Equal(err.Error(), wantErr) + is.Equal(got, nil) +} + +func TestService_buildNodes_noDestinationNode(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := NewService(logger, db) + + wantErr := "can't build pipeline without any destination connectors" + + source := dummySource(persister) + dlq := dummyDestination(persister) + + pl := &Instance{ + ID: uuid.NewString(), + Config: Config{Name: "test-pipeline"}, + Status: StatusUserStopped, + DLQ: DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{source.ID}, + } + + got, err := ps.buildNodes( + ctx, + testConnectorFetcher{ + source.ID: source, + testDLQID: dlq, + }, + testProcessorFetcher{}, + testPluginFetcher{ + source.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + pl, + ) - fmt.Printf("got: %v, want: %v", err, wantErr) + is.Equal(err.Error(), wantErr) is.Equal(got, nil) } From 4f33ce5131020ebe65187babe61b4b1d9c257277 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Tue, 30 Jul 2024 19:21:42 +0200 Subject: [PATCH 4/4] fix tests --- pkg/pipeline/lifecycle_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index 0552c3d42..2298908bc 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -87,10 +87,10 @@ func TestServiceLifecycle_buildNodes(t *testing.T) { is.NoErr(err) want := []stream.Node{ - &stream.DLQHandlerNode{}, &stream.SourceNode{}, &stream.SourceAckerNode{}, &stream.MetricsNode{}, + &stream.DLQHandlerNode{}, &stream.FaninNode{}, &stream.FanoutNode{}, &stream.MetricsNode{}, @@ -125,7 +125,7 @@ func TestServiceLifecycle_buildNodes(t *testing.T) { } } -func TestService_buildNodes_noSourceNode(t *testing.T) { +func TestService_buildNodes_NoSourceNode(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) defer killAll() @@ -166,11 +166,13 @@ func TestService_buildNodes_noSourceNode(t *testing.T) { }, pl, ) + + is.True(err != nil) is.Equal(err.Error(), wantErr) is.Equal(got, nil) } -func TestService_buildNodes_noDestinationNode(t *testing.T) { +func TestService_buildNodes_NoDestinationNode(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) defer killAll() @@ -213,6 +215,7 @@ func TestService_buildNodes_noDestinationNode(t *testing.T) { pl, ) + is.True(err != nil) is.Equal(err.Error(), wantErr) is.Equal(got, nil) }