diff --git a/go/vt/vtctl/workflow/switcher_dry_run_test.go b/go/vt/vtctl/workflow/switcher_dry_run_test.go index 776a90e0c10..ff0bc709fd4 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run_test.go +++ b/go/vt/vtctl/workflow/switcher_dry_run_test.go @@ -30,41 +30,43 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -func TestDropTargetVReplicationStreams(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - - workflowName := "wf1" - tableName := "t1" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { +func newTrafficSwitcherEnv(t *testing.T, tables []string, sourceKeyspaceName string, sourceShards []string, targetKeyspaceName string, targetShards []string, workflowName string) (*testEnv, *trafficSwitcher) { + ctx := context.Background() + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{} + for _, tableName := range tables { + schema[tableName] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ { Name: tableName, Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), }, }, - }, + } } sourceKeyspace := &testKeyspace{ KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, + ShardNames: sourceShards, } targetKeyspace := &testKeyspace{ KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, + ShardNames: targetShards, } env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() env.tmc.schema = schema ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) require.NoError(t, err) + return env, ts +} + +func TestDropTargetVReplicationStreams(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + env, ts := newTrafficSwitcherEnv(t, []string{"t1"}, "sourceks", []string{"0"}, "targetks", []string{"-80", "80-"}, "wf1") + defer env.close() drLog := NewLogRecorder() dr := switcherDryRun{ @@ -72,7 +74,7 @@ func TestDropTargetVReplicationStreams(t *testing.T) { drLog: drLog, } - err = dr.dropTargetVReplicationStreams(ctx) + err := dr.dropTargetVReplicationStreams(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -86,37 +88,8 @@ func TestStartReverseVReplication(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"0"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1"}, "sourceks", []string{"-80", "80-"}, "targetks", []string{"0"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -124,7 +97,7 @@ func TestStartReverseVReplication(t *testing.T) { drLog: drLog, } - err = dr.startReverseVReplication(ctx) + err := dr.startReverseVReplication(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -138,37 +111,8 @@ func TestRemoveSourceTables(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"0"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1"}, "sourceks", []string{"-80", "80-"}, "targetks", []string{"0"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -176,7 +120,7 @@ func TestRemoveSourceTables(t *testing.T) { drLog: drLog, } - err = dr.removeSourceTables(ctx, RenameTable) + err := dr.removeSourceTables(ctx, RenameTable) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -201,37 +145,8 @@ func TestDropShards(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"0"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1"}, "sourceks", []string{"-80", "80-"}, "targetks", []string{"0"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -239,7 +154,7 @@ func TestDropShards(t *testing.T) { drLog: drLog, } - err = dr.dropSourceShards(ctx) + err := dr.dropSourceShards(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -259,37 +174,8 @@ func TestDropSourceReverseVReplicationStreams(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"0"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1"}, "sourceks", []string{"-80", "80-"}, "targetks", []string{"0"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -297,7 +183,7 @@ func TestDropSourceReverseVReplicationStreams(t *testing.T) { drLog: drLog, } - err = dr.dropSourceReverseVReplicationStreams(ctx) + err := dr.dropSourceReverseVReplicationStreams(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -305,53 +191,15 @@ func TestDropSourceReverseVReplicationStreams(t *testing.T) { // Make sure both the source streams are included in the logs assert.Contains(t, log, "-80") assert.Contains(t, log, "80-") - assert.Contains(t, log, fmt.Sprintf("%s_reverse", workflowName)) + assert.Contains(t, log, "wf1_reverse") } func TestDropSourceDeniedTables(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - tableName2 := "t2" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - tableName2: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName2, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName2), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"0"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1", "t2"}, "sourceks", []string{"-80", "80-"}, "targetks", []string{"0"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -359,7 +207,7 @@ func TestDropSourceDeniedTables(t *testing.T) { drLog: drLog, } - err = dr.dropSourceDeniedTables(ctx) + err := dr.dropSourceDeniedTables(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -376,46 +224,8 @@ func TestDropTargetDeniedTables(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - tableName2 := "t2" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - tableName2: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName2, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName2), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1", "t2"}, "sourceks", []string{"0"}, "targetks", []string{"-80", "80-"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -423,7 +233,7 @@ func TestDropTargetDeniedTables(t *testing.T) { drLog: drLog, } - err = dr.dropTargetDeniedTables(ctx) + err := dr.dropTargetDeniedTables(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -440,46 +250,8 @@ func TestRemoveTargetTables(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - tableName2 := "t2" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - tableName2: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName2, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName2), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1", "t2"}, "sourceks", []string{"0"}, "targetks", []string{"-80", "80-"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -487,12 +259,12 @@ func TestRemoveTargetTables(t *testing.T) { drLog: drLog, } - err = dr.removeTargetTables(ctx) + err := dr.removeTargetTables(ctx) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] - assert.Contains(t, log, targetKeyspaceName) + assert.Contains(t, log, "targetks") // Make sure both the target streams are included in the logs assert.Contains(t, log, "-80") assert.Contains(t, log, "80-") @@ -505,46 +277,11 @@ func TestSwitchKeyspaceReads(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - tableName2 := "t2" sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - tableName2: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName2, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName2), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1", "t2"}, sourceKeyspaceName, []string{"0"}, targetKeyspaceName, []string{"-80", "80-"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -552,7 +289,7 @@ func TestSwitchKeyspaceReads(t *testing.T) { drLog: drLog, } - err = dr.switchKeyspaceReads(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}) + err := dr.switchKeyspaceReads(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -565,46 +302,11 @@ func TestSwitchShardReads(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - tableName2 := "t2" sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - tableName2: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName2, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName2), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1", "t2"}, sourceKeyspaceName, []string{"0"}, targetKeyspaceName, []string{"-80", "80-"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -612,7 +314,7 @@ func TestSwitchShardReads(t *testing.T) { drLog: drLog, } - err = dr.switchShardReads(ctx, nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, DirectionForward) + err := dr.switchShardReads(ctx, nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, DirectionForward) require.NoError(t, err) require.Len(t, drLog.logs, 1) log := drLog.logs[0] @@ -636,46 +338,11 @@ func TestChangeRouting(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - workflowName := "wf1" - tableName := "t1" - tableName2 := "t2" sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - tableName2: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName2, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName2), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + env, ts := newTrafficSwitcherEnv(t, []string{"t1", "t2"}, sourceKeyspaceName, []string{"0"}, targetKeyspaceName, []string{"-80", "80-"}, "wf1") defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) drLog := NewLogRecorder() dr := switcherDryRun{ @@ -684,7 +351,7 @@ func TestChangeRouting(t *testing.T) { } ts.migrationType = binlogdatapb.MigrationType_TABLES - err = dr.changeRouting(ctx) + err := dr.changeRouting(ctx) require.NoError(t, err) assert.Len(t, drLog.logs, 2) assert.Contains(t, drLog.logs[0], fmt.Sprintf("keyspace %s to keyspace %s", sourceKeyspaceName, targetKeyspaceName))