diff --git a/go/test/endtoend/vreplication/lookup_vindex_helper_test.go b/go/test/endtoend/vreplication/lookup_vindex_helper_test.go index 80677c2bdde..359ecc1d21b 100644 --- a/go/test/endtoend/vreplication/lookup_vindex_helper_test.go +++ b/go/test/endtoend/vreplication/lookup_vindex_helper_test.go @@ -66,11 +66,18 @@ func (lv *lookupVindex) create() { err := vc.VtctldClient.ExecuteCommand(args...) require.NoError(lv.t, err, "error executing LookupVindex create: %v", err) waitForWorkflowState(lv.t, vc, fmt.Sprintf("%s.%s", lv.ownerTableKeyspace, lv.name), binlogdatapb.VReplicationWorkflowState_Running.String()) - lv.expectWriteOnly(true) + lv.expectParamsAndOwner(true) } func (lv *lookupVindex) cancel() { - panic("not implemented") + args := []string{ + "LookupVindex", + "--name", lv.name, + "--table-keyspace=" + lv.ownerTableKeyspace, + "cancel", + } + err := vc.VtctldClient.ExecuteCommand(args...) + require.NoError(lv.t, err, "error executing LookupVindex complete: %v", err) } func (lv *lookupVindex) externalize() { @@ -83,7 +90,7 @@ func (lv *lookupVindex) externalize() { } err := vc.VtctldClient.ExecuteCommand(args...) require.NoError(lv.t, err, "error executing LookupVindex externalize: %v", err) - lv.expectWriteOnly(false) + lv.expectParamsAndOwner(false) } func (lv *lookupVindex) internalize() { @@ -96,20 +103,39 @@ func (lv *lookupVindex) internalize() { } err := vc.VtctldClient.ExecuteCommand(args...) require.NoError(lv.t, err, "error executing LookupVindex internalize: %v", err) - lv.expectWriteOnly(true) + lv.expectParamsAndOwner(true) +} + +func (lv *lookupVindex) complete() { + args := []string{ + "LookupVindex", + "--name", lv.name, + "--table-keyspace=" + lv.ownerTableKeyspace, + "complete", + "--keyspace=" + lv.tableKeyspace, + } + err := vc.VtctldClient.ExecuteCommand(args...) + require.NoError(lv.t, err, "error executing LookupVindex complete: %v", err) + lv.expectParamsAndOwner(false) } func (lv *lookupVindex) show() error { return nil } -func (lv *lookupVindex) expectWriteOnly(expected bool) { +func (lv *lookupVindex) expectParamsAndOwner(expectedWriteOnlyParam bool) { vschema, err := vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", lv.ownerTableKeyspace) require.NoError(lv.t, err, "error executing GetVSchema: %v", err) vdx := gjson.Get(vschema, fmt.Sprintf("vindexes.%s", lv.name)) require.NotNil(lv.t, vdx, "lookup vindex %s not found", lv.name) + + expectedOwner, expectedFrom, expectedTo := lv.ownerTable, strings.Join(lv.columns, ","), "keyspace_id" + require.Equal(lv.t, expectedOwner, vdx.Get("owner").String(), "expected 'owner' parameter to be %s", expectedOwner) + require.Equal(lv.t, expectedFrom, vdx.Get("params.from").String(), "expected 'from' parameter to be %s", expectedFrom) + require.Equal(lv.t, expectedTo, vdx.Get("params.to").String(), "expected 'to' parameter to be %s", expectedTo) + want := "" - if expected { + if expectedWriteOnlyParam { want = "true" } require.Equal(lv.t, want, vdx.Get("params.write_only").String(), "expected write_only parameter to be %s", want) diff --git a/go/test/endtoend/vreplication/lookup_vindex_test.go b/go/test/endtoend/vreplication/lookup_vindex_test.go index 08858517104..0e6810f3887 100644 --- a/go/test/endtoend/vreplication/lookup_vindex_test.go +++ b/go/test/endtoend/vreplication/lookup_vindex_test.go @@ -85,6 +85,7 @@ type lookupTestCase struct { runningQuery string postExternalizeQuery string postInternalizeQuery string + postCompleteQuery string cleanupQuery string } @@ -108,6 +109,7 @@ func TestLookupVindex(t *testing.T) { runningQuery := "insert into t1 (c1, c2, val) values (4, 4, 'val4'), (5, 5, 'val5'), (6, 6, 'val6')" postExternalizeQuery := "insert into t1 (c1, c2, val) values (7, 7, 'val7'), (8, 8, 'val8'), (9, 9, 'val9')" postInternalizeQuery := "insert into t1 (c1, c2, val) values (10, 10, 'val10'), (11, 11, 'val11'), (12, 12, 'val12')" + postCompleteQuery := "insert into t1 (c1, c2, val) values (13, 13, 'val13'), (14, 14, 'val14'), (15, 15, 'val15')" cleanupQuery := "delete from t1" testCases := []lookupTestCase{ @@ -161,10 +163,42 @@ func TestLookupVindex(t *testing.T) { tc.runningQuery = runningQuery tc.postExternalizeQuery = postExternalizeQuery tc.postInternalizeQuery = postInternalizeQuery + tc.postCompleteQuery = postCompleteQuery tc.cleanupQuery = cleanupQuery testLookupVindex(t, &tc) }) } + + for _, tc := range testCases { + // Modify the testcase name, here we cancel just after creating lookupvindex. + tc.name = tc.name + ", cancel" + t.Run(tc.name, func(t *testing.T) { + // Modify the name to avoid error on duplicate lookup vindex/table name. + tc.lv.name = tc.lv.name + "_cancel" + + tc.initQuery = initQuery + tc.runningQuery = runningQuery + tc.postCompleteQuery = postCompleteQuery + tc.cleanupQuery = cleanupQuery + testLookupVindexCancel(t, &tc) + }) + } + + for _, tc := range testCases { + // Modify the testcase name, here we cancel just after externalizing lookupvindex. + tc.name = tc.name + ", externalize and cancel" + t.Run(tc.name, func(t *testing.T) { + // Modify the name to avoid error on duplicate lookup vindex/table name. + tc.lv.name = tc.lv.name + "_externalize_cancel" + + tc.initQuery = initQuery + tc.runningQuery = runningQuery + tc.postExternalizeQuery = postExternalizeQuery + tc.postCompleteQuery = postCompleteQuery + tc.cleanupQuery = cleanupQuery + testLookupVindexCancelAfterExternalize(t, &tc) + }) + } } func testLookupVindex(t *testing.T, tc *lookupTestCase) { @@ -207,6 +241,106 @@ func testLookupVindex(t *testing.T, tc *lookupTestCase) { waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) }) + t.Run("externalize after internalize", func(t *testing.T) { + tc.lv.externalize() + }) + + t.Run("complete", func(t *testing.T) { + tc.lv.complete() + totalRows += getNumRowsInQuery(t, tc.postCompleteQuery) + _, err := vtgateConn.ExecuteFetch(tc.postCompleteQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + + t.Run("cleanup", func(t *testing.T) { + _, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, 0) + }) +} + +func testLookupVindexCancel(t *testing.T, tc *lookupTestCase) { + vtgateConn, cancel := getVTGateConn() + defer cancel() + var totalRows int + lv := tc.lv + + t.Run("init data", func(t *testing.T) { + totalRows += getNumRowsInQuery(t, tc.initQuery) + _, err := vtgateConn.ExecuteFetch(tc.initQuery, 1000, false) + require.NoError(t, err) + }) + + t.Run("create", func(t *testing.T) { + tc.lv.create() + lks := lv.tableKeyspace + vindexName := lv.name + waitForRowCount(t, vtgateConn, lks, vindexName, totalRows) + totalRows += getNumRowsInQuery(t, tc.runningQuery) + _, err := vtgateConn.ExecuteFetch(tc.runningQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + + t.Run("cancel", func(t *testing.T) { + tc.lv.cancel() + // Expect true as we cancelled the LookupVindex before externalizing. + lv.expectParamsAndOwner(true) + totalRows += getNumRowsInQuery(t, tc.postCompleteQuery) + _, err := vtgateConn.ExecuteFetch(tc.postCompleteQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + + t.Run("cleanup", func(t *testing.T) { + _, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, 0) + }) +} + +func testLookupVindexCancelAfterExternalize(t *testing.T, tc *lookupTestCase) { + vtgateConn, cancel := getVTGateConn() + defer cancel() + var totalRows int + lv := tc.lv + + t.Run("init data", func(t *testing.T) { + totalRows += getNumRowsInQuery(t, tc.initQuery) + _, err := vtgateConn.ExecuteFetch(tc.initQuery, 1000, false) + require.NoError(t, err) + }) + + t.Run("create", func(t *testing.T) { + tc.lv.create() + lks := lv.tableKeyspace + vindexName := lv.name + waitForRowCount(t, vtgateConn, lks, vindexName, totalRows) + totalRows += getNumRowsInQuery(t, tc.runningQuery) + _, err := vtgateConn.ExecuteFetch(tc.runningQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + + t.Run("externalize", func(t *testing.T) { + tc.lv.externalize() + totalRows += getNumRowsInQuery(t, tc.postExternalizeQuery) + _, err := vtgateConn.ExecuteFetch(tc.postExternalizeQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + + t.Run("cancel", func(t *testing.T) { + tc.lv.cancel() + // Expect false as we cancelled the LookupVindex after externalizing. + lv.expectParamsAndOwner(false) + totalRows += getNumRowsInQuery(t, tc.postCompleteQuery) + _, err := vtgateConn.ExecuteFetch(tc.postCompleteQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + t.Run("cleanup", func(t *testing.T) { _, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false) require.NoError(t, err) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c03e89ab248..e69d5b9d086 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -567,14 +567,9 @@ func (s *Server) LookupVindexComplete(ctx context.Context, req *vtctldatapb.Look span.Annotate("name", req.Name) span.Annotate("table_keyspace", req.TableKeyspace) - // Find the lookup vindex by name. - sourceVschema, err := s.ts.GetVSchema(ctx, req.Keyspace) + vindex, _, err := getVindexAndVSchema(ctx, s.ts, req.Keyspace, req.Name) if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", req.Keyspace) - } - vindex := sourceVschema.Vindexes[req.Name] - if vindex == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", req.Name, req.Keyspace) + return nil, err } targetShards, err := s.ts.GetServingShards(ctx, req.TableKeyspace) @@ -693,14 +688,9 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L span.Annotate("name", req.Name) span.Annotate("table_keyspace", req.TableKeyspace) - // Find the lookup vindex by name. - sourceVschema, err := s.ts.GetVSchema(ctx, req.Keyspace) + vindex, sourceVSchema, err := getVindexAndVSchema(ctx, s.ts, req.Keyspace, req.Name) if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", req.Keyspace) - } - vindex := sourceVschema.Vindexes[req.Name] - if vindex == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", req.Name, req.Keyspace) + return nil, err } targetShards, err := s.ts.GetServingShards(ctx, req.TableKeyspace) @@ -778,7 +768,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L // Remove the write_only param and save the source vschema. delete(vindex.Params, "write_only") - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVschema); err != nil { + if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { return nil, err } return resp, s.ts.RebuildSrvVSchema(ctx, nil) @@ -794,14 +784,9 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L span.Annotate("name", req.Name) span.Annotate("table_keyspace", req.TableKeyspace) - // Find the lookup vindex by name. - sourceVschema, err := s.ts.GetVSchema(ctx, req.Keyspace) + vindex, sourceVSchema, err := getVindexAndVSchema(ctx, s.ts, req.Keyspace, req.Name) if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", req.Keyspace) - } - vindex := sourceVschema.Vindexes[req.Name] - if vindex == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", req.Name, req.Keyspace) + return nil, err } targetShards, err := s.ts.GetServingShards(ctx, req.TableKeyspace) @@ -865,7 +850,7 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L // Make the vindex back to write_only and save the source vschema. vindex.Params["write_only"] = "true" - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVschema); err != nil { + if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { return nil, err } return resp, s.ts.RebuildSrvVSchema(ctx, nil) @@ -1013,7 +998,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } if len(tables) > 0 { - err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, tables) + err = validateSourceTablesExist(sourceKeyspace, ksTables, tables) if err != nil { return nil, err } @@ -1025,7 +1010,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } } if len(req.ExcludeTables) > 0 { - err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, req.ExcludeTables) + err = validateSourceTablesExist(sourceKeyspace, ksTables, req.ExcludeTables) if err != nil { return nil, err } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 65fa49fde86..593cdeda46f 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -51,6 +51,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -1028,7 +1029,7 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error { // validateSourceTablesExist validates that tables provided are present // in the source keyspace. -func validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error { +func validateSourceTablesExist(sourceKeyspace string, ksTables, tables []string) error { var missingTables []string for _, table := range tables { if schema.IsInternalOperationTableName(table) { @@ -1051,3 +1052,17 @@ func validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTab } return nil } + +// getVindexAndVSchema gets the vindex (from VSchema) and VSchema with the +// provided vindex name and keyspace. +func getVindexAndVSchema(ctx context.Context, ts *topo.Server, keyspace string, vindexName string) (*vschemapb.Vindex, *vschemapb.Keyspace, error) { + vschema, err := ts.GetVSchema(ctx, keyspace) + if err != nil { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for the %s keyspace", keyspace) + } + vindex := vschema.Vindexes[vindexName] + if vindex == nil { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s not found in the %s keyspace", vindexName, keyspace) + } + return vindex, vschema, nil +} diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index 8458cf60995..eecbfd6269b 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -247,7 +247,6 @@ func startEtcd(t *testing.T) string { } func TestValidateSourceTablesExist(t *testing.T) { - ctx := context.Background() ks := "source_keyspace" ksTables := []string{"table1", "table2"} @@ -272,7 +271,7 @@ func TestValidateSourceTablesExist(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - err := validateSourceTablesExist(ctx, ks, ksTables, tc.tables) + err := validateSourceTablesExist(ks, ksTables, tc.tables) if tc.errContains != "" { assert.ErrorContains(t, err, tc.errContains) } else {