Skip to content

Commit

Permalink
Flaky test fixes (#16940)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 15, 2024
1 parent 45a8067 commit 19f9c95
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 80 deletions.
20 changes: 19 additions & 1 deletion go/mysql/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func TestListenerShutdown(t *testing.T) {

l.Shutdown()

assert.EqualValues(t, 1, connRefuse.Get(), "connRefuse")
waitForConnRefuse(t, 1)

err = conn.Ping()
require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)")
Expand All @@ -1436,6 +1436,24 @@ func TestListenerShutdown(t *testing.T) {
require.Equal(t, "Server shutdown in progress", sqlErr.Message)
}

func waitForConnRefuse(t *testing.T, valWanted int64) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()

for {
select {
case <-ctx.Done():
require.FailNow(t, "connRefuse did not reach %v", valWanted)
case <-tick.C:
if connRefuse.Get() == valWanted {
return
}
}
}
}

func TestParseConnAttrs(t *testing.T) {
expected := map[string]string{
"_client_version": "8.0.11",
Expand Down
25 changes: 17 additions & 8 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,6 @@ func (fmd *FakeMysqlDaemon) GetServerUUID(ctx context.Context) (string, error) {
return "000000", nil
}

// CurrentPrimaryPositionLocked is thread-safe.
func (fmd *FakeMysqlDaemon) CurrentPrimaryPositionLocked(pos replication.Position) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.CurrentPrimaryPosition = pos
}

// ReplicationStatus is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.ReplicationStatus, error) {
if fmd.ReplicationStatusError != nil {
Expand All @@ -330,6 +323,8 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.

// PrimaryStatus is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.PrimaryStatusError != nil {
return replication.PrimaryStatus{}, fmd.PrimaryStatusError
}
Expand Down Expand Up @@ -399,7 +394,21 @@ func (fmd *FakeMysqlDaemon) GetPreviousGTIDs(ctx context.Context, binlog string)

// PrimaryPosition is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) PrimaryPosition(ctx context.Context) (replication.Position, error) {
return fmd.CurrentPrimaryPosition, nil
return fmd.GetPrimaryPositionLocked(), nil
}

// GetPrimaryPositionLocked gets the primary position while holding the lock.
func (fmd *FakeMysqlDaemon) GetPrimaryPositionLocked() replication.Position {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.CurrentPrimaryPosition
}

// SetPrimaryPositionLocked is thread-safe.
func (fmd *FakeMysqlDaemon) SetPrimaryPositionLocked(pos replication.Position) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.CurrentPrimaryPosition = pos
}

// IsReadOnly is part of the MysqlDaemon interface.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13193,7 +13193,7 @@ func TestTabletExternallyReparented(t *testing.T) {
defer func() {
topofactory.SetError(nil)

ctx, cancel := context.WithTimeout(ctx, time.Millisecond*10)
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

resp, err := vtctld.GetTablets(ctx, &vtctldatapb.GetTabletsRequest{})
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func newTestEnv(t *testing.T, ctx context.Context, sourceKeyspace string, source
tmclienttest.SetProtocol(fmt.Sprintf("go.vt.vttablet.tabletmanager.framework_test_%s", t.Name()), tenv.protoName)

tenv.mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t))
var err error
tenv.mysqld.CurrentPrimaryPosition, err = replication.ParsePosition(gtidFlavor, gtidPosition)
curPosition, err := replication.ParsePosition(gtidFlavor, gtidPosition)
tenv.mysqld.SetPrimaryPositionLocked(curPosition)
require.NoError(t, err)

return tenv
Expand Down
66 changes: 33 additions & 33 deletions go/vt/wrangler/testlib/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db)
primary.FakeMysqlDaemon.ReadOnly = false
primary.FakeMysqlDaemon.Replicating = false
primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})

// start primary so that replica can fetch primary position from it
primary.StartActionLoop(t, wr)
Expand All @@ -169,15 +169,15 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
sourceTablet.FakeMysqlDaemon.ReadOnly = true
sourceTablet.FakeMysqlDaemon.Replicating = true
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP REPLICA",
Expand Down Expand Up @@ -220,15 +220,15 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db)
destTablet.FakeMysqlDaemon.ReadOnly = true
destTablet.FakeMysqlDaemon.Replicating = true
destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP REPLICA",
Expand All @@ -247,7 +247,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{
"SHOW DATABASES": {},
}
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()
destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))

destTablet.StartActionLoop(t, wr)
Expand Down Expand Up @@ -300,7 +300,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
"START REPLICA",
}

primary.FakeMysqlDaemon.SetReplicationPositionPos = primary.FakeMysqlDaemon.CurrentPrimaryPosition
primary.FakeMysqlDaemon.SetReplicationPositionPos = primary.FakeMysqlDaemon.GetPrimaryPositionLocked()

// restore primary from latest backup
require.NoError(t, primary.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "" /* restoreToPos */, []string{} /* ignoreBackupEngines */, mysqlShutdownTimeout),
Expand Down Expand Up @@ -387,15 +387,15 @@ func TestBackupRestoreLagged(t *testing.T) {
primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db)
primary.FakeMysqlDaemon.ReadOnly = false
primary.FakeMysqlDaemon.Replicating = false
primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})

