From ee7acfa82d690380ad62db20b8617a384cbf3b67 Mon Sep 17 00:00:00 2001 From: Tanjin Xu <109303790+tanjinx@users.noreply.github.com> Date: Fri, 22 Nov 2024 08:55:22 -0800 Subject: [PATCH 1/3] [release-19.0] Fix to prevent stopping buffering prematurely (#17013) (#17203) (#564) Signed-off-by: Manan Gupta Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> --- .../reparent/newfeaturetest/reparent_test.go | 63 +++++ go/test/endtoend/reparent/utils/utils.go | 46 ++++ go/vt/discovery/fake_healthcheck.go | 15 ++ go/vt/discovery/keyspace_events.go | 69 +++++- go/vt/discovery/keyspace_events_test.go | 229 ++++++++++++++++++ go/vt/vtgate/buffer/buffer.go | 16 +- go/vt/vtgate/buffer/buffer_helper_test.go | 2 +- go/vt/vtgate/buffer/buffer_test.go | 48 +++- go/vt/vtgate/buffer/shard_buffer.go | 20 +- go/vt/vtgate/buffer/variables_test.go | 2 +- go/vt/vtgate/tabletgateway.go | 2 +- go/vt/vtgate/tabletgateway_flaky_test.go | 3 +- 12 files changed, 492 insertions(+), 23 deletions(-) diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index d05551f0e7b..92b1be7b3f7 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -19,10 +19,13 @@ package newfeaturetest import ( "context" "fmt" + "sync" "testing" + "time" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" ) @@ -156,3 +159,63 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica") require.NoError(t, err) } + +func TestBufferingWithMultipleDisruptions(t *testing.T) { + defer cluster.PanicHandler(t) + clusterInstance := utils.SetupShardedReparentCluster(t) + defer utils.TeardownCluster(clusterInstance) + + // Stop all VTOrc instances, so that they don't interfere with the test. + for _, vtorc := range clusterInstance.VTOrcProcesses { + err := vtorc.TearDown() + require.NoError(t, err) + } + + // Start by reparenting all the shards to the first tablet. + keyspace := clusterInstance.Keyspaces[0] + shards := keyspace.Shards + for _, shard := range shards { + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias) + require.NoError(t, err) + } + + // We simulate start of external reparent or a PRS where the healthcheck update from the tablet gets lost in transit + // to vtgate by just setting the primary read only. This is also why we needed to shutdown all VTOrcs, so that they don't + // fix this. + utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0]) + utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0]) + + wg := sync.WaitGroup{} + rowCount := 10 + vtParams := clusterInstance.GetVTParams(keyspace.Name) + // We now spawn writes for a bunch of go routines. + // The ones going to shard 1 and shard 2 should block, since + // they're in the midst of a reparenting operation (as seen by the buffering code). 
+ for i := 1; i <= rowCount; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + return + } + defer conn.Close() + _, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false) + require.NoError(t, err) + }(i) + } + + // Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2 + // since the disruption on the two shards hasn't stopped. + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias) + require.NoError(t, err) + // We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate. + time.Sleep(1 * time.Second) + // Finally, we'll now make the 2 shards healthy again by running PRS. + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias) + require.NoError(t, err) + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias) + require.NoError(t, err) + // Wait for all the writes to have succeeded. + wg.Wait() +} diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 9d7ae27c2f8..e926b19b482 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -75,6 +75,52 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync") } +// SetupShardedReparentCluster is used to setup a sharded cluster for testing +func SetupShardedReparentCluster(t *testing.T) *cluster.LocalProcessCluster { + clusterInstance := cluster.NewCluster(cell1, Hostname) + // Start topo server + err := clusterInstance.StartTopo() + require.NoError(t, err) + + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--lock_tables_timeout", "5s", + // Fast health checks help find corner cases. + "--health_check_interval", "1s", + "--track_schema_versions=true", + "--queryserver_enable_online_ddl=false") + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--enable_buffer", + // Long timeout in case failover is slow. + "--buffer_window", "10m", + "--buffer_max_failover_duration", "10m", + "--buffer_min_time_between_failovers", "20m", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: sqlSchema, + VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`, + } + err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false) + require.NoError(t, err) + + // Start Vtgate + err = clusterInstance.StartVtgate() + require.NoError(t, err) + return clusterInstance +} + +// GetInsertQuery returns a built insert query to insert a row. +func GetInsertQuery(idx int) string { + return fmt.Sprintf(insertSQL, idx, idx) +} + +// GetSelectionQuery returns a built selection query read the data. +func GetSelectionQuery() string { + return `select * from vt_insert_test` +} + // TeardownCluster is used to teardown the reparent cluster. 
When // run in a CI environment -- which is considered true when the // "CI" env variable is set to "true" -- the teardown also removes diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index 1c83de5b149..daeb50a9d75 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -172,6 +172,21 @@ func (fhc *FakeHealthCheck) SetTabletType(tablet *topodatapb.Tablet, tabletType item.ts.Target.TabletType = tabletType } +// SetPrimaryTimestamp sets the primary timestamp for the given tablet +func (fhc *FakeHealthCheck) SetPrimaryTimestamp(tablet *topodatapb.Tablet, timestamp int64) { + if fhc.ch == nil { + return + } + fhc.mu.Lock() + defer fhc.mu.Unlock() + key := TabletToMapKey(tablet) + item, isPresent := fhc.items[key] + if !isPresent { + return + } + item.ts.PrimaryTermStartTime = timestamp +} + // Unsubscribe is not implemented. func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) { } diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 770fb125480..9835ba188f9 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -173,8 +173,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { } type shardState struct { - target *querypb.Target - serving bool + target *querypb.Target + serving bool + // waitForReparent is used to tell the keyspace event watcher + // that this shard should be marked serving only after a reparent + // operation has succeeded. + waitForReparent bool externallyReparented int64 currentPrimary *topodatapb.TabletAlias } @@ -357,8 +361,34 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { // if the shard went from serving to not serving, or the other way around, the keyspace // is undergoing an availability event if sstate.serving != th.Serving { - sstate.serving = th.Serving kss.consistent = false + switch { + case th.Serving && sstate.waitForReparent: + // While waiting for a reparent, if we receive a serving primary, + // we should check if the primary term start time is greater than the externally reparented time. + // We mark the shard serving only if it is. This is required so that we don't prematurely stop + // buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the + // same old primary tablet that has already been turned read-only. + if th.PrimaryTermStartTime > sstate.externallyReparented { + sstate.waitForReparent = false + sstate.serving = true + } + case th.Serving && !sstate.waitForReparent: + sstate.serving = true + case !th.Serving: + sstate.serving = false + } + } + if !th.Serving { + // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait + // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop + // buffering when we receive a serving healthcheck from the primary that is being demoted. + // However, if we receive a non-serving check, then we know that we won't receive any more serving + // health checks until reparent finishes. Specifically, this helps us when PRS fails, but + // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote + // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop + // buffering for that case. 
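// Illustrative sketch, not part of the patch: the switch above together with
// the reset just below implement the following transition on the pair
// (serving, waitForReparent). The helper name nextShardState is hypothetical;
// termStart stands for th.PrimaryTermStartTime and extReparented for
// sstate.externallyReparented.
func nextShardState(serving, waitForReparent, thServing bool, termStart, extReparented int64) (newServing, newWait bool) {
	if !thServing {
		// A non-serving primary means the reparent is underway; there is no
		// need to keep waiting explicitly, and buffering continues until a
		// genuinely new serving primary is seen.
		return false, false
	}
	if serving {
		// Serving before and after: nothing changes.
		return true, waitForReparent
	}
	if waitForReparent && termStart <= extReparented {
		// Serving healthcheck from the primary that is being demoted: ignore
		// it so buffering is not stopped prematurely.
		return false, true
	}
	// Either a new primary term has started, or no reparent was in flight.
	return true, false
}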
+ sstate.waitForReparent = false } // if the primary for this shard has been externally reparented, we're undergoing a failover, @@ -653,3 +683,36 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string { } return servingKeyspaces } + +// MarkShardNotServing marks the given shard not serving. +// We use this when we start buffering for a given shard. This helps +// coordinate between the sharding logic and the keyspace event watcher. +// We take in a boolean as well to tell us whether this error is because +// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent. +// The return argument is whether the shard was found and marked not serving successfully or not. +func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool { + kss := kew.getKeyspaceStatus(ctx, keyspace) + if kss == nil { + // Only happens if the keyspace was deleted. + return false + } + kss.mu.Lock() + defer kss.mu.Unlock() + sstate := kss.shards[shard] + if sstate == nil { + // This only happens if the shard is deleted, or if + // the keyspace event watcher hasn't seen the shard at all. + return false + } + // Mark the keyspace inconsistent and the shard not serving. + kss.consistent = false + sstate.serving = false + if isReparentErr { + // If the error was triggered because a reparent operation has started. + // We mark the shard to wait for a reparent to finish before marking it serving. + // This is required to prevent premature stopping of buffering if we receive + // a serving healthcheck from a primary that is being demoted. + sstate.waitForReparent = true + } + return true +} diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index b7124ec3c13..c77f7c4c6e9 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -19,6 +19,7 @@ package discovery import ( "context" "encoding/hex" + "sync" "testing" "time" @@ -270,6 +271,234 @@ func TestKeyspaceEventTypes(t *testing.T) { } } +func TestOnHealthCheck(t *testing.T) { + testcases := []struct { + name string + ss *shardState + th *TabletHealth + wantServing bool + wantWaitForReparent bool + wantExternallyReparented int64 + wantUID uint32 + }{ + { + name: "Non primary tablet health ignored", + ss: &shardState{ + serving: false, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_REPLICA, + }, + Serving: true, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "Serving primary seen in non-serving shard", + ss: &shardState{ + serving: false, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "New serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + 
th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "Old serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: true, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "Old non-serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: false, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "New serving primary while already serving", + ss: &shardState{ + serving: true, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "Primary goes non serving", + ss: &shardState{ + serving: true, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: false, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, + } + + ksName := "ks" + shard := "-80" + kss := &keyspaceState{ + mu: sync.Mutex{}, + keyspace: ksName, + shards: make(map[string]*shardState), + } + // Adding this so that we don't run any topo calls from ensureConsistentLocked. 
+ kss.moveTablesState = &MoveTablesState{ + Typ: MoveTablesRegular, + State: MoveTablesSwitching, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + kss.shards[shard] = tt.ss + tt.th.Target.Keyspace = ksName + tt.th.Target.Shard = shard + kss.onHealthCheck(tt.th) + require.Equal(t, tt.wantServing, tt.ss.serving) + require.Equal(t, tt.wantWaitForReparent, tt.ss.waitForReparent) + require.Equal(t, tt.wantExternallyReparented, tt.ss.externallyReparented) + require.Equal(t, tt.wantUID, tt.ss.currentPrimary.Uid) + }) + } +} + type fakeTopoServer struct { } diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 622bb03b082..0900709145f 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -94,6 +94,18 @@ func CausedByFailover(err error) bool { return isFailover } +// isErrorDueToReparenting is a stronger check than CausedByFailover, meant to return +// if the failure is caused because of a reparent. +func isErrorDueToReparenting(err error) bool { + if vterrors.Code(err) != vtrpcpb.Code_CLUSTER_EVENT { + return false + } + if strings.Contains(err.Error(), ClusterEventReshardingInProgress) { + return false + } + return true +} + // for debugging purposes func getReason(err error) string { for _, ce := range ClusterEvents { @@ -171,7 +183,7 @@ func New(cfg *Config) *Buffer { // It returns an error if buffering failed (e.g. buffer full). // If it does not return an error, it may return a RetryDoneFunc which must be // called after the request was retried. -func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // If an err is given, it must be related to a failover. // We never buffer requests with other errors. 
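// Hypothetical illustration, not part of the patch: how the weaker and the
// stronger check classify the two cluster events defined in this package.
// The helper and the fmt usage exist only for this example and assume fmt is
// imported.
func classifyClusterEventsExample() {
	reshard := vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReshardingInProgress)
	reparent := vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReparentInProgress)

	// Both carry a CLUSTER_EVENT code, so both count as failover-caused and
	// may be buffered.
	fmt.Println(CausedByFailover(reshard), CausedByFailover(reparent)) // true true

	// Only the reparent error makes the keyspace event watcher wait for a
	// reparent before trusting serving healthchecks again.
	fmt.Println(isErrorDueToReparenting(reshard), isErrorDueToReparenting(reparent)) // false true
}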
if err != nil && !CausedByFailover(err) { @@ -188,7 +200,7 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1) return nil, nil } - return sb.waitForFailoverEnd(ctx, keyspace, shard, err) + return sb.waitForFailoverEnd(ctx, keyspace, shard, kev, err) } func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { diff --git a/go/vt/vtgate/buffer/buffer_helper_test.go b/go/vt/vtgate/buffer/buffer_helper_test.go index 2deb460fc39..1276f0cd751 100644 --- a/go/vt/vtgate/buffer/buffer_helper_test.go +++ b/go/vt/vtgate/buffer/buffer_helper_test.go @@ -50,7 +50,7 @@ func issueRequestAndBlockRetry(ctx context.Context, t *testing.T, b *Buffer, err bufferingStopped := make(chan error) go func() { - retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, failoverErr) + retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, nil, failoverErr) if err != nil { bufferingStopped <- err } diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go index c730a8336d1..fc326ce0ce5 100644 --- a/go/vt/vtgate/buffer/buffer_test.go +++ b/go/vt/vtgate/buffer/buffer_test.go @@ -72,6 +72,32 @@ var ( } ) +func TestIsErrorDueToReparenting(t *testing.T) { + testcases := []struct { + err error + want bool + }{ + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReshardingInProgress), + want: false, + }, + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReparentInProgress), + want: true, + }, + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "The MySQL server is running with the --super-read-only option"), + want: true, + }, + } + for _, tt := range testcases { + t.Run(tt.err.Error(), func(t *testing.T) { + got := isErrorDueToReparenting(tt.err) + assert.Equal(t, tt.want, got) + }) + } +} + func TestBuffering(t *testing.T) { testAllImplementations(t, func(t *testing.T, fail failover) { testBuffering1WithOptions(t, fail, 1) @@ -120,7 +146,7 @@ func testBuffering1WithOptions(t *testing.T, fail failover, concurrency int) { } // Subsequent requests with errors not related to the failover are not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -168,7 +194,7 @@ func testBuffering1WithOptions(t *testing.T, fail failover, concurrency int) { } // Second failover: Buffering is skipped because last failover is too recent. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("subsequent failovers must be skipped due to -buffer_min_time_between_failovers setting. err: %v retryDone: %v", err, retryDone) } if got, want := requestsSkipped.Counts()[statsKeyJoinedLastFailoverTooRecent], int64(1); got != want { @@ -226,7 +252,7 @@ func testDryRun1(t *testing.T, fail failover) { b := New(cfg) // Request does not get buffered. 
- if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests must not be buffered during dry-run. err: %v retryDone: %v", err, retryDone) } // But the internal state changes though. @@ -272,10 +298,10 @@ func testPassthrough1(t *testing.T, fail failover) { b := New(cfg) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must never be buffered. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -311,7 +337,7 @@ func testLastReparentTooRecentBufferingSkipped1(t *testing.T, fail failover) { now = now.Add(1 * time.Second) fail(b, newPrimary, keyspace, shard, now) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests where the failover end was recently detected before the start must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -408,10 +434,10 @@ func testPassthroughDuringDrain1(t *testing.T, fail failover) { } // Requests during the drain will be passed through and not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests with failover errors must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } @@ -443,7 +469,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { b := New(cfg) ignoredKeyspace := "ignored_ks" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored keyspaces must not be buffered. 
err: %v retryDone: %v", err, retryDone) } statsKeyJoined := strings.Join([]string{ignoredKeyspace, shard, skippedDisabled}, ".") @@ -452,7 +478,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { } ignoredShard := "ff-" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored shards must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -634,7 +660,7 @@ func testEvictionNotPossible1(t *testing.T, fail failover) { // Newer requests of the second failover cannot evict anything because // they have no entries buffered. - retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, failoverErr) + retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, nil, failoverErr) if bufferErr == nil || retryDone != nil { t.Fatalf("buffer should have returned an error because it's full: err: %v retryDone: %v", bufferErr, retryDone) } diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index 219323756e7..934839c89d1 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -138,7 +138,7 @@ func (sb *shardBuffer) disabled() bool { return sb.mode == bufferModeDisabled } -func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // We assume if err != nil then it's always caused by a failover. // Other errors must be filtered at higher layers. failoverDetected := err != nil @@ -212,7 +212,11 @@ func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard s return nil, nil } - sb.startBufferingLocked(err) + // Try to start buffering. If we're unsuccessful, then we exit early. + if !sb.startBufferingLocked(ctx, kev, err) { + sb.mu.Unlock() + return nil, nil + } } if sb.mode == bufferModeDryRun { @@ -256,7 +260,16 @@ func (sb *shardBuffer) shouldBufferLocked(failoverDetected bool) bool { panic("BUG: All possible states must be covered by the switch expression above.") } -func (sb *shardBuffer) startBufferingLocked(err error) { +func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery.KeyspaceEventWatcher, err error) bool { + if kev != nil { + if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, isErrorDueToReparenting(err)) { + // We failed to mark the shard as not serving. Do not buffer the request. + // This can happen if the keyspace has been deleted or if the keyspace even watcher + // hasn't yet seen the shard. Keyspace event watcher might not stop buffering for this + // request at all until it times out. It's better to not buffer this request. + return false + } + } // Reset monitoring data from previous failover. lastRequestsInFlightMax.Set(sb.statsKey, 0) lastRequestsDryRunMax.Set(sb.statsKey, 0) @@ -282,6 +295,7 @@ func (sb *shardBuffer) startBufferingLocked(err error) { sb.buf.config.MaxFailoverDuration, errorsanitizer.NormalizeError(err.Error()), ) + return true } // logErrorIfStateNotLocked logs an error if the current state is not "state". 
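Putting the watcher and buffer changes together, the coordination added by this patch can be summarized by the sketch below. The helper name is illustrative and simply restates the prologue added to startBufferingLocked; the numbered comments describe the full PRS sequence.

// Illustrative restatement, not a new API:
// 1. A write fails with a reparent-style CLUSTER_EVENT error.
// 2. Before buffering, the shard buffer asks the keyspace event watcher to
//    mark the shard not serving and, because the error looks like a reparent,
//    to wait for the reparent before trusting serving healthchecks again.
// 3. Serving healthchecks from the demoted primary are ignored, since their
//    PrimaryTermStartTime is not newer than the recorded reparent time.
// 4. The new primary's healthcheck clears waitForReparent, the keyspace turns
//    consistent again, and the buffer drains.
func shouldBufferSketch(ctx context.Context, kev *discovery.KeyspaceEventWatcher, keyspace, shard string, err error) bool {
	if kev == nil {
		// No watcher wired in (for example in unit tests): buffer as before.
		return true
	}
	// Returns false when the keyspace was deleted or the watcher has not seen
	// the shard yet; such requests are passed through instead of buffered.
	return kev.MarkShardNotServing(ctx, keyspace, shard, isErrorDueToReparenting(err))
}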
diff --git a/go/vt/vtgate/buffer/variables_test.go b/go/vt/vtgate/buffer/variables_test.go index a0640bde9e4..30d2426c639 100644 --- a/go/vt/vtgate/buffer/variables_test.go +++ b/go/vt/vtgate/buffer/variables_test.go @@ -51,7 +51,7 @@ func TestVariablesAreInitialized(t *testing.T) { // Create a new buffer and make a call which will create the shardBuffer object. // After that, the variables should be initialized for that shard. b := New(NewDefaultConfig()) - _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil /* err */) + _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil, nil) if err != nil { t.Fatalf("buffer should just passthrough and not return an error: %v", err) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 4c67f5672e1..19a3df36436 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -297,7 +297,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // b) no transaction was created yet. if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { // The next call blocks if we should buffer during a failover. - retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err) + retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, gw.kev, err) // Request may have been buffered. if retryDone != nil { diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go index 21107c8d30e..acd24ecd7db 100644 --- a/go/vt/vtgate/tabletgateway_flaky_test.go +++ b/go/vt/vtgate/tabletgateway_flaky_test.go @@ -234,6 +234,7 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) { hc.SetTabletType(primaryTablet, topodatapb.TabletType_REPLICA) hc.Broadcast(primaryTablet) hc.SetTabletType(replicaTablet, topodatapb.TabletType_PRIMARY) + hc.SetPrimaryTimestamp(replicaTablet, 100) // We set a higher timestamp than before to simulate a PRS. 
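	// Annotation, not part of the patch: the value 100 must exceed the
	// externallyReparented timestamp the keyspace event watcher recorded when
	// the old primary was demoted. onHealthCheck (changed earlier in this
	// patch) only clears waitForReparent and marks the shard serving again
	// when th.PrimaryTermStartTime is strictly greater than that value, so
	// broadcasting an equal or lower timestamp here would keep the gateway
	// buffering, which is exactly the premature-unbuffering case being fixed.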
hc.SetServing(replicaTablet, true) hc.Broadcast(replicaTablet) @@ -245,7 +246,7 @@ outer: require.Fail(t, "timed out - could not verify the new primary") case <-time.After(10 * time.Millisecond): newPrimary, notServing := tg.kev.PrimaryIsNotServing(ctx, target) - if newPrimary != nil && newPrimary.Uid == 1 && !notServing { + if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !notServing { break outer } } From 44b64bb04750c77da75da7699c6eea77f7426299 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 22 Nov 2024 19:00:56 +0100 Subject: [PATCH 2/3] `slack-19.0`: remove `mariadb_to_mysql` CI file from v22 (#566) Signed-off-by: Tim Vaillancourt --- ...endtoend_vreplication_mariadb_to_mysql.yml | 179 ------------------ 1 file changed, 179 deletions(-) delete mode 100644 .github/workflows/cluster_endtoend_vreplication_mariadb_to_mysql.yml diff --git a/.github/workflows/cluster_endtoend_vreplication_mariadb_to_mysql.yml b/.github/workflows/cluster_endtoend_vreplication_mariadb_to_mysql.yml deleted file mode 100644 index 398d7a561f2..00000000000 --- a/.github/workflows/cluster_endtoend_vreplication_mariadb_to_mysql.yml +++ /dev/null @@ -1,179 +0,0 @@ -# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" - -name: Cluster (vreplication_mariadb_to_mysql) -on: [push, pull_request] -concurrency: - group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vreplication_mariadb_to_mysql)') - cancel-in-progress: true - -permissions: read-all - -env: - LAUNCHABLE_ORGANIZATION: "vitess" - LAUNCHABLE_WORKSPACE: "vitess-app" - GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" - -jobs: - build: - name: Run endtoend tests on Cluster (vreplication_mariadb_to_mysql) - runs-on: ubuntu-latest - - steps: - - name: Skip CI - run: | - if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then - echo "skipping CI due to the 'Skip CI' label" - exit 1 - fi - - - name: Check if workflow needs to be skipped - id: skip-workflow - run: | - skip='false' - if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! 
"${{github.ref}}" =~ "refs/tags/.*" ]]; then - skip='true' - fi - echo Skip ${skip} - echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT - - PR_DATA=$(curl -s\ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Accept: application/vnd.github.v3+json" \ - "https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}") - draft=$(echo "$PR_DATA" | jq .draft -r) - echo "is_draft=${draft}" >> $GITHUB_OUTPUT - - - name: Check out code - if: steps.skip-workflow.outputs.skip-workflow == 'false' - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - - - name: Check for changes in relevant files - if: steps.skip-workflow.outputs.skip-workflow == 'false' - uses: dorny/paths-filter@ebc4d7e9ebcb0b1eb21480bb8f43113e996ac77a # v3.0.1 - id: changes - with: - token: '' - filters: | - end_to_end: - - 'go/**/*.go' - - 'go/vt/sidecardb/**/*.sql' - - 'go/test/endtoend/onlineddl/vrepl_suite/**' - - 'test.go' - - 'Makefile' - - 'build.env' - - 'go.sum' - - 'go.mod' - - 'proto/*.proto' - - 'tools/**' - - 'config/**' - - 'bootstrap.sh' - - '.github/workflows/cluster_endtoend_vreplication_mariadb_to_mysql.yml' - - - name: Set up Go - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2 - with: - go-version: 1.23.1 - - - name: Set up python - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 - - - name: Tune the OS - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - run: | - # Limit local port range to not use ports that overlap with server side - # ports that we listen on. - sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535" - # Increase the asynchronous non-blocking I/O. More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio - echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf - sudo sysctl -p /etc/sysctl.conf - - - name: Get dependencies - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - run: | - - # Get key to latest MySQL repo - sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys A8D3785C - # Setup MySQL 8.0 - wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.32-1_all.deb - echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections - sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config* - sudo apt-get -qq update - # Install everything else we need, and configure - sudo apt-get -qq install -y mysql-server mysql-shell mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils libncurses5 - - sudo service mysql stop - sudo service etcd stop - sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ - sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld - go mod download - - # install JUnit report formatter - go install github.com/vitessio/go-junit-report@HEAD - - - name: Setup launchable dependencies - if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' - run: | - # Get Launchable CLI installed. 
If you can, make it a part of the builder image to speed things up - pip3 install --user launchable~=1.0 > /dev/null - - # verify that launchable setup is all correct. - launchable verify || true - - # Tell Launchable about the build you are producing and testing - launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source . - - - name: Run cluster endtoend test - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' - timeout-minutes: 45 - run: | - # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file - # which musn't be more than 107 characters long. - export VTDATAROOT="/tmp/" - source build.env - - set -exo pipefail - - # Increase our open file descriptor limit as we could hit this - ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql8026.cnf - innodb_buffer_pool_dump_at_shutdown=OFF - innodb_buffer_pool_in_core_file=OFF - innodb_buffer_pool_load_at_startup=OFF - innodb_buffer_pool_size=64M - innodb_doublewrite=OFF - innodb_flush_log_at_trx_commit=0 - innodb_flush_method=O_DIRECT - innodb_numa_interleave=ON - innodb_adaptive_hash_index=OFF - sync_binlog=0 - sync_relay_log=0 - performance_schema=OFF - slow-query-log=OFF - EOF - - cat <<-EOF>>./config/mycnf/mysql8026.cnf - binlog-transaction-compression=ON - EOF - - # run the tests however you normally do, then produce a JUnit XML file - eatmydata -- go run test.go -docker=false -follow -shard vreplication_mariadb_to_mysql | tee -a output.txt | go-junit-report -set-exit-code > report.xml - - - name: Print test output and Record test result in launchable if PR is not a draft - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() - run: | - if [[ "${{steps.skip-workflow.outputs.is_draft}}" == "false" ]]; then - # send recorded tests to launchable - launchable record tests --build "$GITHUB_RUN_ID" go-test . 
|| true - fi - - # print test output - cat output.txt - - - name: Test Summary - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() - uses: test-summary/action@31493c76ec9e7aa675f1585d3ed6f1da69269a86 # v2.4 - with: - paths: "report.xml" - show: "fail, skip" From 508c86da1d75856259e53caaf8c81de4dcd0f914 Mon Sep 17 00:00:00 2001 From: Tanjin Xu <109303790+tanjinx@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:01:59 -0800 Subject: [PATCH 3/3] backport upstream 16655 - Fix race condition that prevents queries from being buffered after vtgate startup (#567) * backport upstream 16655 --------- Co-authored-by: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> --- go/vt/discovery/keyspace_events.go | 99 ++++++++++++++++--- go/vt/discovery/keyspace_events_test.go | 119 +++++++++++++++++++---- go/vt/srvtopo/discover.go | 14 +-- go/vt/srvtopo/discover_test.go | 41 +++++--- go/vt/vtgate/tabletgateway.go | 28 ++++-- go/vt/vtgate/tabletgateway_flaky_test.go | 10 +- go/vt/vtgate/tabletgateway_test.go | 56 +++++++++++ 7 files changed, 302 insertions(+), 65 deletions(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 9835ba188f9..fe90428c390 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -19,7 +19,9 @@ package discovery import ( "context" "fmt" + "slices" "sync" + "time" "google.golang.org/protobuf/proto" @@ -36,6 +38,11 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +var ( + // waitConsistentKeyspacesCheck is the amount of time to wait for between checks to verify the keyspace is consistent. + waitConsistentKeyspacesCheck = 100 * time.Millisecond +) + // KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents // for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved. // Right now this is capable of detecting the end of failovers, both planned and unplanned, @@ -643,28 +650,53 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar return ks.beingResharded(target.Shard) } -// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is -// that the primary tablet for that shard is not serving. This is possible during a Planned Reparent Shard -// operation. Just as the operation completes, a new primary will be elected, and it will send its own healthcheck -// stating that it is serving. We should buffer requests until that point. -// There are use cases where people do not run with a Primary server at all, so we must verify that -// we only start buffering when a primary was present, and it went not serving. -// The shard state keeps track of the current primary and the last externally reparented time, which we can use -// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary -// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed. -// We return the tablet alias of the primary if it is serving. -func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) { +// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target. +// We check the following things before we start buffering - +// 1. The shard must have a primary. +// 2. The primary must be non-serving. +// 3. 
The keyspace must be marked inconsistent. +// +// This buffering is meant to kick in during a Planned Reparent Shard operation. +// As part of that operation the old primary will become non-serving. At that point +// this code should return true to start buffering requests. +// Just as the PRS operation completes, a new primary will be elected, and +// it will send its own healthcheck stating that it is serving. We should buffer requests until +// that point. +// +// There are use cases where people do not run with a Primary server at all, so we must +// verify that we only start buffering when a primary was present, and it went not serving. +// The shard state keeps track of the current primary and the last externally reparented time, which +// we can use to determine that there was a serving primary which now became non serving. This is +// only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will +// stop when these operations succeed. We also return the tablet alias of the primary if it is serving. +func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) { if target.TabletType != topodatapb.TabletType_PRIMARY { + // We don't support buffering for any target tablet type other than the primary. return nil, false } ks := kew.getKeyspaceStatus(ctx, target.Keyspace) if ks == nil { + // If the keyspace status is nil, then the keyspace must be deleted. + // The user query is trying to access a keyspace that has been deleted. + // There is no reason to buffer this query. return nil, false } ks.mu.Lock() defer ks.mu.Unlock() if state, ok := ks.shards[target.Shard]; ok { - // If the primary tablet was present then externallyReparented will be non-zero and currentPrimary will be not nil + // As described in the function comment, we only want to start buffering when all the following conditions are met - + // 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty. + // They are set the first time the shard registers an update from a serving primary and are never cleared out after that. + // If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition + // will always be true. + // 2. The primary must be non-serving. We check this by checking the serving field in the shard state. + // When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added + // for being defensive against any bugs. + // 3. The keyspace must be marked inconsistent. We check this by checking the consistent field in the keyspace state. + // + // The reason we need all the three checks is that we want to be very defensive in when we start buffering. + // We don't want to start buffering when we don't know for sure if the primary + // is not serving and we will receive an update that stops buffering soon. return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil } return nil, false @@ -716,3 +748,46 @@ func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspa } return true } + +// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent. 
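// Illustrative call-site sketch for the function added just below; it is not
// part of the patch. The real caller introduced later in this patch is
// TabletGateway.WaitForTablets, and the 30 second deadline here is arbitrary.
func waitForConsistentSketch(kew *KeyspaceEventWatcher, keyspaces []string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Returns nil once every listed keyspace is consistent (or deleted);
	// otherwise it polls every waitConsistentKeyspacesCheck and returns
	// ctx.Err() if the deadline expires first.
	return kew.WaitForConsistentKeyspaces(ctx, keyspaces)
}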
+func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error { + // We don't want to change the original keyspace list that we receive so we clone it + // before we empty it elements down below. + keyspaces := slices.Clone(ksList) + for { + // We empty keyspaces as we find them to be consistent. + allConsistent := true + for i, ks := range keyspaces { + if ks == "" { + continue + } + + // Get the keyspace status and see it is consistent yet or not. + kss := kew.getKeyspaceStatus(ctx, ks) + // If kss is nil, then it must be deleted. In that case too it is fine for us to consider + // it consistent since the keyspace has been deleted. + if kss == nil || kss.consistent { + keyspaces[i] = "" + } else { + allConsistent = false + } + } + + if allConsistent { + // all the keyspaces are consistent. + return nil + } + + // Unblock after the sleep or when the context has expired. + select { + case <-ctx.Done(): + for _, ks := range keyspaces { + if ks != "" { + log.Infof("keyspace %v didn't become consistent", ks) + } + } + return ctx.Err() + case <-time.After(waitConsistentKeyspacesCheck): + } + } +} diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index c77f7c4c6e9..d7101d3177c 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -86,11 +86,11 @@ func TestKeyspaceEventTypes(t *testing.T) { kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell) type testCase struct { - name string - kss *keyspaceState - shardToCheck string - expectResharding bool - expectPrimaryNotServing bool + name string + kss *keyspaceState + shardToCheck string + expectResharding bool + expectShouldBuffer bool } testCases := []testCase{ @@ -127,9 +127,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-", - expectResharding: true, - expectPrimaryNotServing: false, + shardToCheck: "-", + expectResharding: true, + expectShouldBuffer: false, }, { name: "two to four resharding in progress", @@ -188,9 +188,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-80", - expectResharding: true, - expectPrimaryNotServing: false, + shardToCheck: "-80", + expectResharding: true, + expectShouldBuffer: false, }, { name: "unsharded primary not serving", @@ -214,9 +214,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-", - expectResharding: false, - expectPrimaryNotServing: true, + shardToCheck: "-", + expectResharding: false, + expectShouldBuffer: true, }, { name: "sharded primary not serving", @@ -248,9 +248,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-80", - expectResharding: false, - expectPrimaryNotServing: true, + shardToCheck: "-80", + expectResharding: false, + expectShouldBuffer: true, }, } @@ -265,8 +265,89 @@ func TestKeyspaceEventTypes(t *testing.T) { resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target) require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding) - _, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target) - require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing) + _, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target) + require.Equal(t, shouldBuffer, tc.expectShouldBuffer, 
"ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer) + }) + } +} + +// TestWaitForConsistentKeyspaces tests the behaviour of WaitForConsistent for different scenarios. +func TestWaitForConsistentKeyspaces(t *testing.T) { + testcases := []struct { + name string + ksMap map[string]*keyspaceState + ksList []string + errExpected string + }{ + { + name: "Empty keyspace list", + ksList: nil, + ksMap: map[string]*keyspaceState{ + "ks1": {}, + }, + errExpected: "", + }, + { + name: "All keyspaces consistent", + ksList: []string{"ks1", "ks2"}, + ksMap: map[string]*keyspaceState{ + "ks1": { + consistent: true, + }, + "ks2": { + consistent: true, + }, + }, + errExpected: "", + }, + { + name: "One keyspace inconsistent", + ksList: []string{"ks1", "ks2"}, + ksMap: map[string]*keyspaceState{ + "ks1": { + consistent: true, + }, + "ks2": { + consistent: false, + }, + }, + errExpected: "context canceled", + }, + { + name: "One deleted keyspace - consistent", + ksList: []string{"ks1", "ks2"}, + ksMap: map[string]*keyspaceState{ + "ks1": { + consistent: true, + }, + "ks2": { + deleted: true, + }, + }, + errExpected: "", + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + // We create a cancelable context and immediately cancel it. + // We don't want the unit tests to wait, so we only test the first + // iteration of whether the keyspace event watcher returns + // that the keyspaces are consistent or not. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + kew := KeyspaceEventWatcher{ + keyspaces: tt.ksMap, + mu: sync.Mutex{}, + ts: &fakeTopoServer{}, + } + err := kew.WaitForConsistentKeyspaces(ctx, tt.ksList) + if tt.errExpected != "" { + require.ErrorContains(t, err, tt.errExpected) + } else { + require.NoError(t, err) + } + }) } } diff --git a/go/vt/srvtopo/discover.go b/go/vt/srvtopo/discover.go index 2997dc42e21..2b020e89887 100644 --- a/go/vt/srvtopo/discover.go +++ b/go/vt/srvtopo/discover.go @@ -17,9 +17,8 @@ limitations under the License. package srvtopo import ( - "sync" - "context" + "sync" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" @@ -29,15 +28,16 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -// FindAllTargets goes through all serving shards in the topology for the provided keyspaces +// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces // and tablet types. If no keyspaces are provided all available keyspaces in the topo are // fetched. It returns one Target object per keyspace/shard/matching TabletType. -func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) { +// It also returns all the keyspaces that it found. 
+func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) { var err error if len(keyspaces) == 0 { keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true) if err != nil { - return nil, err + return nil, nil, err } } @@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str } wg.Wait() if errRecorder.HasErrors() { - return nil, errRecorder.Error() + return nil, nil, errRecorder.Error() } - return targets, nil + return targets, keyspaces, nil } diff --git a/go/vt/srvtopo/discover_test.go b/go/vt/srvtopo/discover_test.go index 3f730bba3d3..0232bce7a65 100644 --- a/go/vt/srvtopo/discover_test.go +++ b/go/vt/srvtopo/discover_test.go @@ -48,7 +48,7 @@ func (a TargetArray) Less(i, j int) bool { return a[i].TabletType < a[j].TabletType } -func TestFindAllTargets(t *testing.T) { +func TestFindAllTargetsAndKeyspaces(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ts := memorytopo.NewServer(ctx, "cell1", "cell2") @@ -63,9 +63,10 @@ func TestFindAllTargets(t *testing.T) { rs := NewResilientServer(ctx, ts, "TestFindAllKeyspaceShards") // No keyspace / shards. - ks, err := FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + targets, ksList, err := FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) - assert.Len(t, ks, 0) + assert.Len(t, targets, 0) + assert.EqualValues(t, []string{"test_keyspace"}, ksList) // Add one. assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace", &topodatapb.SrvKeyspace{ @@ -82,7 +83,7 @@ func TestFindAllTargets(t *testing.T) { })) // Get it. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -91,10 +92,11 @@ func TestFindAllTargets(t *testing.T) { Shard: "test_shard0", TabletType: topodatapb.TabletType_PRIMARY, }, - }, ks) + }, targets) + assert.EqualValues(t, []string{"test_keyspace"}, ksList) // Get any keyspace. - ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -103,7 +105,8 @@ func TestFindAllTargets(t *testing.T) { Shard: "test_shard0", TabletType: topodatapb.TabletType_PRIMARY, }, - }, ks) + }, targets) + assert.EqualValues(t, []string{"test_keyspace"}, ksList) // Add another one. assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace2", &topodatapb.SrvKeyspace{ @@ -128,9 +131,9 @@ func TestFindAllTargets(t *testing.T) { })) // Get it for any keyspace, all types. 
- ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) - sort.Sort(TargetArray(ks)) + sort.Sort(TargetArray(targets)) assert.EqualValues(t, []*querypb.Target{ { Cell: "cell1", @@ -150,10 +153,12 @@ func TestFindAllTargets(t *testing.T) { Shard: "test_shard2", TabletType: topodatapb.TabletType_REPLICA, }, - }, ks) + }, targets) + sort.Strings(ksList) + assert.EqualValues(t, []string{"test_keyspace", "test_keyspace2"}, ksList) // Only get 1 keyspace for all types. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -168,10 +173,11 @@ func TestFindAllTargets(t *testing.T) { Shard: "test_shard2", TabletType: topodatapb.TabletType_REPLICA, }, - }, ks) + }, targets) + assert.EqualValues(t, []string{"test_keyspace2"}, ksList) // Only get the REPLICA targets for any keyspace. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.Equal(t, []*querypb.Target{ { @@ -180,10 +186,13 @@ func TestFindAllTargets(t *testing.T) { Shard: "test_shard2", TabletType: topodatapb.TabletType_REPLICA, }, - }, ks) + }, targets) + sort.Strings(ksList) + assert.EqualValues(t, []string{"test_keyspace", "test_keyspace2"}, ksList) // Get non-existent keyspace. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) - assert.Len(t, ks, 0) + assert.Len(t, targets, 0) + assert.EqualValues(t, []string{"doesnt-exist"}, ksList) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 19a3df36436..9d1df487e51 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -219,11 +219,24 @@ func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [ } // Finds the targets to look for. - targets, err := srvtopo.FindAllTargets(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait) + targets, keyspaces, err := srvtopo.FindAllTargetsAndKeyspaces(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait) if err != nil { return err } - return gw.hc.WaitForAllServingTablets(ctx, targets) + err = gw.hc.WaitForAllServingTablets(ctx, targets) + if err != nil { + return err + } + // After having waited for all serving tablets. We should also wait for the keyspace event watcher to have seen + // the updates and marked all the keyspaces as consistent (if we want to wait for primary tablets). 
+	// Otherwise, we could be in a situation where even though the healthchecks have arrived, the keyspace event watcher hasn't finished processing them.
+	// So, if a primary tablet goes non-serving (because of a PRS or some other reason), we won't be able to start buffering.
+	// Waiting for the keyspaces to become consistent ensures that all the primary tablets for all the shards should be serving as seen by the keyspace event watcher
+	// and any disruption from now on, will make sure we start buffering properly.
+	if topoproto.IsTypeInList(topodatapb.TabletType_PRIMARY, tabletTypesToWait) && gw.kev != nil {
+		return gw.kev.WaitForConsistentKeyspaces(ctx, keyspaces)
+	}
+	return nil
 }
 
 // Close shuts down underlying connections.
@@ -319,18 +332,21 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
 		if len(tablets) == 0 {
 			// if we have a keyspace event watcher, check if the reason why our primary is not available is that it's currently being resharded
 			// or if a reparent operation is in progress.
-			if kev := gw.kev; kev != nil {
+			// We only check for whether reshard is ongoing or primary is serving or not, only if the target is primary. We don't want to buffer
+			// replica queries, so it doesn't make any sense to check for resharding or reparenting in that case.
+			if kev := gw.kev; kev != nil && target.TabletType == topodatapb.TabletType_PRIMARY {
 				if kev.TargetIsBeingResharded(ctx, target) {
 					log.V(2).Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack())
 					err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress)
 					continue
 				}
-				primary, notServing := kev.PrimaryIsNotServing(ctx, target)
-				if notServing {
+				primary, shouldBuffer := kev.ShouldStartBufferingForTarget(ctx, target)
+				if shouldBuffer {
 					err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReparentInProgress)
 					continue
 				}
-				// if primary is serving, but we initially found no tablet, we're in an inconsistent state
+				// if the keyspace event manager doesn't think we should buffer queries, and also sees a primary tablet,
+				// but we initially found no tablet, we're in an inconsistent state
 				// we then retry the entire loop
 				if primary != nil {
 					err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "inconsistent state detected, primary is serving but initially found no available tablet")
diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go
index acd24ecd7db..d136542d176 100644
--- a/go/vt/vtgate/tabletgateway_flaky_test.go
+++ b/go/vt/vtgate/tabletgateway_flaky_test.go
@@ -67,7 +67,7 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
 	waitForBuffering := func(enabled bool) {
 		timer := time.NewTimer(bufferingWaitTimeout)
 		defer timer.Stop()
-		for _, buffering := tg.kev.PrimaryIsNotServing(ctx, target); buffering != enabled; _, buffering = tg.kev.PrimaryIsNotServing(ctx, target) {
+		for _, buffering := tg.kev.ShouldStartBufferingForTarget(ctx, target); buffering != enabled; _, buffering = tg.kev.ShouldStartBufferingForTarget(ctx, target) {
 			select {
 			case <-timer.C:
 				require.Fail(t, "timed out waiting for buffering of enabled: %t", enabled)
@@ -213,8 +213,8 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) {
 	hc.Broadcast(primaryTablet)
 
 	require.Len(t, tg.hc.GetHealthyTabletStats(target), 0, "GetHealthyTabletStats has tablets even though it shouldn't")
-	_, isNotServing := tg.kev.PrimaryIsNotServing(ctx, target)
-	require.True(t, isNotServing)
+	_, shouldStartBuffering := tg.kev.ShouldStartBufferingForTarget(ctx, target)
+	require.True(t, shouldStartBuffering)
 
 	// add a result to the sandbox connection of the new primary
 	sbcReplica.SetResults([]*sqltypes.Result{sqlResult1})
@@ -245,8 +245,8 @@ outer:
 		case <-timeout:
 			require.Fail(t, "timed out - could not verify the new primary")
 		case <-time.After(10 * time.Millisecond):
-			newPrimary, notServing := tg.kev.PrimaryIsNotServing(ctx, target)
-			if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !notServing {
+			newPrimary, shouldBuffer := tg.kev.ShouldStartBufferingForTarget(ctx, target)
+			if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !shouldBuffer {
 				break outer
 			}
 		}
diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/tabletgateway_test.go
index b1e79b7803d..2aafb78af99 100644
--- a/go/vt/vtgate/tabletgateway_test.go
+++ b/go/vt/vtgate/tabletgateway_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"vitess.io/vitess/go/test/utils"
+	"vitess.io/vitess/go/vt/vttablet/queryservice"
 
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/vt/discovery"
@@ -343,3 +344,58 @@ func verifyShardErrors(t *testing.T, err error, wantErrors []string, wantCode vt
 	}
 	require.Equal(t, vterrors.Code(err), wantCode, "wanted error code: %s, got: %v", wantCode, vterrors.Code(err))
 }
+
+// TestWithRetry tests the functionality of withRetry function in different circumstances.
+func TestWithRetry(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	tg := NewTabletGateway(ctx, discovery.NewFakeHealthCheck(nil), &fakeTopoServer{}, "cell")
+	tg.kev = discovery.NewKeyspaceEventWatcher(ctx, tg.srvTopoServer, tg.hc, tg.localCell)
+	defer func() {
+		cancel()
+		tg.Close(ctx)
+	}()
+
+	testcases := []struct {
+		name          string
+		target        *querypb.Target
+		inTransaction bool
+		inner         func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)
+		expectedErr   string
+	}{
+		{
+			name: "Transaction on a replica",
+			target: &querypb.Target{
+				Keyspace:   "ks",
+				Shard:      "0",
+				TabletType: topodatapb.TabletType_REPLICA,
+			},
+			inTransaction: true,
+			inner: func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error) {
+				return false, nil
+			},
+			expectedErr: "tabletGateway's query service can only be used for non-transactional queries on replicas",
+		}, {
+			name: "No replica tablets available",
+			target: &querypb.Target{
+				Keyspace:   "ks",
+				Shard:      "0",
+				TabletType: topodatapb.TabletType_REPLICA,
+			},
+			inTransaction: false,
+			inner: func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error) {
+				return false, nil
+			},
+			expectedErr: `target: ks.0.replica: no healthy tablet available for 'keyspace:"ks" shard:"0" tablet_type:REPLICA'`,
+		},
+	}
+	for _, tt := range testcases {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tg.withRetry(ctx, tt.target, nil, "", tt.inTransaction, tt.inner)
+			if tt.expectedErr == "" {
+				require.NoError(t, err)
+			} else {
+				require.ErrorContains(t, err, tt.expectedErr)
+			}
+		})
+	}
+}