diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index f6c578d45d8..8b407dbcb0c 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -244,6 +244,8 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { if expect.req != nil && !proto.Equal(expect.req, request) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) @@ -257,6 +259,9 @@ func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Cont } func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.readVReplicationWorkflow != nil { return tmc.readVReplicationWorkflow(ctx, tablet, request) } @@ -310,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont } func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range request.Tables { if table == "/.*/" { @@ -430,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex } func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables if len(req.IncludeWorkflows) > 0 { for _, wf := range req.IncludeWorkflows { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 419e702c454..30c8c24c987 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1667,7 +1667,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { } require.NoError(t, err) // withTable is a vschema that already contains the table and thus - // we make not vschema changes and there's nothing to cancel. + // we don't make any vschema changes and there's nothing to cancel. require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable)) utils.MustMatch(t, tcase.out, got, tcase.description) } @@ -2463,7 +2463,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"}, createRequest: &createVReplicationWorkflowRequestResponse{ - req: nil, + req: nil, // We don't care about defining it in this case res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, err: errors.New("we gots us an error"), }, diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index ff96e881e99..1674055ca4f 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1139,7 +1139,8 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup } if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", + targetVSchema, ms.TargetKeyspace) } ms.TabletTypes = topoproto.MakeStringTypeCSV(req.TabletTypes) ms.TabletSelectionPreference = req.TabletSelectionPreference @@ -4146,6 +4147,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + // Save a copy of the original vschema if we modify it and need to provide + // a cancelFunc. ogTargetVSchema := targetVSchema.CloneVT() targetChanged := false