Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flakes: De-flake TestGatewayBufferingWhenPrimarySwitchesServingState #14968

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/utils"

"vitess.io/vitess/go/mysql/collations"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/vtgate/buffer"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/buffer"
)

// TestGatewayBufferingWhenPrimarySwitchesServingState is used to test that the buffering mechanism buffers the queries when a primary goes to a non serving state and
Expand Down Expand Up @@ -64,6 +63,20 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
// add a primary tablet which is serving
sbc := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil)

bufferingWaitTimeout := 60 * time.Second
waitForBuffering := func(enabled bool) {
timer := time.NewTimer(bufferingWaitTimeout)
defer timer.Stop()
for _, buffering := tg.kev.PrimaryIsNotServing(ctx, target); buffering != enabled; _, buffering = tg.kev.PrimaryIsNotServing(ctx, target) {
select {
case <-timer.C:
require.Fail(t, "timed out waiting for buffering of enabled: %t", enabled)
default:
}
time.Sleep(10 * time.Millisecond)
}
}

// add a result to the sandbox connection
sqlResult1 := &sqltypes.Result{
Fields: []*querypb.Field{{
Expand Down Expand Up @@ -94,6 +107,8 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
// add another result to the sandbox connection
sbc.SetResults([]*sqltypes.Result{sqlResult1})

waitForBuffering(true)

// execute the query in a go routine since it should be buffered, and check that it eventually succeed
queryChan := make(chan struct{})
go func() {
Expand All @@ -102,17 +117,17 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
}()

// set the serving type for the primary tablet true and broadcast it so that the buffering code registers this change
// this should stop the buffering and the query executed in the go routine should work. This should be done with some delay so
// that we know that the query was buffered
time.Sleep(1 * time.Second)
// this should stop the buffering and the query executed in the go routine should work.
hc.SetServing(primaryTablet, true)
hc.Broadcast(primaryTablet)

waitForBuffering(false)

// wait for the query to execute before checking for results
select {
case <-queryChan:
require.NoError(t, err)
require.Equal(t, res, sqlResult1)
require.Equal(t, sqlResult1, res)
case <-time.After(15 * time.Second):
t.Fatalf("timed out waiting for query to execute")
}
Expand Down
Loading