From c99eb2c5adc6537ff395ef080478f0f089fbf036 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 21 Sep 2024 19:57:06 -0400 Subject: [PATCH] Improvements Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/framework_test.go | 4 ++-- go/vt/vtctl/workflow/materializer.go | 23 +++++++++++++------- go/vt/vtctl/workflow/materializer_test.go | 26 +++++++++++++---------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index bb18ba0cff1..ecceaa4b41c 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -304,7 +304,7 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect.req, req) { + if expect.req != nil && !proto.Equal(expect.req, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) } if expect.res != nil { @@ -443,7 +443,7 @@ func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatap qrs := tmc.vrQueries[int(tablet.Alias.Uid)] if len(qrs) == 0 { - return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query) } matched := false if qrs[0].query[0] == '/' { diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index ea8b75c41c8..3378f9e9975 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -45,6 +45,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -113,13 +114,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil { return err } + err := mz.buildMaterializer() if err != nil { return err } - if err := mz.deploySchema(); err != nil { - return err - } var workflowSubType binlogdatapb.VReplicationWorkflowSubType workflowSubType, err = mz.getWorkflowSubType() @@ -133,6 +132,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe } req.Options = optionsJSON + if err := mz.deploySchema(); err != nil { + return err + } + return mz.forAllTargets(func(target *topo.ShardInfo) error { targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) if err != nil { @@ -268,15 +271,15 @@ func (mz *materializer) deploySchema() error { return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} - hasTargetTable := map[string]bool{} req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables} targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req) if err != nil { return err } + hasTargetTable := make(map[string]*tabletmanagerdatapb.TableDefinition, len(targetSchema.TableDefinitions)) for _, td := range targetSchema.TableDefinitions { - hasTargetTable[td.Name] = true + hasTargetTable[td.Name] = td } targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) @@ -286,12 +289,16 @@ func (mz *materializer) deploySchema() error { var applyDDLs []string for _, ts := range mz.ms.TableSettings { - if hasTargetTable[ts.TargetTable] { - // Table already exists. + if td := hasTargetTable[ts.TargetTable]; td != nil { + // Table already exists. Let's be sure that it doesn't already have data. + if td.RowCount > 0 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace()) + } continue } if ts.CreateDdl == "" { - return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) + return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable) } var err error diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 5d64be43cb0..a7fc71072d7 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1655,7 +1655,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - _, _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, _, got, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -1663,6 +1663,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { continue } require.NoError(t, err) + require.NotNil(t, cancelFunc) utils.MustMatch(t, tcase.out, got, tcase.description) } } @@ -2177,10 +2178,11 @@ func TestCreateLookupVindexFailures(t *testing.T) { require.NoError(t, err) testcases := []struct { - description string - input *vschemapb.Keyspace - createRequest *createVReplicationWorkflowRequestResponse - err string + description string + input *vschemapb.Keyspace + createRequest *createVReplicationWorkflowRequestResponse + vrepExecQueries []string + err string }{ { description: "dup vindex", @@ -2431,7 +2433,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace), }, { - description: "workflow name already exists", + description: "workflow creation error", input: &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "v2": { @@ -2452,7 +2454,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - //vrQuery: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", + vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"}, createRequest: &createVReplicationWorkflowRequestResponse{ req: nil, res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, @@ -2468,10 +2470,12 @@ func TestCreateLookupVindexFailures(t *testing.T) { Keyspace: ms.TargetKeyspace, Vindex: tcase.input, } - if tcase.createRequest != nil { - for _, tablet := range env.tablets { - if tablet.Keyspace == ms.TargetKeyspace { - env.tmc.expectVRQuery(int(tablet.Alias.Uid), "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", &sqltypes.Result{}) + for _, tablet := range env.tablets { + if tablet.Keyspace == ms.TargetKeyspace { + for _, vrq := range tcase.vrepExecQueries { + env.tmc.expectVRQuery(int(tablet.Alias.Uid), vrq, &sqltypes.Result{}) + } + if tcase.createRequest != nil { env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest) } }