Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Errant GTID detection on the replicas when they set replication source #16833

Merged
merged 12 commits into from
Oct 3, 2024
Merged
31 changes: 31 additions & 0 deletions go/mysql/replication/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,37 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
return buf.Bytes()
}

// ErrantGTIDsOnReplica gets the errant GTIDs on the replica by comparing against the primary position and UUID.
func ErrantGTIDsOnReplica(replicaPosition Position, primaryPositionStr string) (string, error) {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
primaryPosition, err := DecodePosition(primaryPositionStr)
if err != nil {
return "", err
}
replicaGTIDSet, replicaOk := replicaPosition.GTIDSet.(Mysql56GTIDSet)
primaryGTIDSet, primaryOk := primaryPosition.GTIDSet.(Mysql56GTIDSet)

// Currently we only support errant GTID detection for MySQL 56 flavour.
if !replicaOk || !primaryOk {
return "", nil
}

// Calculate the difference between the replica and primary GTID sets.
diffSet := replicaGTIDSet.Difference(primaryGTIDSet)
return diffSet.String(), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

// RemoveUUID removes a specific UUID from the gtid set.
func (set Mysql56GTIDSet) RemoveUUID(uuid SID) Mysql56GTIDSet {
newSet := make(Mysql56GTIDSet)
for sid, intervals := range set {
if sid == uuid {
continue
}
newSet[sid] = intervals
}
return newSet
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


// Difference will supply the difference between the receiver and supplied Mysql56GTIDSets, and supply the result
// as a Mysql56GTIDSet.
func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
Expand Down
88 changes: 88 additions & 0 deletions go/mysql/replication/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,3 +704,91 @@ func BenchmarkMySQL56GTIDParsing(b *testing.B) {
}
}
}

func TestErrantGTIDsOnReplica(t *testing.T) {
tests := []struct {
name string
replicaPosition string
primaryPositionStr string
errantGtidWanted string
wantErr string
}{
{
name: "Empty replica position",
replicaPosition: "MySQL56/",
primaryPositionStr: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
errantGtidWanted: "",
}, {
name: "Empty primary position",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
primaryPositionStr: "MySQL56/",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
}, {
name: "Empty primary position - with multiple errant gtids",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
primaryPositionStr: "MySQL56/",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
}, {
name: "Primary position parse error",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
primaryPositionStr: "incorrect position",
wantErr: "unknown GTIDSet flavor",
}, {
name: "Single errant GTID",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
primaryPositionStr: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30",
errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}, {
name: "Multiple errant GTID",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-35",
primaryPositionStr: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:31-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-33:35",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
replPos, err := DecodePosition(tt.replicaPosition)
require.NoError(t, err)
errantGTIDs, err := ErrantGTIDsOnReplica(replPos, tt.primaryPositionStr)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
require.EqualValues(t, tt.errantGtidWanted, errantGTIDs)
}

})
}
}

func TestMysql56GTIDSet_RemoveUUID(t *testing.T) {
tests := []struct {
name string
initialSet string
uuid string
wantSet string
}{
{
name: "Remove unknown UUID",
initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
uuid: "8bc65c84-3fe4-11ed-a912-257f0fcde6c9",
wantSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

},
{
name: "Remove a single UUID",
initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
uuid: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
wantSet: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gtidSet, err := ParseMysql56GTIDSet(tt.initialSet)
require.NoError(t, err)
sid, err := ParseSID(tt.uuid)
require.NoError(t, err)
gtidSet = gtidSet.RemoveUUID(sid)
require.EqualValues(t, tt.wantSet, gtidSet.String())
})
}
}
4 changes: 4 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type FakeMysqlDaemon struct {
// and ReplicationStatus.
CurrentPrimaryPosition replication.Position

// CurrentRelayLogPosition is returned by ReplicationStatus.
CurrentRelayLogPosition replication.Position

// CurrentSourceFilePosition is used to determine the executed
// file based positioning of the replication source.
CurrentSourceFilePosition replication.Position
Expand Down Expand Up @@ -313,6 +316,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.
return replication.ReplicationStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: fmd.CurrentSourceFilePosition,
RelayLogPosition: fmd.CurrentRelayLogPosition,
RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition,
ReplicationLagSeconds: fmd.ReplicationLagSeconds,
// Implemented as AND to avoid changing all tests that were
Expand Down
48 changes: 14 additions & 34 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,15 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstats"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file handles the initial backup restore upon startup.
Expand Down Expand Up @@ -326,7 +323,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
} else if keyspaceInfo.KeyspaceType == topodatapb.KeyspaceType_NORMAL {
// Reconnect to primary only for "NORMAL" keyspaces
params.Logger.Infof("Restore: starting replication at position %v", pos)
if err := tm.startReplication(context.Background(), pos, originalType); err != nil {
if err := tm.startReplication(ctx, pos, originalType); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return err
}
}
Expand Down Expand Up @@ -577,47 +574,30 @@ func (tm *TabletManager) disableReplication(ctx context.Context) error {
}

