diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index 40cfdb53118..602f8622a3b 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -34,9 +34,10 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) type Config struct { diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index c26c4edc2ff..843fa364985 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -144,6 +144,22 @@ create table nopk (name varchar(128), age int unsigned); "sequence": "customer_seq" } }, + "customer_name": { + "column_vindexes": [ + { + "column": "cid", + "name": "xxhash" + } + ] + }, + "customer_type": { + "column_vindexes": [ + { + "column": "cid", + "name": "xxhash" + } + ] + }, "customer2": { "column_vindexes": [ { @@ -401,6 +417,32 @@ create table nopk (name varchar(128), age int unsigned); "create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4" }] } +` + + materializeCustomerNameSpec = ` +{ + "workflow": "customer_name", + "source_keyspace": "customer", + "target_keyspace": "customer", + "table_settings": [{ + "target_table": "customer_name", + "source_expression": "select cid, name from customer", + "create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))" + }] +} +` + + materializeCustomerTypeSpec = ` +{ + "workflow": "customer_type", + "source_keyspace": "customer", + "target_keyspace": "customer", + "table_settings": [{ + "target_table": "customer_type", + "source_expression": "select cid, typ from customer", + "create_ddl": "create table if not exists customer_type (cid bigint not null, typ enum('individual','soho','enterprise'), primary key(cid), key(typ))" + }] +} ` merchantOrdersVSchema = ` diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 7b8c7d5f59b..0491b2edd9d 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -17,11 +17,13 @@ limitations under the License. package vreplication import ( + "context" "encoding/json" "fmt" "net" "strconv" "strings" + "sync" "testing" "time" @@ -30,6 +32,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/wrangler" @@ -335,8 +338,8 @@ func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.Vttabl for _, tt := range []string{"replica", "rdonly"} { destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt) if strings.Contains(tabletTypes, tt) { - readQuery := "select * from customer" - assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, readQuery) + readQuery := "select cid from customer limit 10" + assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1") } } } @@ -355,7 +358,7 @@ func validateWritesRouteToSource(t *testing.T) { insertQuery := "insert into customer(name, cid) values('tempCustomer2', 200)" matchInsertQuery := "insert into customer(`name`, cid) values" assertQueryExecutesOnTablet(t, vtgateConn, sourceTab, "customer", insertQuery, matchInsertQuery) - execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100") + execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid = 200") } func validateWritesRouteToTarget(t *testing.T) { @@ -366,7 +369,7 @@ func validateWritesRouteToTarget(t *testing.T) { assertQueryExecutesOnTablet(t, vtgateConn, targetTab2, "customer", insertQuery, matchInsertQuery) insertQuery = "insert into customer(name, cid) values('tempCustomer3', 102)" assertQueryExecutesOnTablet(t, vtgateConn, targetTab1, "customer", insertQuery, matchInsertQuery) - execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100") + execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid in (101, 102)") } func revert(t *testing.T, workflowType string) { @@ -534,6 +537,31 @@ func testReshardV2Workflow(t *testing.T) { defer closeConn() currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard + // Generate customer records in the background for the rest of the test + // in order to confirm that no writes are lost in either the customer + // table or the customer_name and customer_type materializations + // against it during the Reshard and all of the traffic switches. + dataGenCtx, dataGenCancel := context.WithCancel(context.Background()) + defer dataGenCancel() + dataGenConn, dataGenCloseConn := getVTGateConn() + defer dataGenCloseConn() + dataGenWg := sync.WaitGroup{} + dataGenWg.Add(1) + go func() { + defer dataGenWg.Done() + id := 1000 + for { + select { + case <-dataGenCtx.Done(): + return + default: + _ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name) values (%d, 'tempCustomer%d')", id, id)) + } + time.Sleep(1 * time.Millisecond) + id++ + } + }() + // create internal tables on the original customer shards that should be // ignored and not show up on the new shards execMultipleQueries(t, vtgateConn, targetKs+"/-80", internalSchema) @@ -553,6 +581,27 @@ func testReshardV2Workflow(t *testing.T) { testWorkflowUpdate(t) testRestOfWorkflow(t) + + // Confirm that we lost no customer related writes during the Reshard. + dataGenCancel() + dataGenWg.Wait() + cres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer") + require.Len(t, cres.Rows, 1) + waitForNoWorkflowLag(t, vc, "customer", "customer_name") + cnres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_name") + require.Len(t, cnres.Rows, 1) + require.EqualValues(t, cres.Rows, cnres.Rows) + waitForNoWorkflowLag(t, vc, "customer", "customer_type") + ctres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_type") + require.Len(t, ctres.Rows, 1) + require.EqualValues(t, cres.Rows, ctres.Rows) + if debugMode { + t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, customer_type: %s", + cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ctres.Rows[0][0].ToString()) + } + // We also do a vdiff on the materialize workflows for good measure. + doVtctldclientVDiff(t, "customer", "customer_name", "", nil) + doVtctldclientVDiff(t, "customer", "customer_type", "", nil) } func testMoveTablesV2Workflow(t *testing.T) { @@ -560,25 +609,17 @@ func testMoveTablesV2Workflow(t *testing.T) { defer closeConn() currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables - // test basic forward and reverse flows - setupCustomerKeyspace(t) - // The purge table should get skipped/ignored - // If it's not then we'll get an error as the table doesn't exist in the vschema - createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") - waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - validateReadsRouteToSource(t, "replica") - validateWritesRouteToSource(t) - - // Verify that we've properly ignored any internal operational tables - // and that they were not copied to the new target keyspace - verifyNoInternalTables(t, vtgateConn, targetKs) - - testReplicatingWithPKEnumCols(t) - - // Confirm that updating MoveTable workflows works. - testWorkflowUpdate(t) + materializeShow := func() { + if !debugMode { + return + } + output, err := vc.VtctldClient.ExecuteCommandWithOutput("materialize", "--target-keyspace=customer", "show", "--workflow=customer_name", "--compact", "--include-logs=false") + require.NoError(t, err) + t.Logf("Materialize show output: %s", output) + } - testRestOfWorkflow(t) + // Test basic forward and reverse flows. + setupCustomerKeyspace(t) listOutputContainsWorkflow := func(output string, workflow string) bool { workflows := []string{} @@ -597,12 +638,39 @@ func testMoveTablesV2Workflow(t *testing.T) { require.NoError(t, err) return len(workflows) == 0 } - listAllArgs := []string{"workflow", "--keyspace", "customer", "list"} + output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) require.NoError(t, err) require.True(t, listOutputIsEmpty(output)) + // The purge table should get skipped/ignored + // If it's not then we'll get an error as the table doesn't exist in the vschema + createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + validateReadsRouteToSource(t, "replica") + validateWritesRouteToSource(t) + + // Verify that we've properly ignored any internal operational tables + // and that they were not copied to the new target keyspace + verifyNoInternalTables(t, vtgateConn, targetKs) + + testReplicatingWithPKEnumCols(t) + + // Confirm that updating MoveTable workflows works. + testWorkflowUpdate(t) + + testRestOfWorkflow(t) + // Create our primary intra-keyspace materialization. + materialize(t, materializeCustomerNameSpec, false) + // Create a second one to confirm that multiple ones get migrated correctly. + materialize(t, materializeCustomerTypeSpec, false) + materializeShow() + + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1")) + testVSchemaForSequenceAfterMoveTables(t) // Confirm that the auto_increment clause on customer.cid was removed. @@ -616,14 +684,14 @@ func testMoveTablesV2Workflow(t *testing.T) { createMoveTablesWorkflow(t, "Lead,Lead-1") output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) require.NoError(t, err) - require.True(t, listOutputContainsWorkflow(output, "wf1")) + require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type")) err = tstWorkflowCancel(t) require.NoError(t, err) output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) require.NoError(t, err) - require.True(t, listOutputIsEmpty(output)) + require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1")) } func testPartialSwitches(t *testing.T) { @@ -671,6 +739,11 @@ func testPartialSwitches(t *testing.T) { } func testRestOfWorkflow(t *testing.T) { + // Relax the throttler so that it does not cause switches to fail because it can block + // the catchup for the intra-keyspace materialization. + res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", true, false, throttlerConfig.Threshold*5, throttlerConfig.Query, nil) + require.NoError(t, err, res) + testPartialSwitches(t) // test basic forward and reverse flows @@ -732,12 +805,14 @@ func testRestOfWorkflow(t *testing.T) { validateWritesRouteToSource(t) // trying to complete an unswitched workflow should error - err := tstWorkflowComplete(t) + err = tstWorkflowComplete(t) require.Error(t, err) require.Contains(t, err.Error(), wrangler.ErrWorkflowNotFullySwitched) // fully switch and complete waitForLowLag(t, "customer", "wf1") + waitForLowLag(t, "customer", "customer_name") + waitForLowLag(t, "customer", "customer_type") tstWorkflowSwitchReadsAndWrites(t) validateReadsRoute(t, "rdonly", targetRdonlyTab1) validateReadsRouteToTarget(t, "replica") diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 4197269feb6..71bacd57c51 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3302,8 +3302,27 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return 0, sw.logs(), nil } + // We stop writes on the source before stopping the source streams so that the catchup time + // is lessened and other workflows that we have to migrate such as intra-keyspace materialize + // workflows also have a chance to catch up as well because those are internally generated + // GTIDs within the shards we're switching traffic away from. + // For intra-keyspace materialization streams that we migrate where the source and target are + // the keyspace being resharded, we wait for those to catchup in the stopStreams path before + // we actually stop them. + ts.Logger().Infof("Stopping source writes") + if err := sw.stopSourceWrites(ctx); err != nil { + sw.cancelMigration(ctx, sm) + return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) + } + ts.Logger().Infof("Stopping streams") - sourceWorkflows, err = sw.stopStreams(ctx, sm) + // Use a shorter context for this since since when doing a Reshard, if there are intra-keyspace + // materializations then we have to wait for them to catchup before switching traffic for the + // Reshard workflow. We use the the same timeout value here that is used for VReplication catchup + // with the inter-keyspace workflows. + stopCtx, stopCancel := context.WithTimeout(ctx, timeout) + defer stopCancel() + sourceWorkflows, err = sw.stopStreams(stopCtx, sm) if err != nil { for key, streams := range sm.Streams() { for _, stream := range streams { @@ -3311,13 +3330,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } sw.cancelMigration(ctx, sm) - return handleError("failed to stop the workflow streams", err) - } - - ts.Logger().Infof("Stopping source writes") - if err := sw.stopSourceWrites(ctx); err != nil { - sw.cancelMigration(ctx, sm) - return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) + return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go index 06cee856319..b294ba1fcd0 100644 --- a/go/vt/vtctl/workflow/stream_migrator.go +++ b/go/vt/vtctl/workflow/stream_migrator.go @@ -18,27 +18,35 @@ package workflow import ( "context" + "errors" "fmt" + "sort" "strings" "sync" "text/template" + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) /* @@ -249,7 +257,7 @@ func (sm *StreamMigrator) LegacyStopStreams(ctx context.Context) ([]string, erro return sm.legacyVerifyStreamPositions(ctx, positions) } -// StopStreams stops streams +// StopStreams stops streams. func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error) { if sm.streams == nil { return nil, nil @@ -678,11 +686,63 @@ func (sm *StreamMigrator) stopSourceStreams(ctx context.Context) error { ) err := sm.ts.ForAllSources(func(source *MigrationSource) error { - tabletStreams := sm.streams[source.GetShard().ShardName()] + shard := source.GetShard().ShardName() + tabletStreams := sm.streams[shard] if len(tabletStreams) == 0 { return nil } + // For intra-keyspace materialize workflows where the source and target are both + // the keyspace that is being resharded, we need to wait for those to catchup as + // well. New writes have already been blocked on the source, but the materialization + // workflow(s) still need to catchup with writes that happened just before writes + // were stopped on the source. + eg, egCtx := errgroup.WithContext(ctx) + for _, vrs := range tabletStreams { + if vrs.WorkflowType == binlogdatapb.VReplicationWorkflowType_Materialize && vrs.BinlogSource.Keyspace == sm.ts.TargetKeyspaceName() { + if vrs.BinlogSource == nil { // Should never happen + return fmt.Errorf("no binlog source is defined for materialization workflow %s", vrs.Workflow) + } + eg.Go(func() error { + sourceTablet := source.primary.Tablet.CloneVT() + if sourceTablet.Shard != vrs.BinlogSource.Shard { + si, err := sm.ts.TopoServer().GetTabletMapForShard(egCtx, vrs.BinlogSource.GetKeyspace(), vrs.BinlogSource.GetShard()) + if err != nil { + return err + } + for _, tablet := range si { + if tablet.GetType() == topodatapb.TabletType_PRIMARY { + sourceTablet = tablet.CloneVT() + break + } + } + } + if sourceTablet == nil { + return fmt.Errorf("no primary tablet found for materialization workflow %s and its stream from the binary log source %s/%s", + vrs.Workflow, vrs.BinlogSource.GetKeyspace(), vrs.BinlogSource.GetShard()) + } + pos, err := sm.ts.TabletManagerClient().PrimaryPosition(egCtx, sourceTablet) + if err != nil { + return err + } + sm.ts.Logger().Infof("Waiting for intra-keyspace materialization workflow %s on %v/%v to reach position %v for stream source from %s/%s, starting from position %s on tablet %s", + vrs.Workflow, source.primary.Keyspace, source.primary.Shard, pos, vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard, vrs.Position, topoproto.TabletAliasString(source.primary.Tablet.Alias)) + if err := sm.ts.TabletManagerClient().VReplicationWaitForPos(egCtx, source.primary.Tablet, vrs.ID, pos); err != nil { + return err + } + return nil + }) + } + } + if err := eg.Wait(); err != nil { + var xtra string + if errors.Is(err, context.DeadlineExceeded) { + xtra = " (increase the --timeout value if needed)" + } + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "error waiting for intra-keyspace materialization workflow %s to catch up%s: %v", + tabletStreams[0].Workflow, xtra, err) + } + query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in %s", VReplicationStreams(tabletStreams).Values()) _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query) if err != nil { @@ -753,8 +813,19 @@ func (sm *StreamMigrator) syncSourceStreams(ctx context.Context) (map[string]rep allErrors.RecordError(err) return } - - query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for cutover' where id=%d", replication.EncodePosition(pos), vrs.ID) + comment := "" + if vrs.WorkflowType == binlogdatapb.VReplicationWorkflowType_Materialize && vrs.BinlogSource.Keyspace == sm.ts.TargetKeyspaceName() { + // For intra-keyspace materializations in a keyspace that's being + // resharded, we don't have serving tablets on the workflow's current + // target side. So we instruct the VReplication engine and controller + // on the target tablets to include non-serving tablets in their + // search for source tablets to stream from as we migrate and setup + // these intra-keyspace materializations on the current target side + // that we're preparing to switch traffic to. + comment = fmt.Sprintf("/*vt+ %s=1 */ ", vreplication.IncludeNonServingTabletsCommentDirective) + } + query := fmt.Sprintf("update %s_vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for cutover' where id=%d", + comment, replication.EncodePosition(pos), vrs.ID) if _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, primary.Tablet, query); err != nil { allErrors.RecordError(err) return @@ -915,13 +986,14 @@ func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VRepl return sm.ts.ForAllTargets(func(target *MigrationTarget) error { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, target.GetPrimary().DbName()) tabletStreams := VReplicationStreams(tmpl).Copy().ToSlice() + var err error - for _, vrs := range tabletStreams { + addStreamRow := func(vrs *VReplicationStream) error { for _, rule := range vrs.BinlogSource.Filter.Rules { buf := &strings.Builder{} t := template.Must(template.New("").Parse(rule.Filter)) - if err := t.Execute(buf, key.KeyRangeString(target.GetShard().KeyRange)); err != nil { + if err := t.Execute(buf, key.KeyRangeString(target.GetShard().GetKeyRange())); err != nil { return err } @@ -930,9 +1002,53 @@ func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VRepl ig.AddRow(vrs.Workflow, vrs.BinlogSource, replication.EncodePosition(vrs.Position), "", "", vrs.WorkflowType, vrs.WorkflowSubType, vrs.DeferSecondaryKeys) + return nil + } + + var intraKeyspaceStreams map[string]bool + + for _, vrs := range tabletStreams { + // If we have an intra-keyspace materialization workflow, we need to + // create the streams from each target shard to each target shard + // rather than simply copying the streams from the source shards. + if vrs.WorkflowType == binlogdatapb.VReplicationWorkflowType_Materialize && vrs.BinlogSource.Keyspace == sm.ts.TargetKeyspaceName() { + if intraKeyspaceStreams == nil { + intraKeyspaceStreams = make(map[string]bool) + } + targets := maps.Values(sm.ts.Targets()) + sort.Slice(targets, func(i, j int) bool { + return key.KeyRangeLess(targets[i].GetShard().GetKeyRange(), targets[j].GetShard().GetKeyRange()) + }) + for _, st := range targets { + stream := *vrs // Copy + stream.BinlogSource.Shard = st.GetShard().ShardName() + key := fmt.Sprintf("%s:%s/%s:%s/%s", stream.Workflow, target.si.Keyspace(), target.GetShard().ShardName(), st.GetShard().Keyspace(), st.GetShard().ShardName()) + if intraKeyspaceStreams[key] { + continue // We've already created the stream. + } + pos, err := sm.ts.TabletManagerClient().PrimaryPosition(ctx, st.primary.Tablet) + if err != nil { + return err + } + sm.ts.Logger().Infof("Setting position for intra-keyspace materialization workflow %s on %v/%v to %v on tablet %s", + stream.Workflow, st.primary.Keyspace, st.primary.Shard, pos, topoproto.TabletAliasString(st.primary.Tablet.Alias)) + stream.Position, err = binlogplayer.DecodePosition(pos) + if err != nil { + return err + } + intraKeyspaceStreams[key] = true + if err := addStreamRow(&stream); err != nil { + return err + } + } + continue + } + if err := addStreamRow(vrs); err != nil { + return err + } } - _, err := sm.ts.VReplicationExec(ctx, target.GetPrimary().GetAlias(), ig.String()) + _, err = sm.ts.VReplicationExec(ctx, target.GetPrimary().GetAlias(), ig.String()) return err }) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index d7a6f5a31b6..a5c7c2a95d4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -74,7 +74,7 @@ type controller struct { // newController creates a new controller. Unless a stream is explicitly 'Stopped', // this function launches a goroutine to perform continuous vreplication. -func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats, vre *Engine) (*controller, error) { +func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats, vre *Engine, tpo discovery.TabletPickerOptions) (*controller, error) { if blpStats == nil { blpStats = binlogplayer.NewStats() } @@ -131,7 +131,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, tpo) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go b/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go index 0273f05ab4e..42da92fe2cf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_plan.go @@ -21,6 +21,7 @@ import ( "strings" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/sqlparser" ) @@ -29,6 +30,9 @@ type controllerPlan struct { query string opcode int + // tabletPickerOptions is set for updateQuery. + tabletPickerOptions discovery.TabletPickerOptions + // numInserts is set for insertQuery. numInserts int @@ -57,6 +61,13 @@ const ( // delete /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ from _vt.vreplication const AllowUnsafeWriteCommentDirective = "ALLOW_UNSAFE_VREPLICATION_WRITE" +// A comment directive that you need to include in your VReplication +// statements if you want the controller to include non-serving tablets +// in the execution plan (via tablet picker options). The full comment +// directive looks like this: +// update /*vt+ INCLUDE_NON_SERVING_TABLETS_IN_PLAN=1 */ _vt.vreplication set ... +const IncludeNonServingTabletsCommentDirective = "INCLUDE_NON_SERVING_TABLETS_IN_PLAN" + // Check that the given WHERE clause is using at least one of the specified // columns with an equality or in operator to ensure that it is being // properly selective and not unintentionally going to potentially affect @@ -232,6 +243,9 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) { if tableName.Qualifier.String() != sidecar.GetName() && tableName.Qualifier.String() != sidecar.DefaultName { return nil, fmt.Errorf("invalid database name: %s", tableName.Qualifier.String()) } + cp := &controllerPlan{ + opcode: updateQuery, + } switch tableName.Name.String() { case reshardingJournalTableName: return &controllerPlan{ @@ -244,6 +258,12 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) { AllowUnsafeWriteCommentDirective, sqlparser.String(upd.Where), columnsAsCSV(tableSelectiveColumns[vreplicationTableName])) } } + + if upd.Comments != nil && upd.Comments.Directives().IsSet(IncludeNonServingTabletsCommentDirective) { + cp.tabletPickerOptions = discovery.TabletPickerOptions{ + IncludeNonServingTablets: true, + } + } default: return nil, fmt.Errorf("invalid table name: %s", tableName.Name.String()) } @@ -266,15 +286,13 @@ func buildUpdatePlan(upd *sqlparser.Update) (*controllerPlan, error) { Right: sqlparser.ListArg("ids"), }, } + cp.selector = buf1.String() buf2 := sqlparser.NewTrackedBuffer(nil) buf2.Myprintf("%v", upd) + cp.applier = buf2.ParsedQuery() - return &controllerPlan{ - opcode: updateQuery, - selector: buf1.String(), - applier: buf2.ParsedQuery(), - }, nil + return cp, nil } func buildDeletePlan(del *sqlparser.Delete) (*controllerPlan, error) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index efab9693fa2..57cb60384c6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/tmutils" @@ -62,10 +63,11 @@ var ( }, }, } - testSelectorResponse1 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}} - testSelectorResponse2 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}} - testDMLResponse = &sqltypes.Result{RowsAffected: 1} - testPos = "MariaDB/0-1-1083" + testSelectorResponse1 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}}} + testSelectorResponse2 = &sqltypes.Result{Rows: [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}} + testDMLResponse = &sqltypes.Result{RowsAffected: 1} + testPos = "MariaDB/0-1-1083" + defaultTabletPickerOptions = discovery.TabletPickerOptions{} ) func TestControllerKeyRange(t *testing.T) { @@ -92,7 +94,7 @@ func TestControllerKeyRange(t *testing.T) { mysqld.MysqlPort.Store(3306) vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } @@ -154,7 +156,7 @@ func TestControllerTables(t *testing.T) { mysqld.MysqlPort.Store(3306) vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } @@ -171,7 +173,7 @@ func TestControllerBadID(t *testing.T) { params := map[string]string{ "id": "bad", } - _, err := newController(context.Background(), params, nil, nil, nil, "", "", nil, nil) + _, err := newController(context.Background(), params, nil, nil, nil, "", "", nil, nil, defaultTabletPickerOptions) want := `strconv.ParseInt: parsing "bad": invalid syntax` if err == nil || err.Error() != want { t.Errorf("newController err: %v, want %v", err, want) @@ -184,7 +186,7 @@ func TestControllerStopped(t *testing.T) { "state": binlogdatapb.VReplicationWorkflowState_Stopped.String(), } - ct, err := newController(context.Background(), params, nil, nil, nil, "", "", nil, nil) + ct, err := newController(context.Background(), params, nil, nil, nil, "", "", nil, nil, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } @@ -224,7 +226,7 @@ func TestControllerOverrides(t *testing.T) { mysqld.MysqlPort.Store(3306) vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } @@ -251,7 +253,7 @@ func TestControllerCanceledContext(t *testing.T) { cancel() vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, nil, nil, nil, "", nil) - ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, vre) + ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, vre, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } @@ -297,7 +299,7 @@ func TestControllerRetry(t *testing.T) { mysqld.MysqlPort.Store(3306) vre := NewTestEngine(nil, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } @@ -359,7 +361,7 @@ func TestControllerStopPosition(t *testing.T) { mysqld.MysqlPort.Store(3306) vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre, defaultTabletPickerOptions) if err != nil { t.Fatal(err) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 5a31ff62be7..abbec84aa2f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" @@ -280,7 +281,7 @@ func (vre *Engine) retry(ctx context.Context, err error) { func (vre *Engine) initControllers(rows []map[string]string) { for _, row := range rows { - ct, err := newController(vre.ctx, row, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, nil, vre) + ct, err := newController(vre.ctx, row, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, nil, vre, discovery.TabletPickerOptions{}) if err != nil { log.Errorf("Controller could not be initialized for stream: %v", row) continue @@ -337,12 +338,12 @@ func (vre *Engine) getDBClient(isAdmin bool) binlogplayer.DBClient { return vre.dbClientFactoryFiltered() } -// ExecWithDBA runs the specified query as the DBA user +// ExecWithDBA runs the specified query as the DBA user. func (vre *Engine) ExecWithDBA(query string) (*sqltypes.Result, error) { return vre.exec(query, true /*runAsAdmin*/) } -// Exec runs the specified query as the Filtered user +// Exec runs the specified query as the Filtered user. func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { return vre.exec(query, false /*runAsAdmin*/) } @@ -428,7 +429,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) if err != nil { return nil, err } - ct, err := newController(vre.ctx, params, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, nil, vre) + ct, err := newController(vre.ctx, params, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, nil, vre, plan.tabletPickerOptions) if err != nil { return nil, err } @@ -468,7 +469,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) } // Create a new controller in place of the old one. // For continuity, the new controller inherits the previous stats. - ct, err := newController(vre.ctx, params, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, blpStats[id], vre) + ct, err := newController(vre.ctx, params, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, blpStats[id], vre, plan.tabletPickerOptions) if err != nil { return nil, err } @@ -728,7 +729,7 @@ func (vre *Engine) transitionJournal(je *journalEvent) { log.Errorf("transitionJournal: %v", err) return } - ct, err := newController(vre.ctx, params, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, nil, vre) + ct, err := newController(vre.ctx, params, vre.dbClientFactoryFiltered, vre.mysqld, vre.ts, vre.cell, tabletTypesStr, nil, vre, discovery.TabletPickerOptions{}) if err != nil { log.Errorf("transitionJournal: %v", err) return diff --git a/test/config.json b/test/config.json index 9cce5851225..1bc7f97dff4 100644 --- a/test/config.json +++ b/test/config.json @@ -1222,7 +1222,7 @@ }, "vreplication_v2": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicV2Workflows", "-timeout", "20m"], + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicV2Workflows", "-timeout", "30m"], "Command": [], "Manual": false, "Shard": "vreplication_v2",