diff --git a/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go b/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go new file mode 100644 index 00000000000..0f6a9f668d0 --- /dev/null +++ b/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/tidwall/gjson" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestMultipleConcurrentVDiffs(t *testing.T) { + cellName := "zone" + cells := []string{cellName} + vc = NewVitessCluster(t, t.Name(), cells, mainClusterConfig) + + require.NotNil(t, vc) + allCellNames = cellName + defaultCellName := cellName + defaultCell = vc.Cells[defaultCellName] + sourceKeyspace := "product" + shardName := "0" + + defer vc.TearDown(t) + + cell := vc.Cells[cellName] + vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts) + + vtgate = cell.Vtgates[0] + require.NotNil(t, vtgate) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKeyspace, shardName) + require.NoError(t, err) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKeyspace, shardName), 1, 30*time.Second) + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + insertInitialData(t) + targetTabletId := 200 + targetKeyspace := "customer" + vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialProductVSchema, initialProductSchema, 0, 0, targetTabletId, sourceKsOpts) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKeyspace, shardName), 1, 30*time.Second) + + index := 1000 + var loadCtx context.Context + var loadCancel context.CancelFunc + loadCtx, loadCancel = context.WithCancel(context.Background()) + load := func(tableName string) { + query := "insert into %s(cid, name) values(%d, 'customer-%d')" + for { + select { + case <-loadCtx.Done(): + log.Infof("load cancelled") + return + default: + index += 1 + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + q := fmt.Sprintf(query, tableName, index, index) + vtgateConn.ExecuteFetch(q, 1000, false) + vtgateConn.Close() + } + time.Sleep(10 * time.Millisecond) + } + } + targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace] + targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet + require.NotNil(t, targetTab) + + time.Sleep(15 * time.Second) // wait for some rows to be inserted. + + createWorkflow := func(workflowName, tables string) { + mt := newMoveTables(vc, &moveTables{ + workflowName: workflowName, + targetKeyspace: targetKeyspace, + sourceKeyspace: sourceKeyspace, + tables: tables, + }, moveTablesFlavorVtctld) + mt.Create() + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + catchup(t, targetTab, workflowName, "MoveTables") + } + + createWorkflow("wf1", "customer") + createWorkflow("wf2", "customer2") + + go load("customer") + go load("customer2") + + var wg sync.WaitGroup + wg.Add(2) + + doVdiff := func(workflowName, table string) { + defer wg.Done() + vdiff(t, targetKeyspace, workflowName, cellName, true, false, nil) + } + go doVdiff("wf1", "customer") + go doVdiff("wf2", "customer2") + wg.Wait() + loadCancel() + + // confirm that show all shows the correct workflow and only that workflow. + output, err := vc.VtctldClient.ExecuteCommandWithOutput("VDiff", "--format", "json", "--workflow", "wf1", "--target-keyspace", "customer", "show", "all") + require.NoError(t, err) + log.Infof("VDiff output: %s", output) + count := gjson.Get(output, "..#").Int() + wf := gjson.Get(output, "0.Workflow").String() + ksName := gjson.Get(output, "0.Keyspace").String() + require.Equal(t, int64(1), count) + require.Equal(t, "wf1", wf) + require.Equal(t, "customer", ksName) +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c26a38c4811..6927b56b89d 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1781,7 +1781,6 @@ func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowReques log.Errorf("Error executing vdiff show action: %v", output.err) return nil, output.err } - return &vtctldatapb.VDiffShowResponse{ TabletResponses: output.responses, }, nil diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index ac9bec86990..59ee79077f7 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -47,6 +47,8 @@ const ( DeleteAction VDiffAction = "delete" AllActionArg = "all" LastActionArg = "last" + + maxVDiffsToReport = 100 ) var ( @@ -267,13 +269,13 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error { var qr *sqltypes.Result - var err error vdiffUUID := "" if req.ActionArg == LastActionArg { - query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiff, + query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiffByKeyspaceWorkflow, sqltypes.StringBindVariable(req.Keyspace), sqltypes.StringBindVariable(req.Workflow), + sqltypes.Int64BindVariable(1), ) if err != nil { return err @@ -322,7 +324,15 @@ func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.D } switch req.ActionArg { case AllActionArg: - if qr, err = dbClient.ExecuteFetch(sqlGetAllVDiffs, -1); err != nil { + query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiffByKeyspaceWorkflow, + sqltypes.StringBindVariable(req.Keyspace), + sqltypes.StringBindVariable(req.Workflow), + sqltypes.Int64BindVariable(maxVDiffsToReport), + ) + if err != nil { + return err + } + if qr, err = dbClient.ExecuteFetch(query, -1); err != nil { return err } resp.Output = sqltypes.ResultToProto3(qr) diff --git a/go/vt/vttablet/tabletmanager/vdiff/action_test.go b/go/vt/vttablet/tabletmanager/vdiff/action_test.go index 9bbfbaa4d68..1049bc8607d 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action_test.go @@ -175,6 +175,38 @@ func TestPerformVDiffAction(t *testing.T) { }, }, }, + { + name: "show last", + req: &tabletmanagerdatapb.VDiffRequest{ + Action: string(ShowAction), + ActionArg: "last", + Keyspace: keyspace, + Workflow: workflow, + }, + expectQueries: []queryAndResult{ + { + query: fmt.Sprintf("select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit %d", + encodeString(keyspace), encodeString(workflow), 1), + result: noResults, + }, + }, + }, + { + name: "show all", + req: &tabletmanagerdatapb.VDiffRequest{ + Action: string(ShowAction), + ActionArg: "all", + Keyspace: keyspace, + Workflow: workflow, + }, + expectQueries: []queryAndResult{ + { + query: fmt.Sprintf("select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit %d", + encodeString(keyspace), encodeString(workflow), maxVDiffsToReport), + result: noResults, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 72da9f15ada..a63e60d9434 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -24,10 +24,10 @@ const ( and vdt.state in ('completed', 'stopped')` sqlRetryVDiff = `update _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) set vd.state = 'pending', vd.last_error = '', vdt.state = 'pending' where vd.id = %a and (vd.state = 'error' or vdt.state = 'error')` - sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a" - sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit 1" - sqlGetVDiffByID = "select * from _vt.vdiff where id = %a" - sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) + sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a" + sqlGetMostRecentVDiffByKeyspaceWorkflow = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit %a" + sqlGetVDiffByID = "select * from _vt.vdiff where id = %a" + sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) where vd.keyspace = %a and vd.workflow = %a` sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) @@ -48,7 +48,6 @@ const ( sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'" sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a" sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a" - sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc" sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a" sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" diff --git a/test/config.json b/test/config.json index 2894f54060a..66657b4f37e 100644 --- a/test/config.json +++ b/test/config.json @@ -1049,6 +1049,15 @@ "RetryMax": 0, "Tags": [] }, + "vdiff_multiple_movetables_test.go": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultipleConcurrentVDiffs"], + "Command": [], + "Manual": false, + "Shard": "vreplication_partial_movetables_basic", + "RetryMax": 0, + "Tags": [] + }, "vreplication_movetables_buffering": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesBuffering"],