diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 9a16ce5a130..276f2215b94 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -854,6 +854,7 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { targetTabletUID := 300 targetKs := "targetks" wf := "testwf" + vreplID := 1 table := defaultSchema.TableDefinitions[0].Name invalidTimeZone := "NOPE" bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}}", @@ -883,18 +884,9 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { err := topotools.SaveRoutingRules(ctx, tenv.ts, nil) require.NoError(t, err, "failed to save routing rules") - tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) - tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) - tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "id", - "int64", - ), - "1", - ), - ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil) + addInvariants(targetTablet.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) + + tenv.tmc.tablets[targetTabletUID].vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, targetKs, ""), &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest( fmt.Sprintf("%s %s", insertVReplicationPrefix, @@ -908,82 +900,24 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { nil, ) targetTablet.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getVReplicationRecord, 1), + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getVReplicationRecord, vreplID), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source", "int64|varchar", ), - fmt.Sprintf("%d|%s", 1, bls), - ), - nil, - ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID, 1), - &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getRowsCopied, 1), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "rows_copied", - "int64", - ), - "0", - ), - nil, - ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getWorkflowState, 1), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", - "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", - ), - fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + fmt.Sprintf("%d|%s", vreplID, bls), ), nil, ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getNumCopyStateTable, 1), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "count(distinct table_name)", - "int64", - ), - "1", - ), - nil, - ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getWorkflowState, 1), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", - "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", - ), - fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), - ), - nil, - ) - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(getNumCopyStateTable, 1), - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "count(distinct table_name)", - "int64", - ), - "1", - ), - nil, - ) - targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage, - sqltypes.MakeTestResult( - sqltypes.MakeTestFields( - "@@binlog_row_image", - "varchar", - ), - "FULL", + + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", + "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - nil, - ) + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs), + ), nil) targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil) tenv.tmc.setVReplicationExecResults(targetTablet.tablet,