From eab262e152a538307fcdf8509c374107c43e441d Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Thu, 3 Oct 2024 12:01:20 +0530 Subject: [PATCH] Fix: Errant GTID detection on the replicas when they set replication source (#16833) Signed-off-by: Manan Gupta --- changelog/21.0/21.0.0/summary.md | 9 ++ go/mysql/replication/mysql56_gtid_set.go | 27 ++++++ go/mysql/replication/mysql56_gtid_set_test.go | 85 +++++++++++++++++++ go/vt/mysqlctl/fakemysqldaemon.go | 4 + go/vt/vttablet/tabletmanager/restore.go | 48 +++-------- .../vttablet/tabletmanager/rpc_replication.go | 30 +++++++ go/vt/vttablet/tabletmanager/tm_init.go | 64 +++++++++++--- go/vt/wrangler/testlib/backup_test.go | 17 ++-- go/vt/wrangler/testlib/reparent_utils_test.go | 33 +++++++ 9 files changed, 260 insertions(+), 57 deletions(-) diff --git a/changelog/21.0/21.0.0/summary.md b/changelog/21.0/21.0.0/summary.md index 91adc6d642d..d237f24fe5c 100644 --- a/changelog/21.0/21.0.0/summary.md +++ b/changelog/21.0/21.0.0/summary.md @@ -18,6 +18,7 @@ - **[Dynamic VReplication Configuration](#dynamic-vreplication-configuration)** - **[Reference Table Materialization](#reference-table-materialization)** - **[New VEXPLAIN Modes: TRACE and KEYS](#new-vexplain-modes)** + - **[Errant GTID Detection on Vttablets](#errant-gtid-vttablet)** ## Major Changes @@ -197,3 +198,11 @@ The KEYS mode for VEXPLAIN offers a concise summary of query structure, highligh KEYS mode analyzes the query structure without executing it, providing JSON output that includes grouping columns, join columns, filter columns (potential candidates for indexes, primary keys, or sharding keys), and the statement type. These new VEXPLAIN modes enhance Vitess's query analysis capabilities, allowing for more informed decisions about sharding strategies and query optimization. + +### Errant GTID Detection on Vttablets + +Vttablets now run an errant GTID detection logic before they join the replication stream. So, if a replica has an errant GTID, it will +not start replicating from the primary. It will fail the call the set its replication source because of the errant GTID. This prevents us +from running into situations from which recovery is very hard. + +For users running with the vitess operator on kubernetes, this change means that the replicas with errant GTIDs will have broken replication and will report as unready. The users will need to manually clean up these errant replica tablets. diff --git a/go/mysql/replication/mysql56_gtid_set.go b/go/mysql/replication/mysql56_gtid_set.go index 348af5b5274..918a6ec3b6b 100644 --- a/go/mysql/replication/mysql56_gtid_set.go +++ b/go/mysql/replication/mysql56_gtid_set.go @@ -467,6 +467,33 @@ func (set Mysql56GTIDSet) SIDBlock() []byte { return buf.Bytes() } +// ErrantGTIDsOnReplica gets the errant GTIDs on the replica by comparing against the primary position and UUID. +func ErrantGTIDsOnReplica(replicaPosition Position, primaryPosition Position) (string, error) { + replicaGTIDSet, replicaOk := replicaPosition.GTIDSet.(Mysql56GTIDSet) + primaryGTIDSet, primaryOk := primaryPosition.GTIDSet.(Mysql56GTIDSet) + + // Currently we only support errant GTID detection for MySQL 56 flavour. + if !replicaOk || !primaryOk { + return "", nil + } + + // Calculate the difference between the replica and primary GTID sets. + diffSet := replicaGTIDSet.Difference(primaryGTIDSet) + return diffSet.String(), nil +} + +// RemoveUUID removes a specific UUID from the gtid set. +func (set Mysql56GTIDSet) RemoveUUID(uuid SID) Mysql56GTIDSet { + newSet := make(Mysql56GTIDSet) + for sid, intervals := range set { + if sid == uuid { + continue + } + newSet[sid] = intervals + } + return newSet +} + // Difference will supply the difference between the receiver and supplied Mysql56GTIDSets, and supply the result // as a Mysql56GTIDSet. func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet { diff --git a/go/mysql/replication/mysql56_gtid_set_test.go b/go/mysql/replication/mysql56_gtid_set_test.go index 323baae3885..bff23679afb 100644 --- a/go/mysql/replication/mysql56_gtid_set_test.go +++ b/go/mysql/replication/mysql56_gtid_set_test.go @@ -704,3 +704,88 @@ func BenchmarkMySQL56GTIDParsing(b *testing.B) { } } } + +func TestErrantGTIDsOnReplica(t *testing.T) { + tests := []struct { + name string + replicaPosition string + primaryPosition string + errantGtidWanted string + wantErr string + }{ + { + name: "Empty replica position", + replicaPosition: "MySQL56/", + primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8", + errantGtidWanted: "", + }, { + name: "Empty primary position", + replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8", + primaryPosition: "MySQL56/", + errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8", + }, { + name: "Empty primary position - with multiple errant gtids", + replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1", + primaryPosition: "MySQL56/", + errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1", + }, { + name: "Single errant GTID", + replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34", + primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30", + errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34", + }, { + name: "Multiple errant GTID", + replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-35", + primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34", + errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:31-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-33:35", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + replPos, err := DecodePosition(tt.replicaPosition) + require.NoError(t, err) + primaryPos, err := DecodePosition(tt.primaryPosition) + require.NoError(t, err) + errantGTIDs, err := ErrantGTIDsOnReplica(replPos, primaryPos) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.EqualValues(t, tt.errantGtidWanted, errantGTIDs) + } + + }) + } +} + +func TestMysql56GTIDSet_RemoveUUID(t *testing.T) { + tests := []struct { + name string + initialSet string + uuid string + wantSet string + }{ + { + name: "Remove unknown UUID", + initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24", + uuid: "8bc65c84-3fe4-11ed-a912-257f0fcde6c9", + wantSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24", + }, + { + name: "Remove a single UUID", + initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24", + uuid: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9", + wantSet: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gtidSet, err := ParseMysql56GTIDSet(tt.initialSet) + require.NoError(t, err) + sid, err := ParseSID(tt.uuid) + require.NoError(t, err) + gtidSet = gtidSet.RemoveUUID(sid) + require.EqualValues(t, tt.wantSet, gtidSet.String()) + }) + } +} diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 2557f7a7e51..1bab1639acb 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -81,6 +81,9 @@ type FakeMysqlDaemon struct { // and ReplicationStatus. CurrentPrimaryPosition replication.Position + // CurrentRelayLogPosition is returned by ReplicationStatus. + CurrentRelayLogPosition replication.Position + // CurrentSourceFilePosition is used to determine the executed // file based positioning of the replication source. CurrentSourceFilePosition replication.Position @@ -313,6 +316,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. return replication.ReplicationStatus{ Position: fmd.CurrentPrimaryPosition, FilePosition: fmd.CurrentSourceFilePosition, + RelayLogPosition: fmd.CurrentRelayLogPosition, RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition, ReplicationLagSeconds: fmd.ReplicationLagSeconds, // Implemented as AND to avoid changing all tests that were diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index a56eb4da37b..35587124108 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -36,18 +36,15 @@ import ( "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/backupstats" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/proto/vttime" "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - "vitess.io/vitess/go/vt/vttablet/tmclient" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // This file handles the initial backup restore upon startup. @@ -326,7 +323,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L } else if keyspaceInfo.KeyspaceType == topodatapb.KeyspaceType_NORMAL { // Reconnect to primary only for "NORMAL" keyspaces params.Logger.Infof("Restore: starting replication at position %v", pos) - if err := tm.startReplication(context.Background(), pos, originalType); err != nil { + if err := tm.startReplication(ctx, pos, originalType); err != nil { return err } } @@ -577,47 +574,30 @@ func (tm *TabletManager) disableReplication(ctx context.Context) error { } func (tm *TabletManager) startReplication(ctx context.Context, pos replication.Position, tabletType topodatapb.TabletType) error { - if err := tm.MysqlDaemon.StopReplication(ctx, nil); err != nil { + // The first three steps of stopping replication, and setting the replication position, + // we want to do even if the context expires, so we use a background context for these tasks. + if err := tm.MysqlDaemon.StopReplication(context.Background(), nil); err != nil { return vterrors.Wrap(err, "failed to stop replication") } - if err := tm.MysqlDaemon.ResetReplicationParameters(ctx); err != nil { + if err := tm.MysqlDaemon.ResetReplicationParameters(context.Background()); err != nil { return vterrors.Wrap(err, "failed to reset replication") } // Set the position at which to resume from the primary. - if err := tm.MysqlDaemon.SetReplicationPosition(ctx, pos); err != nil { + if err := tm.MysqlDaemon.SetReplicationPosition(context.Background(), pos); err != nil { return vterrors.Wrap(err, "failed to set replication position") } - primary, err := tm.initializeReplication(ctx, tabletType) + primaryPosStr, err := tm.initializeReplication(ctx, tabletType) // If we ran into an error while initializing replication, then there is no point in waiting for catch-up. // Also, if there is no primary tablet in the shard, we don't need to proceed further. - if err != nil || primary == nil { + if err != nil || primaryPosStr == "" { return err } - // wait for reliable replication_lag_seconds - // we have pos where we want to resume from - // if PrimaryPosition is the same, that means no writes - // have happened to primary, so we are up-to-date - // otherwise, wait for replica's Position to change from - // the initial pos before proceeding - tmc := tmclient.NewTabletManagerClient() - defer tmc.Close() - remoteCtx, remoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer remoteCancel() - posStr, err := tmc.PrimaryPosition(remoteCtx, primary.Tablet) - if err != nil { - // It is possible that though PrimaryAlias is set, the primary tablet is unreachable - // Log a warning and let tablet restore in that case - // If we had instead considered this fatal, all tablets would crash-loop - // until a primary appears, which would make it impossible to elect a primary. - log.Warningf("Can't get primary replication position after restore: %v", err) - return nil - } - primaryPos, err := replication.DecodePosition(posStr) + primaryPos, err := replication.DecodePosition(primaryPosStr) if err != nil { - return vterrors.Wrapf(err, "can't decode primary replication position: %q", posStr) + return vterrors.Wrapf(err, "can't decode primary replication position: %q", primaryPos) } if !pos.Equal(primaryPos) { diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 545eb1471bd..6fe4e779cbb 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -708,6 +708,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA wasReplicating := false shouldbeReplicating := false status, err := tm.MysqlDaemon.ReplicationStatus(ctx) + replicaPosition := status.RelayLogPosition if err == mysql.ErrNotReplica { // This is a special error that means we actually succeeded in reading // the status, but the status is empty because replication is not @@ -717,6 +718,12 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA // Since we continue in the case of this error, make sure 'status' is // in a known, empty state. status = replication.ReplicationStatus{} + // The replica position we use for the errant GTID detection should be the executed + // GTID set since this tablet is not running replication at all. + replicaPosition, err = tm.MysqlDaemon.PrimaryPosition(ctx) + if err != nil { + return err + } } else if err != nil { // Abort on any other non-nil error. return err @@ -748,12 +755,35 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA if err != nil { return err } + host := parent.Tablet.MysqlHostname port := parent.Tablet.MysqlPort // If host is empty, then we shouldn't even attempt the reparent. That tablet has already shutdown. if host == "" { return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Shard primary has empty mysql hostname") } + // Errant GTID detection. + { + // Find the executed GTID set of the tablet that we are reparenting to. + // We will then compare our own position against it to verify that we don't + // have an errant GTID. If we find any GTID that we have, but the primary doesn't, + // we will not enter the replication graph and instead fail replication. + primaryPositionStr, err := tm.tmc.PrimaryPosition(ctx, parent.Tablet) + if err != nil { + return err + } + primaryPosition, err := replication.DecodePosition(primaryPositionStr) + if err != nil { + return err + } + errantGtid, err := replication.ErrantGTIDsOnReplica(replicaPosition, primaryPosition) + if err != nil { + return err + } + if errantGtid != "" { + return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("Errant GTID detected - %s; Primary GTID - %s, Replica GTID - %s", errantGtid, primaryPosition, replicaPosition.String())) + } + } if status.SourceHost != host || status.SourcePort != port || heartbeatInterval != 0 { // This handles both changing the address and starting replication. if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, heartbeatInterval, wasReplicating, shouldbeReplicating); err != nil { diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index fbba2b6c713..efc6c57661a 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -50,6 +50,7 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/protoutil" @@ -64,6 +65,7 @@ import ( "vitess.io/vitess/go/vt/mysqlctl" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -75,6 +77,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) const ( @@ -162,6 +165,9 @@ type TabletManager struct { VDiffEngine *vdiff.Engine Env *vtenv.Environment + // tmc is used to run an RPC against other vttablets. + tmc tmclient.TabletManagerClient + // tmState manages the TabletManager state. tmState *tmState @@ -356,6 +362,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl log.Infof("TabletManager Start") tm.DBConfigs.DBName = topoproto.TabletDbName(tablet) tm.tabletAlias = tablet.Alias + tm.tmc = tmclient.NewTabletManagerClient() tm.tmState = newTMState(tm, tablet) tm.actionSema = semaphore.NewWeighted(1) tm._waitForGrantsComplete = make(chan struct{}) @@ -958,50 +965,50 @@ func (tm *TabletManager) hookExtraEnv() map[string]string { // initializeReplication is used to initialize the replication when the tablet starts. // It returns the current primary tablet for use externally -func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType topodatapb.TabletType) (primary *topo.TabletInfo, err error) { +func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType topodatapb.TabletType) (primaryPosStr string, err error) { // If active reparents are disabled, we do not touch replication. // There is nothing to do if mysqlctl.DisableActiveReparents { - return nil, nil + return "", nil } // If the desired tablet type is primary, then we shouldn't be setting our replication source. // So there is nothing to do. if tabletType == topodatapb.TabletType_PRIMARY { - return nil, nil + return "", nil } // Read the shard to find the current primary, and its location. tablet := tm.Tablet() si, err := tm.TopoServer.GetShard(ctx, tablet.Keyspace, tablet.Shard) if err != nil { - return nil, vterrors.Wrap(err, "cannot read shard") + return "", vterrors.Wrap(err, "cannot read shard") } if si.PrimaryAlias == nil { // There's no primary. This is fine, since there might be no primary currently log.Warningf("cannot start replication during initialization: shard %v/%v has no primary.", tablet.Keyspace, tablet.Shard) - return nil, nil + return "", nil } if topoproto.TabletAliasEqual(si.PrimaryAlias, tablet.Alias) { // We used to be the primary before we got restarted, // and no other primary has been elected in the meantime. // There isn't anything to do here either. log.Warningf("cannot start replication during initialization: primary in shard record still points to this tablet.") - return nil, nil + return "", nil } currentPrimary, err := tm.TopoServer.GetTablet(ctx, si.PrimaryAlias) if err != nil { - return nil, vterrors.Wrapf(err, "cannot read primary tablet %v", si.PrimaryAlias) + return "", vterrors.Wrapf(err, "cannot read primary tablet %v", si.PrimaryAlias) } durabilityName, err := tm.TopoServer.GetKeyspaceDurability(ctx, tablet.Keyspace) if err != nil { - return nil, vterrors.Wrapf(err, "cannot read keyspace durability policy %v", tablet.Keyspace) + return "", vterrors.Wrapf(err, "cannot read keyspace durability policy %v", tablet.Keyspace) } log.Infof("Getting a new durability policy for %v", durabilityName) durability, err := reparentutil.GetDurabilityPolicy(durabilityName) if err != nil { - return nil, vterrors.Wrapf(err, "cannot get durability policy %v", durabilityName) + return "", vterrors.Wrapf(err, "cannot get durability policy %v", durabilityName) } // If using semi-sync, we need to enable it before connecting to primary. // We should set the correct type, since it is used in replica semi-sync @@ -1010,21 +1017,50 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, reparentutil.IsReplicaSemiSync(durability, currentPrimary.Tablet, tablet)) if err != nil { - return nil, err + return "", err } if err := tm.fixSemiSync(ctx, tabletType, semiSyncAction); err != nil { - return nil, err + return "", err } // Set primary and start replication. if currentPrimary.Tablet.MysqlHostname == "" { log.Warningf("primary tablet in the shard record does not have mysql hostname specified, possibly because that tablet has been shut down.") - return nil, nil + return "", nil + } + + // Find our own executed GTID set and, + // the executed GTID set of the tablet that we are reparenting to. + // We will then compare our own position against it to verify that we don't + // have an errant GTID. If we find any GTID that we have, but the primary doesn't, + // we will not enter the replication graph and instead fail replication. + replicaPos, err := tm.MysqlDaemon.PrimaryPosition(ctx) + if err != nil { + return "", err + } + + primaryPosStr, err = tm.tmc.PrimaryPosition(ctx, currentPrimary.Tablet) + if err != nil { + return "", err } + + primaryPosition, err := replication.DecodePosition(primaryPosStr) + if err != nil { + return "", err + } + + errantGTIDs, err := replication.ErrantGTIDsOnReplica(replicaPos, primaryPosition) + if err != nil { + return "", err + } + if errantGTIDs != "" { + return "", vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("Errant GTID detected - %s", errantGTIDs)) + } + if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, currentPrimary.Tablet.MysqlPort, 0, true, true); err != nil { - return nil, vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed") + return "", vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed") } - return currentPrimary, nil + return primaryPosStr, nil } diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index 0c7bc412f40..f1977df3f16 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -37,7 +37,6 @@ import ( "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtenv" @@ -688,9 +687,6 @@ func TestRestoreUnreachablePrimary(t *testing.T) { "FAKE RESET REPLICA ALL", "FAKE RESET BINARY LOGS AND GTIDS", "FAKE SET GLOBAL gtid_purged", - "STOP REPLICA", - "FAKE SET SOURCE", - "START REPLICA", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, @@ -714,13 +710,16 @@ func TestRestoreUnreachablePrimary(t *testing.T) { // stop primary so that it is unreachable primary.StopActionLoop(t) - // set a short timeout so that we don't have to wait 30 seconds - topo.RemoteOperationTimeout = 2 * time.Second - // Restore should still succeed - require.NoError(t, destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "" /* restoreToPos */, []string{} /* ignoreBackupEngines */, mysqlShutdownTimeout)) + // Attempt to fix the test, but its still failing :man_shrugging. + ctx, cancel = context.WithTimeout(ctx, 2*time.Second) + defer cancel() + // Restore will return an error while trying to contact the primary for its position, but otherwise will succeed. + // The replication won't be running however, since we can't run errant GTID detection without the primary being online. + err = destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "" /* restoreToPos */, []string{} /* ignoreBackupEngines */, mysqlShutdownTimeout) + require.ErrorContains(t, err, "DeadlineExceeded") // verify the full status require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed") - assert.True(t, destTablet.FakeMysqlDaemon.Replicating) + assert.False(t, destTablet.FakeMysqlDaemon.Replicating) assert.True(t, destTablet.FakeMysqlDaemon.Running) } diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go index e0a2077c778..ea2e34b66bd 100644 --- a/go/vt/wrangler/testlib/reparent_utils_test.go +++ b/go/vt/wrangler/testlib/reparent_utils_test.go @@ -205,6 +205,9 @@ func TestSetReplicationSource(t *testing.T) { return nil }) require.NoError(t, err, "UpdateShardFields failed") + pos, err := replication.DecodePosition("MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8") + require.NoError(t, err) + primary.FakeMysqlDaemon.CurrentPrimaryPositionLocked(pos) // primary action loop (to initialize host and port) primary.StartActionLoop(t, wr) @@ -246,6 +249,36 @@ func TestSetReplicationSource(t *testing.T) { checkSemiSyncEnabled(t, false, true, replica) }) + t.Run("Errant GTIDs on the replica", func(t *testing.T) { + replica := NewFakeTablet(t, wr, "cell1", 4, topodatapb.TabletType_REPLICA, nil) + // replica loop + replica.FakeMysqlDaemon.Replicating = true + replica.FakeMysqlDaemon.IOThreadRunning = true + replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) + replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ + // These 3 statements come from tablet startup + "STOP REPLICA", + "FAKE SET SOURCE", + "START REPLICA", + } + replica.StartActionLoop(t, wr) + defer replica.StopActionLoop(t) + + // Set replica's GTID to have a write that the primary's GTID doesn't have + pos, err = replication.DecodePosition("MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-7,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1") + require.NoError(t, err) + replica.FakeMysqlDaemon.CurrentRelayLogPosition = pos + + // run SetReplicationSource + err = wr.SetReplicationSource(ctx, replica.Tablet) + require.ErrorContains(t, err, "Errant GTID detected") + + // check what was run + err = replica.FakeMysqlDaemon.CheckSuperQueryList() + require.NoError(t, err, "CheckSuperQueryList failed") + checkSemiSyncEnabled(t, false, true, replica) + }) + // test setting an empty hostname because of primary shutdown t.Run("Primary tablet already shutdown", func(t *testing.T) { replica := NewFakeTablet(t, wr, "cell1", 3, topodatapb.TabletType_REPLICA, nil)