From c62d0d27ff5d27cf1b26df57d6caaf1113681125 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 20 Sep 2024 17:43:45 -0400 Subject: [PATCH] Add unit test Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/framework_test.go | 26 ++++- go/vt/vtctl/workflow/materializer_env_test.go | 39 +++++-- go/vt/vtctl/workflow/materializer_test.go | 103 +++++++++++++++--- go/vt/vtctl/workflow/server.go | 7 +- 4 files changed, 142 insertions(+), 33 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index f3bf5869ab1..bb18ba0cff1 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -267,7 +267,7 @@ type testTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult - createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient { return &testTMClient{ schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), vrQueries: make(map[int][]*queryResult), - createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect, req) { + if !proto.Equal(expect.req, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) } + if expect.res != nil { + return expect.res, expect.err + } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil @@ -418,13 +421,22 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q } } -func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() tmc.createVReplicationWorkflowRequests[tabletID] = req } +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] { + tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req + } +} + func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -479,6 +491,12 @@ type vdiffRequestResponse struct { err error } +type createVReplicationWorkflowRequestResponse struct { + req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest + res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse + err error +} + func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index aada59c244d..349450a6a04 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" @@ -120,16 +121,33 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M if err == nil { tableName = table.Name.String() } + stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl) + require.NoError(t, err) + ddl, ok := stmt.(*sqlparser.CreateTable) + require.True(t, ok) + cols := make([]string, len(ddl.TableSpec.Columns)) + fields := make([]*querypb.Field, len(ddl.TableSpec.Columns)) + for i, col := range ddl.TableSpec.Columns { + cols[i] = col.Name.String() + fields[i] = &querypb.Field{ + Name: col.Name.String(), + Type: col.Type.SQLType(), + } + } env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: tableName, - Schema: fmt.Sprintf("%s_schema", tableName), + Name: tableName, + Schema: ts.CreateDdl, + Columns: cols, + Fields: fields, }}, } env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: ts.TargetTable, - Schema: fmt.Sprintf("%s_schema", ts.TargetTable), + Name: ts.TargetTable, + Schema: ts.CreateDdl, + Columns: cols, + Fields: fields, }}, } } @@ -199,7 +217,7 @@ type testMaterializerTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult - createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse // Used to confirm the number of times WorkflowDelete was called. workflowDeleteCalls int @@ -215,15 +233,18 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe sourceShards: sourceShards, tableSettings: tableSettings, vrQueries: make(map[int][]*queryResult), - createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), } } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect, request) { + if expect.req != nil && !proto.Equal(expect.req, request) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) } + if expect.res != nil { + return expect.res, expect.err + } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil @@ -315,7 +336,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } -func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -344,7 +365,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table qrs := tmc.vrQueries[int(tablet.Alias.Uid)] if len(qrs) == 0 { - return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query) } matched := false if qrs[0].query[0] == '/' { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 84119735dee..5d64be43cb0 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -18,6 +18,7 @@ package workflow import ( "context" + "errors" "fmt" "slices" "strings" @@ -2111,6 +2112,16 @@ func TestCreateLookupVindexFailures(t *testing.T) { SourceKeyspace: "sourceks", // Keyspace where the lookup table and VReplication workflow is created. TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "t1", + CreateDdl: "CREATE TABLE `t1` (\n`c1` INT,\n PRIMARY KEY(`c1`)\n)", + }, + { + TargetTable: "t2", + CreateDdl: "CREATE TABLE `t2` (\n`c2` INT,\n PRIMARY KEY(`c2`)\n)", + }, + }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2122,7 +2133,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2135,10 +2146,10 @@ func TestCreateLookupVindexFailures(t *testing.T) { "xxhash": { Type: "xxhash", }, - "v": { + "v1": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", "write_only": "true", @@ -2148,19 +2159,28 @@ func TestCreateLookupVindexFailures(t *testing.T) { Tables: map[string]*vschemapb.Table{ "t1": { ColumnVindexes: []*vschemapb.ColumnVindex{{ - Name: "v", + Name: "v1", Column: "c1", }}, }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, }, } - err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) + err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs) + require.NoError(t, err) + err = env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) require.NoError(t, err) testcases := []struct { - description string - input *vschemapb.Keyspace - err string + description string + input *vschemapb.Keyspace + createRequest *createVReplicationWorkflowRequestResponse + err string }{ { description: "dup vindex", @@ -2208,7 +2228,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1,c2", "to": "c3", }, @@ -2224,7 +2244,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2240,7 +2260,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_noexist", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1,c2", "to": "c2", }, @@ -2264,7 +2284,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2324,7 +2344,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2377,7 +2397,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "xxhash": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2393,7 +2413,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - err: "a conflicting vindex named xxhash already exists in the targetks keyspace", + err: fmt.Sprintf("a conflicting vindex named xxhash already exists in the %s keyspace", ms.TargetKeyspace), }, { description: "source table not in vschema", @@ -2408,7 +2428,37 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - err: "table other not found in the targetks keyspace", + err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace), + }, + { + description: "workflow name already exists", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v2": { + Type: "consistent_lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.t1_lkp", ms.TargetKeyspace), + "from": "c1", + "to": "keyspace_id", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, + }, + }, + //vrQuery: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", + createRequest: &createVReplicationWorkflowRequestResponse{ + req: nil, + res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, + err: errors.New("we gots us an error"), + }, + err: "we gots us an error", }, } for _, tcase := range testcases { @@ -2418,10 +2468,26 @@ func TestCreateLookupVindexFailures(t *testing.T) { Keyspace: ms.TargetKeyspace, Vindex: tcase.input, } + if tcase.createRequest != nil { + for _, tablet := range env.tablets { + if tablet.Keyspace == ms.TargetKeyspace { + env.tmc.expectVRQuery(int(tablet.Alias.Uid), "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", &sqltypes.Result{}) + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest) + } + } + } _, err := env.ws.LookupVindexCreate(ctx, req) if !strings.Contains(err.Error(), tcase.err) { t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err) } + // Confirm that the original vschema where the vindex would + // be created is still in place -- since the workflow + // creation failed in each test case. That vindex is created + // in the source keyspace based on the MaterializeSettings + // definition. + cvs, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + require.True(t, proto.Equal(vs, cvs), "expected: %+v, got: %+v", vs, cvs) }) } } @@ -2696,7 +2762,10 @@ func TestKeyRangesEqualOptimization(t *testing.T) { if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { continue } - env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) + reqRes := &createVReplicationWorkflowRequestResponse{ + req: tc.wantReqs[tablet.Alias.Uid], + } + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, reqRes) } mz := &materializer{ diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d35b281dba6..ee345be8137 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3967,8 +3967,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Validate input table and vindex consistency. if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", - keyspace) + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table (%s) in the %s keyspace", + sourceTable, keyspace) } if sourceTable.ColumnVindexes[0].Name != vindexName { return nil, nil, nil, nil, @@ -4146,9 +4146,10 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + tc := targetVSchema.CloneVT() cancelFunc = func() error { // Restore the original target vschema. - return s.ts.SaveVSchema(ctx, targetKeyspace, targetVSchema) + return s.ts.SaveVSchema(ctx, targetKeyspace, tc) } // Update targetVSchema.