Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 22, 2024
1 parent c62d0d2 commit c99eb2c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet
defer tmc.mu.Unlock()

if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect.req, req) {
if expect.req != nil && !proto.Equal(expect.req, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect)
}
if expect.res != nil {
Expand Down Expand Up @@ -443,7 +443,7 @@ func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatap

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down
23 changes: 15 additions & 8 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -113,13 +114,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil {
return err
}

err := mz.buildMaterializer()
if err != nil {
return err
}
if err := mz.deploySchema(); err != nil {
return err
}

var workflowSubType binlogdatapb.VReplicationWorkflowSubType
workflowSubType, err = mz.getWorkflowSubType()
Expand All @@ -133,6 +132,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
}
req.Options = optionsJSON

if err := mz.deploySchema(); err != nil {
return err
}

return mz.forAllTargets(func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
if err != nil {
Expand Down Expand Up @@ -268,15 +271,15 @@ func (mz *materializer) deploySchema() error {
return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

hasTargetTable := map[string]bool{}
req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables}
targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req)
if err != nil {
return err
}

hasTargetTable := make(map[string]*tabletmanagerdatapb.TableDefinition, len(targetSchema.TableDefinitions))
for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
hasTargetTable[td.Name] = td
}

targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
Expand All @@ -286,12 +289,16 @@ func (mz *materializer) deploySchema() error {

var applyDDLs []string
for _, ts := range mz.ms.TableSettings {
if hasTargetTable[ts.TargetTable] {
// Table already exists.
if td := hasTargetTable[ts.TargetTable]; td != nil {
// Table already exists. Let's be sure that it doesn't already have data.
if td.RowCount > 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace())
}
continue
}
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
Expand Down
26 changes: 15 additions & 11 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,14 +1655,15 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
t.Fatal(err)
}

_, _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
_, _, got, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
}
continue
}
require.NoError(t, err)
require.NotNil(t, cancelFunc)
utils.MustMatch(t, tcase.out, got, tcase.description)
}
}
Expand Down Expand Up @@ -2177,10 +2178,11 @@ func TestCreateLookupVindexFailures(t *testing.T) {
require.NoError(t, err)

testcases := []struct {
description string
input *vschemapb.Keyspace
createRequest *createVReplicationWorkflowRequestResponse
err string
description string
input *vschemapb.Keyspace
createRequest *createVReplicationWorkflowRequestResponse
vrepExecQueries []string
err string
}{
{
description: "dup vindex",
Expand Down Expand Up @@ -2431,7 +2433,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace),
},
{
description: "workflow name already exists",
description: "workflow creation error",
input: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v2": {
Expand All @@ -2452,7 +2454,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
},
},
},
//vrQuery: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)",
vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"},
createRequest: &createVReplicationWorkflowRequestResponse{
req: nil,
res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{},
Expand All @@ -2468,10 +2470,12 @@ func TestCreateLookupVindexFailures(t *testing.T) {
Keyspace: ms.TargetKeyspace,
Vindex: tcase.input,
}
if tcase.createRequest != nil {
for _, tablet := range env.tablets {
if tablet.Keyspace == ms.TargetKeyspace {
env.tmc.expectVRQuery(int(tablet.Alias.Uid), "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", &sqltypes.Result{})
for _, tablet := range env.tablets {
if tablet.Keyspace == ms.TargetKeyspace {
for _, vrq := range tcase.vrepExecQueries {
env.tmc.expectVRQuery(int(tablet.Alias.Uid), vrq, &sqltypes.Result{})
}
if tcase.createRequest != nil {
env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest)
}
}
Expand Down

0 comments on commit c99eb2c

Please sign in to comment.