// start primary so that replica can fetch primary position from it
primary.StartActionLoop(t, wr)
Expand All @@ -406,15 +406,15 @@ func TestBackupRestoreLagged(t *testing.T) {
sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db)
sourceTablet.FakeMysqlDaemon.ReadOnly = true
sourceTablet.FakeMysqlDaemon.Replicating = true
sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 456,
},
},
}
})
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestBackupRestoreLagged(t *testing.T) {

timer := time.NewTicker(1 * time.Second)
<-timer.C
sourceTablet.FakeMysqlDaemon.CurrentPrimaryPositionLocked(replication.Position{
sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Expand All @@ -467,7 +467,7 @@ func TestBackupRestoreLagged(t *testing.T) {
require.NoError(t, sourceTablet.FakeMysqlDaemon.CheckSuperQueryList())
assert.True(t, sourceTablet.FakeMysqlDaemon.Replicating)
assert.True(t, sourceTablet.FakeMysqlDaemon.Running)
assert.Equal(t, primary.FakeMysqlDaemon.CurrentPrimaryPosition, sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition)
assert.Equal(t, primary.FakeMysqlDaemon.GetPrimaryPositionLocked(), sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked())
case <-timer2.C:
require.FailNow(t, "Backup timed out")
}
Expand All @@ -476,15 +476,15 @@ func TestBackupRestoreLagged(t *testing.T) {
destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db)
destTablet.FakeMysqlDaemon.ReadOnly = true
destTablet.FakeMysqlDaemon.Replicating = true
destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 456,
},
},
}
})
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP REPLICA",
Expand All @@ -503,7 +503,7 @@ func TestBackupRestoreLagged(t *testing.T) {
destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{
"SHOW DATABASES": {},
}
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.CurrentPrimaryPosition
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()
destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))

destTablet.StartActionLoop(t, wr)
Expand All @@ -526,7 +526,7 @@ func TestBackupRestoreLagged(t *testing.T) {

timer = time.NewTicker(1 * time.Second)
<-timer.C
destTablet.FakeMysqlDaemon.CurrentPrimaryPositionLocked(replication.Position{
destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Expand All @@ -544,7 +544,7 @@ func TestBackupRestoreLagged(t *testing.T) {
require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed")
assert.True(t, destTablet.FakeMysqlDaemon.Replicating)
assert.True(t, destTablet.FakeMysqlDaemon.Running)
assert.Equal(t, primary.FakeMysqlDaemon.CurrentPrimaryPosition, destTablet.FakeMysqlDaemon.CurrentPrimaryPosition)
assert.Equal(t, primary.FakeMysqlDaemon.GetPrimaryPositionLocked(), destTablet.FakeMysqlDaemon.GetPrimaryPositionLocked())
case <-timer2.C:
require.FailNow(t, "Restore timed out")
}
Expand Down Expand Up @@ -607,15 +607,15 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db)
primary.FakeMysqlDaemon.ReadOnly = false
primary.FakeMysqlDaemon.Replicating = false
primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})

// start primary so that replica can fetch primary position from it
primary.StartActionLoop(t, wr)
Expand All @@ -625,15 +625,15 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db)
sourceTablet.FakeMysqlDaemon.ReadOnly = true
sourceTablet.FakeMysqlDaemon.Replicating = true
sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
Expand Down Expand Up @@ -667,15 +667,15 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db)
destTablet.FakeMysqlDaemon.ReadOnly = true
destTablet.FakeMysqlDaemon.Replicating = true
destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP REPLICA",
Expand All @@ -691,7 +691,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{
"SHOW DATABASES": {},
}
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()
destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))

destTablet.StartActionLoop(t, wr)
Expand Down Expand Up @@ -783,15 +783,15 @@ func TestDisableActiveReparents(t *testing.T) {
primary := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_PRIMARY, db)
primary.FakeMysqlDaemon.ReadOnly = false
primary.FakeMysqlDaemon.Replicating = false
primary.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
primary.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})

// start primary so that replica can fetch primary position from it
primary.StartActionLoop(t, wr)
Expand All @@ -802,15 +802,15 @@ func TestDisableActiveReparents(t *testing.T) {
sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db)
sourceTablet.FakeMysqlDaemon.ReadOnly = true
sourceTablet.FakeMysqlDaemon.Replicating = true
sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
sourceTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP REPLICA",
}
Expand All @@ -835,15 +835,15 @@ func TestDisableActiveReparents(t *testing.T) {
destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db)
destTablet.FakeMysqlDaemon.ReadOnly = true
destTablet.FakeMysqlDaemon.Replicating = true
destTablet.FakeMysqlDaemon.CurrentPrimaryPosition = replication.Position{
destTablet.FakeMysqlDaemon.SetPrimaryPositionLocked(replication.Position{
GTIDSet: replication.MariadbGTIDSet{
2: replication.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}
})
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET BINARY LOGS AND GTIDS",
"FAKE SET GLOBAL gtid_purged",
Expand All @@ -855,7 +855,7 @@ func TestDisableActiveReparents(t *testing.T) {
destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{
"SHOW DATABASES": {},
}
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.GetPrimaryPositionLocked()
destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))

destTablet.StartActionLoop(t, wr)
Expand Down
Loading

0 comments on commit 19f9c95

Please sign in to comment.