From 7c5ac3f13c332a2d97a271feb57905bfcfab0c76 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 19 Dec 2023 16:53:13 -0500 Subject: [PATCH] Update singular workflow in traffic switcher Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/traffic_switcher.go | 3 +- go/vt/wrangler/traffic_switcher.go | 3 +- go/vt/wrangler/traffic_switcher_env_test.go | 4 +- go/vt/wrangler/traffic_switcher_test.go | 44 ++++++++++----------- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 9999d46cdcc..104d024ebd5 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -609,7 +609,8 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { return ts.ForAllSources(func(source *MigrationSource) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.GetPrimary().DbName())) + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", + encodeString(source.GetPrimary().DbName()), encodeString(ts.ReverseWorkflowName())) _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) return err }) diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index c8a47960f3d..a204eff9453 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1615,7 +1615,8 @@ func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error { func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { return ts.ForAllSources(func(source *workflow.MigrationSource) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s", encodeString(source.GetPrimary().DbName())) + query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", + encodeString(source.GetPrimary().DbName()), encodeString(ts.ReverseWorkflowName())) _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) return err }) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 128719a8225..ff63a65b774 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -863,7 +863,7 @@ func (tme *testShardMigraterEnv) expectStartReverseVReplication() { // NOTE: this is not a faithful reproduction of what should happen. // The ids returned are not accurate. for _, dbclient := range tme.dbSourceClients { - dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) dbclient.addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) dbclient.addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) dbclient.addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) @@ -897,7 +897,7 @@ func (tme *testShardMigraterEnv) expectCancelMigration() { dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) } for _, dbclient := range tme.dbSourceClients { - dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + dbclient.addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) } tme.expectDeleteReverseVReplication() } diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index f7aebed185a..f7b2e600389 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -434,11 +434,11 @@ func TestTableMigrateMainflow(t *testing.T) { createJournals() startReverseVReplication := func() { - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) @@ -631,10 +631,10 @@ func TestShardMigrateMainflow(t *testing.T) { checkJournals() stopStreams := func() { - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test'", &sqltypes.Result{}, nil) } stopStreams() @@ -649,8 +649,8 @@ func TestShardMigrateMainflow(t *testing.T) { tme.dbSourceClients[1].addQuery("delete from _vt.post_copy_action where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) } cancelMigration := func() { - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) @@ -731,11 +731,11 @@ func TestShardMigrateMainflow(t *testing.T) { createJournals() startReverseVReplication := func() { - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) @@ -1233,11 +1233,11 @@ func TestTableMigrateJournalExists(t *testing.T) { tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil) // mi.startReverseVReplication - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) @@ -1312,11 +1312,11 @@ func TestShardMigrateJournalExists(t *testing.T) { tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil) // mi.startReverseVReplication - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) @@ -1942,10 +1942,10 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) { checkJournals() stopStreams := func() { - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) - tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where db_name='vt_ks' and workflow='test'", &sqltypes.Result{}, nil) } stopStreams() @@ -1960,8 +1960,8 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) { tme.dbSourceClients[1].addQuery("delete from _vt.post_copy_action where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) } cancelMigration := func() { - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", &sqltypes.Result{}, nil) tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) @@ -2043,11 +2043,11 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) { createJournals() startReverseVReplication := func() { - tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) - tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)