diff --git a/go/mysql/server_test.go b/go/mysql/server_test.go index 082a176e3af..72b6f25d0c8 100644 --- a/go/mysql/server_test.go +++ b/go/mysql/server_test.go @@ -1424,7 +1424,7 @@ func TestListenerShutdown(t *testing.T) { l.Shutdown() - assert.EqualValues(t, 1, connRefuse.Get(), "connRefuse") + waitForConnRefuse(t, 1) err = conn.Ping() require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)") @@ -1436,6 +1436,24 @@ func TestListenerShutdown(t *testing.T) { require.Equal(t, "Server shutdown in progress", sqlErr.Message) } +func waitForConnRefuse(t *testing.T, valWanted int64) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + require.FailNow(t, "connRefuse did not reach %v", valWanted) + case <-tick.C: + if connRefuse.Get() == valWanted { + return + } + } + } +} + func TestParseConnAttrs(t *testing.T) { expected := map[string]string{ "_client_version": "8.0.11", diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 1bab1639acb..bc14823ddcd 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -299,13 +299,6 @@ func (fmd *FakeMysqlDaemon) GetServerUUID(ctx context.Context) (string, error) { return "000000", nil } -// CurrentPrimaryPositionLocked is thread-safe. -func (fmd *FakeMysqlDaemon) CurrentPrimaryPositionLocked(pos replication.Position) { - fmd.mu.Lock() - defer fmd.mu.Unlock() - fmd.CurrentPrimaryPosition = pos -} - // ReplicationStatus is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.ReplicationStatus, error) { if fmd.ReplicationStatusError != nil { @@ -330,6 +323,8 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. // PrimaryStatus is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) { + fmd.mu.Lock() + defer fmd.mu.Unlock() if fmd.PrimaryStatusError != nil { return replication.PrimaryStatus{}, fmd.PrimaryStatusError } @@ -399,7 +394,21 @@ func (fmd *FakeMysqlDaemon) GetPreviousGTIDs(ctx context.Context, binlog string) // PrimaryPosition is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) PrimaryPosition(ctx context.Context) (replication.Position, error) { - return fmd.CurrentPrimaryPosition, nil + return fmd.GetPrimaryPositionLocked(), nil +} + +// GetPrimaryPositionLocked gets the primary position while holding the lock. +func (fmd *FakeMysqlDaemon) GetPrimaryPositionLocked() replication.Position { + fmd.mu.Lock() + defer fmd.mu.Unlock() + return fmd.CurrentPrimaryPosition +} + +// SetPrimaryPositionLocked is thread-safe. +func (fmd *FakeMysqlDaemon) SetPrimaryPositionLocked(pos replication.Position) { + fmd.mu.Lock() + defer fmd.mu.Unlock() + fmd.CurrentPrimaryPosition = pos } // IsReadOnly is part of the MysqlDaemon interface. diff --git a/go/vt/vtctl/grpcvtctldserver/server_test.go b/go/vt/vtctl/grpcvtctldserver/server_test.go index 83dc36b0e9b..a26e2480ba7 100644 --- a/go/vt/vtctl/grpcvtctldserver/server_test.go +++ b/go/vt/vtctl/grpcvtctldserver/server_test.go @@ -13193,7 +13193,7 @@ func TestTabletExternallyReparented(t *testing.T) { defer func() { topofactory.SetError(nil) - ctx, cancel := context.WithTimeout(ctx, time.Millisecond*10) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() resp, err := vtctld.GetTablets(ctx, &vtctldatapb.GetTabletsRequest{}) diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 6b61303874f..a6c15d604b9 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -116,8 +116,8 @@ func newTestEnv(t *testing.T, ctx context.Context, sourceKeyspace string, source tmclienttest.SetProtocol(fmt.Sprintf("go.vt.vttablet.tabletmanager.framework_test_%s", t.Name()), tenv.protoName) tenv.mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - var err error - tenv.mysqld.CurrentPrimaryPosition, err = replication.ParsePosition(gtidFlavor, gtidPosition) + curPosition, err := replication.ParsePosition(gtidFlavor, gtidPosition) + tenv.mysqld.SetPrimaryPositionLocked(curPosition) require.NoError(t, err) return tenv diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index f1977df3f16..cb61c4bab99 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -149,7 +149,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -157,7 +157,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -169,7 +169,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)} - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -177,7 +177,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { Sequence: 457, }, }, - } + }) sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -220,7 +220,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -228,7 +228,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { Sequence: 457, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -247,7 +247,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) @@ -300,7 +300,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { "START REPLICA", } - primary.FakeMysqlDaemon.SetReplicationPositionPos = primary.FakeMysqlDaemon.CurrentPrimaryPosition + primary.FakeMysqlDaemon.SetReplicationPositionPos = primary.FakeMysqlDaemon.GetPrimaryPositionLocked() // restore primary from latest backup require.NoError(t, primary.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "" /* restoreToPos */, []string{} /* ignoreBackupEngines */, mysqlShutdownTimeout), @@ -387,7 +387,7 @@ func TestBackupRestoreLagged(t *testing.T) { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -395,7 +395,7 @@ func TestBackupRestoreLagged(t *testing.T) { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -406,7 +406,7 @@ func TestBackupRestoreLagged(t *testing.T) { sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -414,7 +414,7 @@ func TestBackupRestoreLagged(t *testing.T) { Sequence: 456, }, }, - } + }) sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)} sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -448,7 +448,7 @@ func TestBackupRestoreLagged(t *testing.T) { timer := time.NewTicker(1 * time.Second) <-timer.C - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPositionLocked(replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -467,7 +467,7 @@ func TestBackupRestoreLagged(t *testing.T) { require.NoError(t, sourceTablet.FakeMysqlDaemon.CheckSuperQueryList()) assert.True(t, sourceTablet.FakeMysqlDaemon.Replicating) assert.True(t, sourceTablet.FakeMysqlDaemon.Running) - assert.Equal(t, primary.FakeMysqlDaemon.CurrentPrimaryPosition, sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition) + assert.Equal(t, primary.FakeMysqlDaemon.GetPrimaryPositionLocked(), sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()) case <-timer2.C: require.FailNow(t, "Backup timed out") } @@ -476,7 +476,7 @@ func TestBackupRestoreLagged(t *testing.T) { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -484,7 +484,7 @@ func TestBackupRestoreLagged(t *testing.T) { Sequence: 456, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -503,7 +503,7 @@ func TestBackupRestoreLagged(t *testing.T) { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) @@ -526,7 +526,7 @@ func TestBackupRestoreLagged(t *testing.T) { timer = time.NewTicker(1 * time.Second) <-timer.C - destTablet.FakeMysqlDaemon.CurrentPrimaryPositionLocked(replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -544,7 +544,7 @@ func TestBackupRestoreLagged(t *testing.T) { require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed") assert.True(t, destTablet.FakeMysqlDaemon.Replicating) assert.True(t, destTablet.FakeMysqlDaemon.Running) - assert.Equal(t, primary.FakeMysqlDaemon.CurrentPrimaryPosition, destTablet.FakeMysqlDaemon.CurrentPrimaryPosition) + assert.Equal(t, primary.FakeMysqlDaemon.GetPrimaryPositionLocked(), destTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()) case <-timer2.C: require.FailNow(t, "Restore timed out") } @@ -607,7 +607,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -615,7 +615,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -625,7 +625,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -633,7 +633,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { Sequence: 457, }, }, - } + }) sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)} sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -667,7 +667,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -675,7 +675,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { Sequence: 457, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", @@ -691,7 +691,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) @@ -783,7 +783,7 @@ func TestDisableActiveReparents(t *testing.T) { primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db) primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -791,7 +791,7 @@ func TestDisableActiveReparents(t *testing.T) { Sequence: 457, }, }, - } + }) // start primary so that replica can fetch primary position from it primary.StartActionLoop(t, wr) @@ -802,7 +802,7 @@ func TestDisableActiveReparents(t *testing.T) { sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true - sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -810,7 +810,7 @@ func TestDisableActiveReparents(t *testing.T) { Sequence: 457, }, }, - } + }) sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA", } @@ -835,7 +835,7 @@ func TestDisableActiveReparents(t *testing.T) { destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db) destTablet.FakeMysqlDaemon.ReadOnly = true destTablet.FakeMysqlDaemon.Replicating = true - destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -843,7 +843,7 @@ func TestDisableActiveReparents(t *testing.T) { Sequence: 457, }, }, - } + }) destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE RESET BINARY LOGS AND GTIDS", "FAKE SET GLOBAL gtid_purged", @@ -855,7 +855,7 @@ func TestDisableActiveReparents(t *testing.T) { destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ "SHOW DATABASES": {}, } - destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition + destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked() destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) destTablet.StartActionLoop(t, wr) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 96f9df74405..3167be5e512 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -63,7 +63,7 @@ func TestEmergencyReparentShard(t *testing.T) { reparenttestutil.SetKeyspaceDurability(context.Background(), t, ts, "test_keyspace", "semi_sync") oldPrimary.FakeMysqlDaemon.Replicating = false - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -71,7 +71,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 456, }, }, - } + }) currentPrimaryFilePosition, _ := replication.ParseFilePosGTIDSet("mariadb-bin.000010:456") oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: currentPrimaryFilePosition, @@ -80,7 +80,7 @@ func TestEmergencyReparentShard(t *testing.T) { // new primary newPrimary.FakeMysqlDaemon.ReadOnly = true newPrimary.FakeMysqlDaemon.Replicating = true - newPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + newPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -88,7 +88,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 456, }, }, - } + }) newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: newPrimaryRelayLogPos, @@ -123,7 +123,7 @@ func TestEmergencyReparentShard(t *testing.T) { // good replica 1 is replicating goodReplica1.FakeMysqlDaemon.ReadOnly = true goodReplica1.FakeMysqlDaemon.Replicating = true - goodReplica1.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + goodReplica1.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -131,7 +131,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 455, }, }, - } + }) goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica1RelayLogPos, @@ -154,7 +154,7 @@ func TestEmergencyReparentShard(t *testing.T) { // good replica 2 is not replicating goodReplica2.FakeMysqlDaemon.ReadOnly = true goodReplica2.FakeMysqlDaemon.Replicating = false - goodReplica2.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + goodReplica2.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -162,7 +162,7 @@ func TestEmergencyReparentShard(t *testing.T) { Sequence: 454, }, }, - } + }) goodReplica2RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:454") goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica2RelayLogPos, @@ -217,7 +217,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { newPrimary.FakeMysqlDaemon.Replicating = true // It has transactions in its relay log, but not as many as // moreAdvancedReplica - newPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + newPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -225,7 +225,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { Sequence: 456, }, }, - } + }) newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: newPrimaryRelayLogPos, @@ -250,7 +250,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { // more advanced replica moreAdvancedReplica.FakeMysqlDaemon.Replicating = true // relay log position is more advanced than desired new primary - moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + moreAdvancedReplica.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 2: replication.MariadbGTID{ Domain: 2, @@ -258,14 +258,14 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { Sequence: 457, }, }, - } + }) moreAdvancedReplicaLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:457") moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: moreAdvancedReplicaLogPos, } moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition) - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition) + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.GetPrimaryPositionLocked()) moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup "STOP REPLICA", diff --git a/go/vt/wrangler/testlib/planned_reparent_shard_test.go b/go/vt/wrangler/testlib/planned_reparent_shard_test.go index 28ffd34b756..1894c6bb4eb 100644 --- a/go/vt/wrangler/testlib/planned_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/planned_reparent_shard_test.go @@ -96,7 +96,7 @@ func TestPlannedReparentShardNoPrimaryProvided(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -213,7 +213,7 @@ func TestPlannedReparentShardNoError(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -434,7 +434,7 @@ func TestPlannedReparentShardWaitForPositionFail(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false // set to incorrect value to make promote fail on WaitForReplicationPos - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.PromoteResult + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.PromoteResult) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -542,7 +542,7 @@ func TestPlannedReparentShardWaitForPositionTimeout(t *testing.T) { // old primary oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -616,7 +616,7 @@ func TestPlannedReparentShardRelayLogError(t *testing.T) { primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false primary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 7: replication.MariadbGTID{ Domain: 7, @@ -624,7 +624,7 @@ func TestPlannedReparentShardRelayLogError(t *testing.T) { Sequence: 990, }, }, - } + }) primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", } @@ -697,7 +697,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { primary.FakeMysqlDaemon.ReadOnly = false primary.FakeMysqlDaemon.Replicating = false primary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 7: replication.MariadbGTID{ Domain: 7, @@ -705,7 +705,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { Sequence: 990, }, }, - } + }) primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", } @@ -815,7 +815,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = false oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet)) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE SET SOURCE", @@ -823,7 +823,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { // We call a SetReplicationSource explicitly "FAKE SET SOURCE", "START REPLICA", - // extra SetReplicationSource call due to retry + // extra SetReplicationSource call due to retry) "FAKE SET SOURCE", "START REPLICA", } @@ -885,7 +885,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { // retrying should work newPrimary.FakeMysqlDaemon.PromoteError = nil - newPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0] + newPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions[0]) // run PlannedReparentShard err = vp.Run([]string{"PlannedReparentShard", "--wait_replicas_timeout", "10s", "--keyspace_shard", newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, "--new_primary", topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) @@ -922,7 +922,7 @@ func TestPlannedReparentShardSamePrimary(t *testing.T) { oldPrimary.FakeMysqlDaemon.ReadOnly = true oldPrimary.FakeMysqlDaemon.Replicating = false oldPrimary.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica - oldPrimary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + oldPrimary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 7: replication.MariadbGTID{ Domain: 7, @@ -930,7 +930,7 @@ func TestPlannedReparentShardSamePrimary(t *testing.T) { Sequence: 990, }, }, - } + }) oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", } diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go index ea2e34b66bd..6f66968ba80 100644 --- a/go/vt/wrangler/testlib/reparent_utils_test.go +++ b/go/vt/wrangler/testlib/reparent_utils_test.go @@ -67,7 +67,7 @@ func TestShardReplicationStatuses(t *testing.T) { } // primary action loop (to initialize host and port) - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -75,12 +75,12 @@ func TestShardReplicationStatuses(t *testing.T) { Sequence: 892, }, }, - } + }) primary.StartActionLoop(t, wr) defer primary.StopActionLoop(t) // replica loop - replica.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + replica.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -88,7 +88,7 @@ func TestShardReplicationStatuses(t *testing.T) { Sequence: 890, }, }, - } + }) replica.FakeMysqlDaemon.CurrentSourceHost = primary.Tablet.MysqlHostname replica.FakeMysqlDaemon.CurrentSourcePort = primary.Tablet.MysqlPort replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -207,7 +207,7 @@ func TestSetReplicationSource(t *testing.T) { require.NoError(t, err, "UpdateShardFields failed") pos, err := replication.DecodePosition("MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8") require.NoError(t, err) - primary.FakeMysqlDaemon.CurrentPrimaryPositionLocked(pos) + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(pos) // primary action loop (to initialize host and port) primary.StartActionLoop(t, wr) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 4e58024785d..a99e6ba2c43 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -776,7 +776,7 @@ func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { func (tme *testMigraterEnv) setPrimaryPositions() { for _, primary := range tme.sourcePrimaries { - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -784,10 +784,10 @@ func (tme *testMigraterEnv) setPrimaryPositions() { Sequence: 892, }, }, - } + }) } for _, primary := range tme.targetPrimaries { - primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{ + primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{ GTIDSet: replication.MariadbGTIDSet{ 5: replication.MariadbGTID{ Domain: 5, @@ -795,7 +795,7 @@ func (tme *testMigraterEnv) setPrimaryPositions() { Sequence: 893, }, }, - } + }) } }