diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 872b96585f7..c88e64a1dbf 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -480,8 +480,8 @@ func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatap } for qry, res := range tmc.vreQueries[int(tablet.Alias.Uid)] { if strings.HasPrefix(qry, "/") { - re := regexp.MustCompile(qry) - if re.MatchString(qry) { + re := regexp.MustCompile(qry[1:]) + if re.MatchString(query) { return res, nil } } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index d266a1c5e0c..94bbed9afd6 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -128,10 +128,11 @@ func TestCreateVReplicationWorkflow(t *testing.T) { ws := workflow.NewServer(vtenv.NewTestEnv(), tenv.ts, tenv.tmc) tests := []struct { - name string - req *vtctldatapb.MoveTablesCreateRequest - schema *tabletmanagerdatapb.SchemaDefinition - query string + name string + req *vtctldatapb.MoveTablesCreateRequest + schema *tabletmanagerdatapb.SchemaDefinition + query string + selectTableQuery string }{ { name: "defaults", @@ -144,6 +145,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) { }, query: fmt.Sprintf(`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"}}', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 0, '{}')`, insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + selectTableQuery: "(select 't1' from t1 limit 1)", }, { name: "all values", @@ -179,6 +181,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) { }, query: fmt.Sprintf(`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"}} on_ddl:EXEC stop_after_copy:true source_time_zone:"EDT" target_time_zone:"UTC"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1, '{}')`, insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + selectTableQuery: "(select 't1' from t1 limit 1)", }, { name: "binlog source order with include", @@ -219,6 +222,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) { }, query: fmt.Sprintf(`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"wut" filter:"select * from wut"} rules:{match:"zt" filter:"select * from zt"}} on_ddl:EXEC stop_after_copy:true source_time_zone:"EDT" target_time_zone:"UTC"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1, '{}')`, insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + selectTableQuery: "/.*union all.*union all.*", }, { name: "binlog source order with all-tables", @@ -259,6 +263,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) { }, query: fmt.Sprintf(`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"wut" filter:"select * from wut"} rules:{match:"zt" filter:"select * from zt"}} on_ddl:EXEC stop_after_copy:true source_time_zone:"EDT" target_time_zone:"UTC"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1, '{}')`, insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + selectTableQuery: "/.*union all.*union all.*", }, } @@ -289,6 +294,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) { targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, tenv.dbName, ""), &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(tt.query, &sqltypes.Result{}, errShortCircuit) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, tt.selectTableQuery, &sqltypes.Result{}) _, err := ws.MoveTablesCreate(ctx, tt.req) tenv.tmc.tablets[targetTabletUID].vrdbClient.Wait() require.ErrorIs(t, err, errShortCircuit) @@ -712,6 +718,7 @@ func TestMoveTablesSharded(t *testing.T) { fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs), ), nil) tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getLatestCopyState, vreplID, vreplID), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(ftc.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) } // We use the tablet's UID in the mocked results for the max value used on each target shard. @@ -1334,6 +1341,7 @@ func TestSourceShardSelection(t *testing.T) { tt := targetTablets[uid] tt.vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil) tt.vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, tenv.dbName, ""), &sqltypes.Result{}, nil) + tenv.tmc.setVReplicationExecResults(tt.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) for i, sourceShard := range streams { var err error if i == len(streams)-1 { @@ -1484,6 +1492,7 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { fmt.Sprintf(deleteWorkflow, targetKs, wf), &sqltypes.Result{RowsAffected: 1}, ) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) // Save the current target vschema. vs, err := tenv.ts.GetVSchema(ctx, targetKs) @@ -2112,6 +2121,7 @@ func TestMaterializerOneToOne(t *testing.T) { fmt.Sprintf(` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"} rules:{match:"t4"}}', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')`, wf, sourceKs, shard, tenv.cells[0], tenv.dbName) targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{}, errShortCircuit) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "/.*union all.*union all.*", &sqltypes.Result{}) err := ws.Materialize(ctx, ms) targetTablet.vrdbClient.Wait() @@ -2198,6 +2208,7 @@ func TestMaterializerManyToOne(t *testing.T) { } else { targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{InsertID: uint64(vreplID)}, errShortCircuit) } + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "/.*union all.*", &sqltypes.Result{}) } err := ws.Materialize(ctx, ms) @@ -2300,6 +2311,7 @@ func TestMaterializerOneToMany(t *testing.T) { } else { targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{InsertID: uint64(vreplID)}, errShortCircuit) } + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) } err = ws.Materialize(ctx, ms) @@ -2408,6 +2420,7 @@ func TestMaterializerManyToMany(t *testing.T) { fmt.Sprintf("%d|%s", vreplID, bls), ), nil) } + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) } } @@ -2516,6 +2529,7 @@ func TestMaterializerMulticolumnVindex(t *testing.T) { } else { targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{InsertID: uint64(vreplID)}, errShortCircuit) } + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) } err = ws.Materialize(ctx, ms) @@ -2587,6 +2601,7 @@ func TestMaterializerDeploySchema(t *testing.T) { fmt.Sprintf(` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"}}', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')`, wf, sourceKs, shard, tenv.cells[0], tenv.dbName) targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{}, errShortCircuit) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) err := ws.Materialize(ctx, ms) targetTablet.vrdbClient.Wait() @@ -2657,6 +2672,7 @@ func TestMaterializerCopySchema(t *testing.T) { fmt.Sprintf(` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"}}', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')`, wf, sourceKs, shard, tenv.cells[0], tenv.dbName) targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{}, errShortCircuit) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "/.*union all.*", &sqltypes.Result{}) err := ws.Materialize(ctx, ms) targetTablet.vrdbClient.Wait() @@ -2763,6 +2779,7 @@ func TestMaterializerExplicitColumns(t *testing.T) { } else { targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{InsertID: uint64(vreplID)}, errShortCircuit) } + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) } err = ws.Materialize(ctx, ms) @@ -2870,6 +2887,7 @@ func TestMaterializerRenamedColumns(t *testing.T) { } else { targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{InsertID: uint64(vreplID)}, errShortCircuit) } + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) } err = ws.Materialize(ctx, ms) @@ -2932,6 +2950,7 @@ func TestMaterializerStopAfterCopy(t *testing.T) { fmt.Sprintf(` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"}} stop_after_copy:true', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')`, wf, sourceKs, shard, tenv.cells[0], tenv.dbName) targetTablet.vrdbClient.ExpectRequest(insert, &sqltypes.Result{}, errShortCircuit) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "/.*union all.*", &sqltypes.Result{}) err := ws.Materialize(ctx, ms) targetTablet.vrdbClient.Wait() @@ -3386,6 +3405,7 @@ func TestMaterializerNoGoodVindex(t *testing.T) { targetTablet := targetShards[targetShard] addInvariants(targetTablet.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, tenv.dbName, ""), &sqltypes.Result{}, nil) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) errs = append(errs, errNoVindex) } @@ -3461,6 +3481,7 @@ func TestMaterializerComplexVindexExpression(t *testing.T) { targetTablet := targetShards[targetShard] addInvariants(targetTablet.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, tenv.dbName, ""), &sqltypes.Result{}, nil) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) errs = append(errs, errNoVindex) } @@ -3536,6 +3557,7 @@ func TestMaterializerNoVindexInExpression(t *testing.T) { targetTablet := targetShards[targetShard] addInvariants(targetTablet.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, tenv.dbName, ""), &sqltypes.Result{}, nil) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, "(select 't1' from t1 limit 1)", &sqltypes.Result{}) errs = append(errs, errNoVindex) }