Skip to content

Commit

Permalink
Address flakes in TestMoveTables*
Browse files Browse the repository at this point in the history
Stop unnecessarily calling vrdbclient.Reset.
And use mutex to protect fakeTMClient members.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 13, 2024
1 parent f0062e6 commit c14175c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
8 changes: 0 additions & 8 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,6 @@ func NewMockDbaClient(t *testing.T) *MockDBClient {
}
}

func (dc *MockDBClient) Reset() {
dc.expectMu.Lock()
defer dc.expectMu.Unlock()
dc.currentResult = 0
dc.expect = nil
dc.done = make(chan struct{})
}

// ExpectRequest adds an expected result to the mock.
// This function should not be called conncurrently with other commands.
func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error) {
Expand Down
18 changes: 16 additions & 2 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,6 @@ func (tmc *fakeTMClient) ApplySchema(ctx context.Context, tablet *topodatapb.Tab
}

func (tmc *fakeTMClient) schemaRequested(uid int) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
key := strconv.Itoa(int(uid))
n, ok := tmc.getSchemaCounts[key]
if !ok {
Expand All @@ -439,6 +437,8 @@ func (tmc *fakeTMClient) getSchemaRequestCount(uid int) int {
}

func (tmc *fakeTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
tmc.schemaRequested(int(tablet.Alias.Uid))
// Return the schema for the tablet if it exists.
if schema, ok := tmc.tabletSchemas[int(tablet.Alias.Uid)]; ok {
Expand Down Expand Up @@ -466,6 +466,8 @@ func (tmc *fakeTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodata
// and their results. You can specify exact strings or strings prefixed with
// a '/', in which case they will be treated as a valid regexp.
func (tmc *fakeTMClient) setVReplicationExecResults(tablet *topodatapb.Tablet, query string, result *sqltypes.Result) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
queries, ok := tmc.vreQueries[int(tablet.Alias.Uid)]
if !ok {
queries = make(map[string]*querypb.QueryResult)
Expand All @@ -475,6 +477,8 @@ func (tmc *fakeTMClient) setVReplicationExecResults(tablet *topodatapb.Tablet, q
}

func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if result, ok := tmc.vreQueries[int(tablet.Alias.Uid)][query]; ok {
return result, nil
}
Expand Down Expand Up @@ -514,6 +518,8 @@ func (tmc *fakeTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, r
}

func (tmc *fakeTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
return tmc.tablets[int(tablet.Alias.Uid)].tm.CreateVReplicationWorkflow(ctx, req)
}

Expand All @@ -529,17 +535,25 @@ func (tmc *fakeTMClient) DeleteVReplicationWorkflow(ctx context.Context, tablet
}

func (tmc *fakeTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
return tmc.tablets[int(tablet.Alias.Uid)].tm.HasVReplicationWorkflows(ctx, req)
}

func (tmc *fakeTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
return tmc.tablets[int(tablet.Alias.Uid)].tm.ReadVReplicationWorkflow(ctx, req)
}

func (tmc *fakeTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
return tmc.tablets[int(tablet.Alias.Uid)].tm.ReadVReplicationWorkflows(ctx, req)
}

func (tmc *fakeTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
return tmc.tablets[int(tablet.Alias.Uid)].tm.UpdateVReplicationWorkflow(ctx, req)
}
2 changes: 0 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ func TestMoveTablesUnsharded(t *testing.T) {
require.NoError(t, err)

for _, ftc := range targetShards {
ftc.vrdbClient.Reset()
ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
Expand Down Expand Up @@ -732,7 +731,6 @@ func TestMoveTablesSharded(t *testing.T) {
})
require.NoError(t, err)
for _, ftc := range targetShards {
ftc.vrdbClient.Reset()
ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
Expand Down

0 comments on commit c14175c

Please sign in to comment.