From f4714df5b4a3fea3bf4553c62e8743fcc054be84 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Tue, 15 Oct 2024 12:59:33 +0530 Subject: [PATCH] Flaky test fixes (#16940) Signed-off-by: Manan Gupta --- go/mysql/server_test.go | 20 +++++- go/vt/mysqlctl/fakemysqldaemon.go | 25 ++++--- go/vt/vtctl/grpcvtctldserver/server_test.go | 2 +- .../vttablet/tabletmanager/framework_test.go | 4 +- go/vt/wrangler/testlib/backup_test.go | 66 +++++++++---------- .../testlib/emergency_reparent_shard_test.go | 26 ++++---- .../testlib/planned_reparent_shard_test.go | 26 ++++---- go/vt/wrangler/testlib/reparent_utils_test.go | 8 +-- go/vt/wrangler/traffic_switcher_env_test.go | 8 +-- 9 files changed, 106 insertions(+), 79 deletions(-) diff --git a/go/mysql/server_test.go b/go/mysql/server_test.go index 082a176e3af..72b6f25d0c8 100644 --- a/go/mysql/server_test.go +++ b/go/mysql/server_test.go @@ -1424,7 +1424,7 @@ func TestListenerShutdown(t *testing.T) { l.Shutdown() - assert.EqualValues(t, 1, connRefuse.Get(), "connRefuse") + waitForConnRefuse(t, 1) err = conn.Ping() require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)") @@ -1436,6 +1436,24 @@ func TestListenerShutdown(t *testing.T) { require.Equal(t, "Server shutdown in progress", sqlErr.Message) } +func waitForConnRefuse(t *testing.T, valWanted int64) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + require.FailNow(t, "connRefuse did not reach %v", valWanted) + case <-tick.C: + if connRefuse.Get() == valWanted { + return + } + } + } +} + func TestParseConnAttrs(t *testing.T) { expected := map[string]string{ "_client_version": "8.0.11", diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 317aed4f578..d29bbaf079a 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -292,13 +292,6 @@ func (fmd *FakeMysqlDaemon) GetServerUUID(ctx context.Context) (string, error) { return "000000", nil } -// CurrentPrimaryPositionLocked is thread-safe. -func (fmd *FakeMysqlDaemon) CurrentPrimaryPositionLocked(pos replication.Position) { - fmd.mu.Lock() - defer fmd.mu.Unlock() - fmd.CurrentPrimaryPosition = pos -} - // ReplicationStatus is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.ReplicationStatus, error) { if fmd.ReplicationStatusError != nil { @@ -322,6 +315,8 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. // PrimaryStatus is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) { + fmd.mu.Lock() + defer fmd.mu.Unlock() if fmd.PrimaryStatusError != nil { return replication.PrimaryStatus{}, fmd.PrimaryStatusError } @@ -391,7 +386,21 @@ func (fmd *FakeMysqlDaemon) GetPreviousGTIDs(ctx context.Context, binlog string) // PrimaryPosition is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) PrimaryPosition(ctx context.Context) (replication.Position, error) { - return fmd.CurrentPrimaryPosition, nil + return fmd.GetPrimaryPositionLocked(), nil +} + +// GetPrimaryPositionLocked gets the primary position while holding the lock. +func (fmd *FakeMysqlDaemon) GetPrimaryPositionLocked() replication.Position { + fmd.mu.Lock() + defer fmd.mu.Unlock() + return fmd.CurrentPrimaryPosition +} + +// SetPrimaryPositionLocked is thread-safe. +func (fmd *FakeMysqlDaemon) SetPrimaryPositionLocked(pos replication.Position) { + fmd.mu.Lock() + defer fmd.mu.Unlock() + fmd.CurrentPrimaryPosition = pos } // IsReadOnly is part of the MysqlDaemon interface. diff --git a/go/vt/vtctl/grpcvtctldserver/server_test.go b/go/vt/vtctl/grpcvtctldserver/server_test.go index f72b92479a1..71f521967fd 100644 --- a/go/vt/vtctl/grpcvtctldserver/server_test.go +++ b/go/vt/vtctl/grpcvtctldserver/server_test.go @@ -12816,7 +12816,7 @@ func TestTabletExternallyReparented(t *testing.T) { defer func() { topofactory.SetError(nil) - ctx, cancel := context.WithTimeout(ctx, time.Millisecond*10) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() resp, err := vtctld.GetTablets(ctx, &vtctldatapb.GetTabletsRequest{}) diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 27a3a562cd3..052e0189a2c 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -113,8 +113,8 @@ func newTestEnv(t *testing.T, ctx context.Context, sourceKeyspace string, source tmclienttest.SetProtocol(fmt.Sprintf("go.vt.vttablet.tabletmanager.framework_test_%s", t.Name()), tenv.protoName) tenv.mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - var err error - tenv.mysqld.CurrentPrimaryPosition, err = replication.ParsePosition(gtidFlavor, gtidPosition) + curPosition, err := replication.ParsePosition(gtidFlavor, gtidPosition) + tenv.mysqld.SetPrimaryPositionLocked(curPosition) require.NoError(t, err) return tenv diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index 5e73d266705..5573d0f4561 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -150,7 +150,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -158,7 +158,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -170,7 +170,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)} - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -178,7 +178,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { Sequence: 457, }, }, - } + }) sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -221,7 +221,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -229,7 +229,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { Sequence: 457, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -248,7 +248,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) @@ -301,7 +301,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { "START REPLICA", } - primary.FakeMysqlDaemon.SetReplicationPositionPos = primary.FakeMysqlDaemon.CurrentPrimaryPosition + primary.FakeMysqlDaemon.SetReplicationPositionPos = primary.FakeMysqlDaemon.GetPrimaryPositionLocked() // restore primary from latest backup require.NoError(t, primary.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "", mysqlShutdownTimeout), @@ -388,7 +388,7 @@ func TestBackupRestoreLagged(t *testing.T) { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -396,7 +396,7 @@ func TestBackupRestoreLagged(t *testing.T) { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -407,7 +407,7 @@ func TestBackupRestoreLagged(t *testing.T) { sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -415,7 +415,7 @@ func TestBackupRestoreLagged(t *testing.T) { Sequence: 456, }, }, - } + }) sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)} sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -449,7 +449,7 @@ func TestBackupRestoreLagged(t *testing.T) { timer := time.NewTicker(1 * time.Second) <-timer.C - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPositionLocked(replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -468,7 +468,7 @@ func TestBackupRestoreLagged(t *testing.T) { require.NoError(t, sourceTablet.FakeMysqlDaemon.CheckSuperQueryList()) assert.True(t, sourceTablet.FakeMysqlDaemon.Replicating) assert.True(t, sourceTablet.FakeMysqlDaemon.Running) - assert.Equal(t, primary.FakeMysqlDaemon.CurrentPrimaryPosition, sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition) + assert.Equal(t, primary.FakeMysqlDaemon.GetPrimaryPositionLocked(), sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()) case <-timer2.C: require.FailNow(t, "Backup timed out") } @@ -477,7 +477,7 @@ func TestBackupRestoreLagged(t *testing.T) { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -485,7 +485,7 @@ func TestBackupRestoreLagged(t *testing.T) { Sequence: 456, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -504,7 +504,7 @@ func TestBackupRestoreLagged(t *testing.T) { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) @@ -527,7 +527,7 @@ func TestBackupRestoreLagged(t *testing.T) { timer = time.NewTicker(1 * time.Second) <-timer.C - destTablet.FakeMysqlDaemon.CurrentPrimaryPositionLocked(replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -545,7 +545,7 @@ func TestBackupRestoreLagged(t *testing.T) { require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed") assert.True(t, destTablet.FakeMysqlDaemon.Replicating) assert.True(t, destTablet.FakeMysqlDaemon.Running) - assert.Equal(t, primary.FakeMysqlDaemon.CurrentPrimaryPosition, destTablet.FakeMysqlDaemon.CurrentPrimaryPosition) + assert.Equal(t, primary.FakeMysqlDaemon.GetPrimaryPositionLocked(), destTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()) case <-timer2.C: require.FailNow(t, "Restore timed out") } @@ -608,7 +608,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -616,7 +616,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -626,7 +626,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -634,7 +634,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { Sequence: 457, }, }, - } + }) sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)} sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -668,7 +668,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -676,7 +676,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { Sequence: 457, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -695,7 +695,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) @@ -784,7 +784,7 @@ func TestDisableActiveReparents(t *testing.T) { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -792,7 +792,7 @@ func TestDisableActiveReparents(t *testing.T) { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -803,7 +803,7 @@ func TestDisableActiveReparents(t *testing.T) { sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -811,7 +811,7 @@ func TestDisableActiveReparents(t *testing.T) { Sequence: 457, }, }, - } + }) sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA", } @@ -836,7 +836,7 @@ func TestDisableActiveReparents(t *testing.T) { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -844,7 +844,7 @@ func TestDisableActiveReparents(t *testing.T) { Sequence: 457, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE RESET BINARY LOGS AND GTIDS", "FAKE SET GLOBAL gtid_purged", @@ -856,7 +856,7 @@ func TestDisableActiveReparents(t *testing.T) { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 96f9df74405..3167be5e512 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -63,7 +63,7 @@ func TestEmergencyReparentShard(t *testing.T) { reparenttestutil.SetKeyspaceDurability(context.Background(), t, ts, "test_keyspace", "semi_sync") oldPrimary.FakeMysqlDaemon.Replicating = false - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -71,7 +71,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 456, }, }, - } + }) currentPrimaryFilePosition, _ := replication.ParseFilePosGTIDSet("mariadb-bin.000010:456") oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: currentPrimaryFilePosition, @@ -80,7 +80,7 @@ func TestEmergencyReparentShard(t *testing.T) { // new primary newPrimary.FakeMysqlDaemon.ReadOnly = true newPrimary.FakeMysqlDaemon.Replicating = true - newPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + newPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -88,7 +88,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 456, }, }, - } + }) newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: newPrimaryRelayLogPos, @@ -123,7 +123,7 @@ func TestEmergencyReparentShard(t *testing.T) { // good replica 1 is replicating goodReplica1.FakeMysqlDaemon.ReadOnly = true goodReplica1.FakeMysqlDaemon.Replicating = true - goodReplica1.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + goodReplica1.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -131,7 +131,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 455, }, }, - } + }) goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica1RelayLogPos, @@ -154,7 +154,7 @@ func TestEmergencyReparentShard(t *testing.T) { // good replica 2 is not replicating goodReplica2.FakeMysqlDaemon.ReadOnly = true goodReplica2.FakeMysqlDaemon.Replicating = false - goodReplica2.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + goodReplica2.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -162,7 +162,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 454, }, }, - } + }) goodReplica2RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:454") goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica2RelayLogPos, @@ -217,7 +217,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { newPrimary.FakeMysqlDaemon.Replicating = true // It has transactions in its relay log, but not as many as // moreAdvancedReplica - newPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + newPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -225,7 +225,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { Sequence: 456, }, }, - } + }) newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: newPrimaryRelayLogPos, @@ -250,7 +250,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { // more advanced replica moreAdvancedReplica.FakeMysqlDaemon.Replicating = true // relay log position is more advanced than desired new primary - moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + moreAdvancedReplica.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -258,14 +258,14 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { Sequence: 457, }, }, - } + }) moreAdvancedReplicaLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:457") moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: moreAdvancedReplicaLogPos, } moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition) - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition) + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.GetPrimaryPositionLocked()) moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", diff --git a/go/vt/wrangler/testlib/planned_reparent_shard_test.go b/go/vt/wrangler/testlib/planned_reparent_shard_test.go index 28ffd34b756..1894c6bb4eb 100644 --- a/go/vt/wrangler/testlib/planned_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/planned_reparent_shard_test.go @@ -96,7 +96,7 @@ func TestPlannedReparentShardNoPrimaryProvided(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -213,7 +213,7 @@ func TestPlannedReparentShardNoError(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -434,7 +434,7 @@ func TestPlannedReparentShardWaitForPositionFail(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false // set to incorrect value to make promote fail on WaitForReplicationPos - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.PromoteResult + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.PromoteResult) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -542,7 +542,7 @@ func TestPlannedReparentShardWaitForPositionTimeout(t *testing.T) { // old primary oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -616,7 +616,7 @@ func TestPlannedReparentShardRelayLogError(t *testing.T) { primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false primary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 7: replication.MariadbGTID{ Domain: 7, @@ -624,7 +624,7 @@ func TestPlannedReparentShardRelayLogError(t *testing.T) { Sequence: 990, }, }, - } + }) primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", } @@ -697,7 +697,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false primary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 7: replication.MariadbGTID{ Domain: 7, @@ -705,7 +705,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { Sequence: 990, }, }, - } + }) primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", } @@ -815,7 +815,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -823,7 +823,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { // We call a SetReplicationSource explicitly "FAKE SET SOURCE", "START REPLICA", - // extra SetReplicationSource call due to retry + // extra SetReplicationSource call due to retry) "FAKE SET SOURCE", "START REPLICA", } @@ -885,7 +885,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { // retrying should work newPrimary.FakeMysqlDaemon.PromoteError = nil - newPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + newPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) // run PlannedReparentShard err = vp.Run([]string{"PlannedReparentShard", "--wait_replicas_timeout", "10s", "--keyspace_shard", newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, "--new_primary", topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) @@ -922,7 +922,7 @@ func TestPlannedReparentShardSamePrimary(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = true oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 7: replication.MariadbGTID{ Domain: 7, @@ -930,7 +930,7 @@ func TestPlannedReparentShardSamePrimary(t *testing.T) { Sequence: 990, }, }, - } + }) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", } diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go index e0a2077c778..83450aeb7d0 100644 --- a/go/vt/wrangler/testlib/reparent_utils_test.go +++ b/go/vt/wrangler/testlib/reparent_utils_test.go @@ -67,7 +67,7 @@ func TestShardReplicationStatuses(t *testing.T) { } // primary action loop (to initialize host and port) - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -75,12 +75,12 @@ func TestShardReplicationStatuses(t *testing.T) { Sequence: 892, }, }, - } + }) primary.StartActionLoop(t, wr) defer primary.StopActionLoop(t) // replica loop - replica.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + replica.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -88,7 +88,7 @@ func TestShardReplicationStatuses(t *testing.T) { Sequence: 890, }, }, - } + }) replica.FakeMysqlDaemon.CurrentSourceHost = primary.Tablet.MysqlHostname replica.FakeMysqlDaemon.CurrentSourcePort = primary.Tablet.MysqlPort replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 4e58024785d..a99e6ba2c43 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -776,7 +776,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { func (tme *testMigraterEnv) setPrimaryPositions() { for _, primary := range tme.sourcePrimaries { - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -784,10 +784,10 @@ func (tme *testMigraterEnv) setPrimaryPositions() { Sequence: 892, }, }, - } + }) } for _, primary := range tme.targetPrimaries { - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -795,7 +795,7 @@ func (tme *testMigraterEnv) setPrimaryPositions() { Sequence: 893, }, }, - } + }) } }