diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c247ed91e9a..4cc5bc9363d 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -863,8 +863,8 @@ ORDER BY if stream.Id > streamLog.StreamId { s.Logger().Warningf("Found stream log for nonexistent stream: %+v", streamLog) - // This can happen on manual/failed workflow cleanup so keep going. - continue + // This can happen on manual/failed workflow cleanup so move to the next log. + break } // stream.Id == streamLog.StreamId diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 5d35b205ed3..0ee6d42ad45 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -1811,3 +1811,62 @@ func createReadVReplicationWorkflowFunc(t *testing.T, workflowType binlogdatapb. }, nil } } + +// Test checks that we don't include logs from non-existent streams in the result. +// Ensures that we just skip the logs from non-existent streams and include the rest. +func TestGetWorkflowsStreamLogs(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := "source_keyspace" + targetKeyspace := "target_keyspace" + workflow := "test_workflow" + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: sourceKeyspace, + TargetKeyspace: targetKeyspace, + Workflow: workflow, + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + logResult := sqltypes.MakeTestResult( + sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|`count`", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"), + "1|0|State Change|Running|test message for non-existent 1|2006-01-02 15:04:05|2006-01-02 15:04:05|1", + "2|0|State Change|Stopped|test message for non-existent 2|2006-01-02 15:04:06|2006-01-02 15:04:06|1", + "3|1|State Change|Running|log message|2006-01-02 15:04:07|2006-01-02 15:04:07|1", + ) + + te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{}) + te.tmc.expectVRQuery(200, "select id from _vt.vreplication where db_name = 'vt_target_keyspace' and workflow = 'test_workflow'", &sqltypes.Result{}) + te.tmc.expectVRQuery(200, "select id, vrepl_id, type, state, message, created_at, updated_at, `count` from _vt.vreplication_log where vrepl_id in (1) order by vrepl_id asc, id asc", logResult) + + res, err := te.ws.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{ + Keyspace: targetKeyspace, + Workflow: workflow, + IncludeLogs: true, + }) + require.NoError(t, err) + + assert.Len(t, res.Workflows, 1) + assert.NotNil(t, res.Workflows[0].ShardStreams["-/cell-0000000200"]) + assert.Len(t, res.Workflows[0].ShardStreams["-/cell-0000000200"].Streams, 1) + + gotLogs := res.Workflows[0].ShardStreams["-/cell-0000000200"].Streams[0].Logs + + // The non-existent stream logs shouldn't be part of the result + assert.Len(t, gotLogs, 1) + assert.Equal(t, gotLogs[0].Message, "log message") + assert.Equal(t, gotLogs[0].State, "Running") + assert.Equal(t, gotLogs[0].Id, int64(3)) +}