diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index 2135c8bf1e5..f0a811a30cf 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -100,14 +100,6 @@ func NewMockDbaClient(t *testing.T) *MockDBClient { } } -func (dc *MockDBClient) Reset() { - dc.expectMu.Lock() - defer dc.expectMu.Unlock() - dc.currentResult = 0 - dc.expect = nil - dc.done = make(chan struct{}) -} - // ExpectRequest adds an expected result to the mock. // This function should not be called conncurrently with other commands. func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error) { diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 872b96585f7..6b61303874f 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -420,8 +420,6 @@ func (tmc *fakeTMClient) ApplySchema(ctx context.Context, tablet *topodatapb.Tab } func (tmc *fakeTMClient) schemaRequested(uid int) { - tmc.mu.Lock() - defer tmc.mu.Unlock() key := strconv.Itoa(int(uid)) n, ok := tmc.getSchemaCounts[key] if !ok { @@ -439,6 +437,8 @@ func (tmc *fakeTMClient) getSchemaRequestCount(uid int) int { } func (tmc *fakeTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() tmc.schemaRequested(int(tablet.Alias.Uid)) // Return the schema for the tablet if it exists. if schema, ok := tmc.tabletSchemas[int(tablet.Alias.Uid)]; ok { @@ -466,6 +466,8 @@ func (tmc *fakeTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodata // and their results. You can specify exact strings or strings prefixed with // a '/', in which case they will be treated as a valid regexp. func (tmc *fakeTMClient) setVReplicationExecResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) { + tmc.mu.Lock() + defer tmc.mu.Unlock() queries, ok := tmc.vreQueries[int(tablet.Alias.Uid)] if !ok { queries = make(map[string]*querypb.QueryResult) @@ -475,6 +477,8 @@ func (tmc *fakeTMClient) setVReplicationExecResults(tablet *topodatapb.Tablet, q } func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() if result, ok := tmc.vreQueries[int(tablet.Alias.Uid)][query]; ok { return result, nil } @@ -514,6 +518,8 @@ func (tmc *fakeTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, r } func (tmc *fakeTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() return tmc.tablets[int(tablet.Alias.Uid)].tm.CreateVReplicationWorkflow(ctx, req) } @@ -529,17 +535,25 @@ func (tmc *fakeTMClient) DeleteVReplicationWorkflow(ctx context.Context, tablet } func (tmc *fakeTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() return tmc.tablets[int(tablet.Alias.Uid)].tm.HasVReplicationWorkflows(ctx, req) } func (tmc *fakeTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() return tmc.tablets[int(tablet.Alias.Uid)].tm.ReadVReplicationWorkflow(ctx, req) } func (tmc *fakeTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() return tmc.tablets[int(tablet.Alias.Uid)].tm.ReadVReplicationWorkflows(ctx, req) } func (tmc *fakeTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() return tmc.tablets[int(tablet.Alias.Uid)].tm.UpdateVReplicationWorkflow(ctx, req) } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index d266a1c5e0c..ffd137c8c69 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -472,7 +472,6 @@ func TestMoveTablesUnsharded(t *testing.T) { require.NoError(t, err) for _, ftc := range targetShards { - ftc.vrdbClient.Reset() ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", @@ -732,7 +731,6 @@ func TestMoveTablesSharded(t *testing.T) { }) require.NoError(t, err) for _, ftc := range targetShards { - ftc.vrdbClient.Reset() ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",