Skip to content

Commit

Permalink
Replace panic with error message when not declaring a source node in …
Browse files Browse the repository at this point in the history
….yml config (#1695)

* move append for DLQhandler node to only occur if there are source nodes available

* unit test

* fix test cases for unit test and add no destination test

* fix tests

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
AdamHaffar and lovromazgon authored Jul 30, 2024
1 parent 2fe8bf1 commit 7339241
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 2 deletions.
4 changes: 3 additions & 1 deletion pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ func (s *Service) buildSourceNodes(
if err != nil {
return nil, err
}
nodes = append(nodes, dlqHandlerNode)

for _, connID := range pl.ConnectorIDs {
instance, err := connFetcher.Get(ctx, connID)
Expand Down Expand Up @@ -391,6 +390,9 @@ func (s *Service) buildSourceNodes(
nodes = append(nodes, procNodes...)
}

if len(nodes) != 0 {
nodes = append(nodes, dlqHandlerNode)
}
return nodes, nil
}

Expand Down
97 changes: 96 additions & 1 deletion pkg/pipeline/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ func TestServiceLifecycle_buildNodes(t *testing.T) {
is.NoErr(err)

want := []stream.Node{
&stream.DLQHandlerNode{},
&stream.SourceNode{},
&stream.SourceAckerNode{},
&stream.MetricsNode{},
&stream.DLQHandlerNode{},
&stream.FaninNode{},
&stream.FanoutNode{},
&stream.MetricsNode{},
Expand Down Expand Up @@ -125,6 +125,101 @@ func TestServiceLifecycle_buildNodes(t *testing.T) {
}
}

func TestService_buildNodes_NoSourceNode(t *testing.T) {
is := is.New(t)
ctx, killAll := context.WithCancel(context.Background())
defer killAll()
ctrl := gomock.NewController(t)
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)

ps := NewService(logger, db)

wantErr := "can't build pipeline without any source connectors"

destination := dummyDestination(persister)
dlq := dummyDestination(persister)
pl := &Instance{
ID: uuid.NewString(),
Config: Config{Name: "test-pipeline"},
Status: StatusUserStopped,
DLQ: DLQ{
Plugin: dlq.Plugin,
Settings: map[string]string{},
WindowSize: 3,
WindowNackThreshold: 2,
},
ConnectorIDs: []string{destination.ID},
}

got, err := ps.buildNodes(
ctx,
testConnectorFetcher{
destination.ID: destination,
testDLQID: dlq,
},
testProcessorFetcher{},
testPluginFetcher{
destination.Plugin: pmock.NewDispenser(ctrl),
dlq.Plugin: pmock.NewDispenser(ctrl),
},
pl,
)

is.True(err != nil)
is.Equal(err.Error(), wantErr)
is.Equal(got, nil)
}

func TestService_buildNodes_NoDestinationNode(t *testing.T) {
is := is.New(t)
ctx, killAll := context.WithCancel(context.Background())
defer killAll()
ctrl := gomock.NewController(t)
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)

ps := NewService(logger, db)

wantErr := "can't build pipeline without any destination connectors"

source := dummySource(persister)
dlq := dummyDestination(persister)

pl := &Instance{
ID: uuid.NewString(),
Config: Config{Name: "test-pipeline"},
Status: StatusUserStopped,
DLQ: DLQ{
Plugin: dlq.Plugin,
Settings: map[string]string{},
WindowSize: 3,
WindowNackThreshold: 2,
},
ConnectorIDs: []string{source.ID},
}

got, err := ps.buildNodes(
ctx,
testConnectorFetcher{
source.ID: source,
testDLQID: dlq,
},
testProcessorFetcher{},
testPluginFetcher{
source.Plugin: pmock.NewDispenser(ctrl),
dlq.Plugin: pmock.NewDispenser(ctrl),
},
pl,
)

is.True(err != nil)
is.Equal(err.Error(), wantErr)
is.Equal(got, nil)
}

func TestServiceLifecycle_PipelineSuccess(t *testing.T) {
is := is.New(t)
ctx, killAll := context.WithCancel(context.Background())
Expand Down

0 comments on commit 7339241

Please sign in to comment.