func (tm *TabletManager) startReplication(ctx context.Context, pos replication.Position, tabletType topodatapb.TabletType) error {
if err := tm.MysqlDaemon.StopReplication(ctx, nil); err != nil {
// The first three steps of stopping replication, and setting the replication position,
// we want to do even if the context expires, so we use a background context for these tasks.
if err := tm.MysqlDaemon.StopReplication(context.Background(), nil); err != nil {
return vterrors.Wrap(err, "failed to stop replication")
}
if err := tm.MysqlDaemon.ResetReplicationParameters(ctx); err != nil {
if err := tm.MysqlDaemon.ResetReplicationParameters(context.Background()); err != nil {
return vterrors.Wrap(err, "failed to reset replication")
}

// Set the position at which to resume from the primary.
if err := tm.MysqlDaemon.SetReplicationPosition(ctx, pos); err != nil {
if err := tm.MysqlDaemon.SetReplicationPosition(context.Background(), pos); err != nil {
return vterrors.Wrap(err, "failed to set replication position")
}

primary, err := tm.initializeReplication(ctx, tabletType)
primaryPosStr, err := tm.initializeReplication(ctx, tabletType)
// If we ran into an error while initializing replication, then there is no point in waiting for catch-up.
// Also, if there is no primary tablet in the shard, we don't need to proceed further.
if err != nil || primary == nil {
if err != nil || primaryPosStr == "" {
return err
}

// wait for reliable replication_lag_seconds
// we have pos where we want to resume from
// if PrimaryPosition is the same, that means no writes
// have happened to primary, so we are up-to-date
// otherwise, wait for replica's Position to change from
// the initial pos before proceeding
tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
remoteCtx, remoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer remoteCancel()
posStr, err := tmc.PrimaryPosition(remoteCtx, primary.Tablet)
if err != nil {
// It is possible that though PrimaryAlias is set, the primary tablet is unreachable
// Log a warning and let tablet restore in that case
// If we had instead considered this fatal, all tablets would crash-loop
// until a primary appears, which would make it impossible to elect a primary.
log.Warningf("Can't get primary replication position after restore: %v", err)
return nil
}
primaryPos, err := replication.DecodePosition(posStr)
primaryPos, err := replication.DecodePosition(primaryPosStr)
if err != nil {
return vterrors.Wrapf(err, "can't decode primary replication position: %q", posStr)
return vterrors.Wrapf(err, "can't decode primary replication position: %q", primaryPos)
}

if !pos.Equal(primaryPos) {
Expand Down
27 changes: 27 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
wasReplicating := false
shouldbeReplicating := false
status, err := tm.MysqlDaemon.ReplicationStatus(ctx)
replicaPosition := status.RelayLogPosition
if err == mysql.ErrNotReplica {
// This is a special error that means we actually succeeded in reading
// the status, but the status is empty because replication is not
Expand All @@ -716,6 +717,12 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
// Since we continue in the case of this error, make sure 'status' is
// in a known, empty state.
status = replication.ReplicationStatus{}
// The replica position we use for the errant GTID detection should be the executed
// GTID set since this tablet is not running replication at all.
replicaPosition, err = tm.MysqlDaemon.PrimaryPosition(ctx)
if err != nil {
return err
}
} else if err != nil {
// Abort on any other non-nil error.
return err
Expand Down Expand Up @@ -747,12 +754,32 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
if err != nil {
return err
}

host := parent.Tablet.MysqlHostname
port := parent.Tablet.MysqlPort
// If host is empty, then we shouldn't even attempt the reparent. That tablet has already shutdown.
if host == "" {
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Shard primary has empty mysql hostname")
}
// Errant GTID detection.
{
// Find the executed GTID set of the tablet that we are reparenting to.
// We will then compare our own position against it to verify that we don't
// have an errant GTID. If we find any GTID that we have, but the primary doesn't,
// we will not enter the replication graph and instead fail replication.
var primaryPosition string
primaryPosition, err = tm.tmc.PrimaryPosition(ctx, parent.Tablet)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
errantGtid, err := replication.ErrantGTIDsOnReplica(replicaPosition, primaryPosition)
if err != nil {
return err
}
if errantGtid != "" {
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("Errant GTID detected - %s; Primary GTID - %s, Replica GTID - %s", errantGtid, primaryPosition, replicaPosition.String()))
}
}
if status.SourceHost != host || status.SourcePort != port || heartbeatInterval != 0 {
// This handles both changing the address and starting replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, heartbeatInterval, wasReplicating, shouldbeReplicating); err != nil {
Expand Down
Loading
Loading