From 5703208291ec1e1123adc01451c8518cf8e76d28 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 11:26:58 +0530 Subject: [PATCH 01/10] feat: add more unit tests and verify we return no healthy tablet available when the target is replica and we have no tablets Signed-off-by: Manan Gupta --- go/vt/vtgate/tabletgateway.go | 4 ++- go/vt/vtgate/tabletgateway_test.go | 52 ++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 63ae836d715..0dbaff49d9f 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -282,7 +282,9 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, if len(tablets) == 0 { // if we have a keyspace event watcher, check if the reason why our primary is not available is that it's currently being resharded // or if a reparent operation is in progress. - if kev := gw.kev; kev != nil { + // We only check for whether reshard is ongoing or primary is serving or not, only if the target is primary. We don't want to buffer + // replica queries, so it doesn't make any sense to check for resharding or reparenting in that case. + if kev := gw.kev; kev != nil && target.TabletType == topodatapb.TabletType_PRIMARY { if kev.TargetIsBeingResharded(ctx, target) { log.V(2).Infof("current keyspace is being resharded, retrying: %s: %s", target.Keyspace, debug.Stack()) err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress) diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/tabletgateway_test.go index 32d18dcc9ab..f2fd5e4bd51 100644 --- a/go/vt/vtgate/tabletgateway_test.go +++ b/go/vt/vtgate/tabletgateway_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/discovery" @@ -298,3 +299,54 @@ func verifyShardErrors(t *testing.T, err error, wantErrors []string, wantCode vt } require.Equal(t, vterrors.Code(err), wantCode, "wanted error code: %s, got: %v", wantCode, vterrors.Code(err)) } + +// TestWithRetry tests the functionality of withRetry function in different circumstances. +func TestWithRetry(t *testing.T) { + ctx := context.Background() + tg := NewTabletGateway(ctx, discovery.NewFakeHealthCheck(nil), &fakeTopoServer{}, "cell") + tg.kev = discovery.NewKeyspaceEventWatcher(ctx, tg.srvTopoServer, tg.hc, tg.localCell) + + testcases := []struct { + name string + target *querypb.Target + inTransaction bool + inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error) + expectedErr string + }{ + { + name: "Transaction on a replica", + target: &querypb.Target{ + Keyspace: "ks", + Shard: "0", + TabletType: topodatapb.TabletType_REPLICA, + }, + inTransaction: true, + inner: func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error) { + return false, nil + }, + expectedErr: "tabletGateway's query service can only be used for non-transactional queries on replicas", + }, { + name: "No replica tablets available", + target: &querypb.Target{ + Keyspace: "ks", + Shard: "0", + TabletType: topodatapb.TabletType_REPLICA, + }, + inTransaction: false, + inner: func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error) { + return false, nil + }, + expectedErr: `target: ks.0.replica: no healthy tablet available for 'keyspace:"ks" shard:"0" tablet_type:REPLICA'`, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + err := tg.withRetry(ctx, tt.target, nil, "", tt.inTransaction, tt.inner) + if tt.expectedErr == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tt.expectedErr) + } + }) + } +} From 5d60cef6778804dd69b15597a3f7933daa24dee0 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 13:01:09 +0530 Subject: [PATCH 02/10] feat: also wait for the keyspace event watcher to have seen the healthcheck updates Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck.go | 3 +++ go/vt/discovery/keyspace_events.go | 39 ++++++++++++++++++++++++++++++ go/vt/srvtopo/discover.go | 14 +++++------ go/vt/srvtopo/discover_test.go | 16 ++++++------ go/vt/vtgate/tabletgateway.go | 17 +++++++++++-- 5 files changed, 72 insertions(+), 17 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 2a467301eaf..ede13719ac2 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -98,6 +98,9 @@ var ( // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond + // waitConsistentKeyspacesCheck is the amount of time to wait for between checks to verify the keyspace is consistent. + waitConsistentKeyspacesCheck = 100 * time.Millisecond + // HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Cache` title to create the // HTML code required to render the cache of the HealthCheck. HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache") diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 9fa457c1589..066194e7a08 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -703,3 +704,41 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string { } return servingKeyspaces } + +// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent. +func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, keyspaces []string) error { + for { + // We empty keyspaces as we find them to be consistent. + allConsistent := true + for i, ks := range keyspaces { + if ks == "" { + continue + } + + // Get the keyspace status and see it is consistent yet or not. + kss := kew.getKeyspaceStatus(ctx, ks) + if kss.consistent { + keyspaces[i] = "" + } else { + allConsistent = false + } + } + + if allConsistent { + // all the keyspaces are consistent. + return nil + } + + // Unblock after the sleep or when the context has expired. + select { + case <-ctx.Done(): + for _, ks := range keyspaces { + if ks != "" { + log.Infof("keyspace %v didn't become consistent", ks) + } + } + return ctx.Err() + case <-time.After(waitConsistentKeyspacesCheck): + } + } +} diff --git a/go/vt/srvtopo/discover.go b/go/vt/srvtopo/discover.go index 2997dc42e21..2b020e89887 100644 --- a/go/vt/srvtopo/discover.go +++ b/go/vt/srvtopo/discover.go @@ -17,9 +17,8 @@ limitations under the License. package srvtopo import ( - "sync" - "context" + "sync" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" @@ -29,15 +28,16 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -// FindAllTargets goes through all serving shards in the topology for the provided keyspaces +// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces // and tablet types. If no keyspaces are provided all available keyspaces in the topo are // fetched. It returns one Target object per keyspace/shard/matching TabletType. -func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) { +// It also returns all the keyspaces that it found. +func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) { var err error if len(keyspaces) == 0 { keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true) if err != nil { - return nil, err + return nil, nil, err } } @@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str } wg.Wait() if errRecorder.HasErrors() { - return nil, errRecorder.Error() + return nil, nil, errRecorder.Error() } - return targets, nil + return targets, keyspaces, nil } diff --git a/go/vt/srvtopo/discover_test.go b/go/vt/srvtopo/discover_test.go index 75c5f25cc6e..1e72d7fae45 100644 --- a/go/vt/srvtopo/discover_test.go +++ b/go/vt/srvtopo/discover_test.go @@ -49,7 +49,7 @@ func (a TargetArray) Less(i, j int) bool { return a[i].TabletType < a[j].TabletType } -func TestFindAllTargets(t *testing.T) { +func TestFindAllTargetsAndKeyspaces(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ts := memorytopo.NewServer(ctx, "cell1", "cell2") @@ -65,7 +65,7 @@ func TestFindAllTargets(t *testing.T) { rs := NewResilientServer(ctx, ts, counts) // No keyspace / shards. - ks, err := FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + ks, _, err := FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.Len(t, ks, 0) @@ -84,7 +84,7 @@ func TestFindAllTargets(t *testing.T) { })) // Get it. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -96,7 +96,7 @@ func TestFindAllTargets(t *testing.T) { }, ks) // Get any keyspace. - ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -130,7 +130,7 @@ func TestFindAllTargets(t *testing.T) { })) // Get it for any keyspace, all types. - ks, err = FindAllTargets(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) sort.Sort(TargetArray(ks)) assert.EqualValues(t, []*querypb.Target{ @@ -155,7 +155,7 @@ func TestFindAllTargets(t *testing.T) { }, ks) // Only get 1 keyspace for all types. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -173,7 +173,7 @@ func TestFindAllTargets(t *testing.T) { }, ks) // Only get the REPLICA targets for any keyspace. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) + ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.Equal(t, []*querypb.Target{ { @@ -185,7 +185,7 @@ func TestFindAllTargets(t *testing.T) { }, ks) // Get non-existent keyspace. - ks, err = FindAllTargets(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.Len(t, ks, 0) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 0dbaff49d9f..7d2c1368b06 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -191,11 +191,24 @@ func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [ } // Finds the targets to look for. - targets, err := srvtopo.FindAllTargets(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait) + targets, keyspaces, err := srvtopo.FindAllTargetsAndKeyspaces(ctx, gw.srvTopoServer, gw.localCell, discovery.KeyspacesToWatch, tabletTypesToWait) if err != nil { return err } - return gw.hc.WaitForAllServingTablets(ctx, targets) + err = gw.hc.WaitForAllServingTablets(ctx, targets) + if err != nil { + return err + } + // After having waited for all serving tablets. We should also wait for the keyspace event watcher to have seen + // the updates and marked all the keyspaces as consistent. + // Otherwise, we could be in a situation where even though the healthchecks have arrived, the keyspace event watcher hasn't finished processing them. + // So, if a primary tablet goes non-serving (because of a PRS or some other reason), we won't be able to start buffering. + // Waiting for the keyspaces to become consistent ensures that all the primary tablets for all the shards should be serving as seen by the keyspace event watcher + // and any disruption from now on, will make sure we start buffering properly. + if gw.kev != nil { + return gw.kev.WaitForConsistentKeyspaces(ctx, keyspaces) + } + return nil } // Close shuts down underlying connections. From 6b91cdf5213f450ab8602ee52d3ac78fb491e3cc Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 14:00:47 +0530 Subject: [PATCH 03/10] test: fix gateway test to close the keyspace watcher Signed-off-by: Manan Gupta --- go/vt/vtgate/tabletgateway_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/tabletgateway_test.go index f2fd5e4bd51..fc86ab358c8 100644 --- a/go/vt/vtgate/tabletgateway_test.go +++ b/go/vt/vtgate/tabletgateway_test.go @@ -302,9 +302,13 @@ func verifyShardErrors(t *testing.T, err error, wantErrors []string, wantCode vt // TestWithRetry tests the functionality of withRetry function in different circumstances. func TestWithRetry(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) tg := NewTabletGateway(ctx, discovery.NewFakeHealthCheck(nil), &fakeTopoServer{}, "cell") tg.kev = discovery.NewKeyspaceEventWatcher(ctx, tg.srvTopoServer, tg.hc, tg.localCell) + defer func() { + cancel() + tg.Close(ctx) + }() testcases := []struct { name string From 0d7aee1cbfb9f90f7dbcc03f9db1e86436abd221 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 14:37:40 +0530 Subject: [PATCH 04/10] feat: handle deleted keyspaces in keyspace watcher Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 4 +++- go/vt/vtgate/tabletgateway.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 066194e7a08..11f9c861886 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -717,7 +717,9 @@ func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, // Get the keyspace status and see it is consistent yet or not. kss := kew.getKeyspaceStatus(ctx, ks) - if kss.consistent { + // If kss is nil, then it must be deleted. In that case too it is fine for us to consider + // it consistent since the keyspace has been deleted. + if kss == nil || kss.consistent { keyspaces[i] = "" } else { allConsistent = false diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 7d2c1368b06..4e3dd0e4bac 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -200,12 +200,12 @@ func (gw *TabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [ return err } // After having waited for all serving tablets. We should also wait for the keyspace event watcher to have seen - // the updates and marked all the keyspaces as consistent. + // the updates and marked all the keyspaces as consistent (if we want to wait for primary tablets). // Otherwise, we could be in a situation where even though the healthchecks have arrived, the keyspace event watcher hasn't finished processing them. // So, if a primary tablet goes non-serving (because of a PRS or some other reason), we won't be able to start buffering. // Waiting for the keyspaces to become consistent ensures that all the primary tablets for all the shards should be serving as seen by the keyspace event watcher // and any disruption from now on, will make sure we start buffering properly. - if gw.kev != nil { + if topoproto.IsTypeInList(topodatapb.TabletType_PRIMARY, tabletTypesToWait) && gw.kev != nil { return gw.kev.WaitForConsistentKeyspaces(ctx, keyspaces) } return nil From 0d073a99a83cfb641ac4ff88d9e4d997ba56935a Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 14:44:18 +0530 Subject: [PATCH 05/10] test: augment tests for findAllTargetsAndKeyspaces Signed-off-by: Manan Gupta --- go/vt/srvtopo/discover_test.go | 39 +++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/go/vt/srvtopo/discover_test.go b/go/vt/srvtopo/discover_test.go index 1e72d7fae45..90929586702 100644 --- a/go/vt/srvtopo/discover_test.go +++ b/go/vt/srvtopo/discover_test.go @@ -65,9 +65,10 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { rs := NewResilientServer(ctx, ts, counts) // No keyspace / shards. - ks, _, err := FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + targets, ksList, err := FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) - assert.Len(t, ks, 0) + assert.Len(t, targets, 0) + assert.EqualValues(t, []string{"test_keyspace"}, ksList) // Add one. assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace", &topodatapb.SrvKeyspace{ @@ -84,7 +85,7 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { })) // Get it. - ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -93,10 +94,11 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { Shard: "test_shard0", TabletType: topodatapb.TabletType_PRIMARY, }, - }, ks) + }, targets) + assert.EqualValues(t, []string{"test_keyspace"}, ksList) // Get any keyspace. - ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -105,7 +107,8 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { Shard: "test_shard0", TabletType: topodatapb.TabletType_PRIMARY, }, - }, ks) + }, targets) + assert.EqualValues(t, []string{"test_keyspace"}, ksList) // Add another one. assert.NoError(t, ts.UpdateSrvKeyspace(ctx, "cell1", "test_keyspace2", &topodatapb.SrvKeyspace{ @@ -130,9 +133,9 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { })) // Get it for any keyspace, all types. - ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", nil, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) - sort.Sort(TargetArray(ks)) + sort.Sort(TargetArray(targets)) assert.EqualValues(t, []*querypb.Target{ { Cell: "cell1", @@ -152,10 +155,12 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { Shard: "test_shard2", TabletType: topodatapb.TabletType_REPLICA, }, - }, ks) + }, targets) + sort.Strings(ksList) + assert.EqualValues(t, []string{"test_keyspace", "test_keyspace2"}, ksList) // Only get 1 keyspace for all types. - ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"test_keyspace2"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.EqualValues(t, []*querypb.Target{ { @@ -170,10 +175,11 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { Shard: "test_shard2", TabletType: topodatapb.TabletType_REPLICA, }, - }, ks) + }, targets) + assert.EqualValues(t, []string{"test_keyspace2"}, ksList) // Only get the REPLICA targets for any keyspace. - ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{}, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) assert.NoError(t, err) assert.Equal(t, []*querypb.Target{ { @@ -182,10 +188,13 @@ func TestFindAllTargetsAndKeyspaces(t *testing.T) { Shard: "test_shard2", TabletType: topodatapb.TabletType_REPLICA, }, - }, ks) + }, targets) + sort.Strings(ksList) + assert.EqualValues(t, []string{"test_keyspace", "test_keyspace2"}, ksList) // Get non-existent keyspace. - ks, _, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) + targets, ksList, err = FindAllTargetsAndKeyspaces(ctx, rs, "cell1", []string{"doesnt-exist"}, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA}) assert.NoError(t, err) - assert.Len(t, ks, 0) + assert.Len(t, targets, 0) + assert.EqualValues(t, []string{"doesnt-exist"}, ksList) } From b0294cced03351fde78f6920228f75599837e63b Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 14:48:45 +0530 Subject: [PATCH 06/10] comment: add more descriptive comment to explain the thinking behind PrimaryIsNotServing Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 11f9c861886..b907876df31 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -684,8 +684,12 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target ks.mu.Lock() defer ks.mu.Unlock() if state, ok := ks.shards[target.Shard]; ok { - // If the primary tablet was present then externallyReparented will be non-zero and - // currentPrimary will be not nil. + // The first time we receive an update for a serving primary, we set the externallyReparented value and currentPrimary values. + // These never get reset, so the last two checks checking for them being non-empty is purely for defensive reasons, so that we don't + // return that the primary is not serving when there is no primary that the keyspace event watcher has seen yet. + // The reason this function returns if the Primary is not serving and not just if it is serving, because we want to very defensive in when we say + // the primary is not serving. This function is used to start buffering and we don't want to start buffering when we don't know for sure if the primary + // is not serving and we will receive an update that stops buffering soon. return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil } return nil, false From fc2f4ac42cfed21a827f48a59a679ab3145b490a Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 15:07:38 +0530 Subject: [PATCH 07/10] test: add tests for added wait function Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck.go | 3 - go/vt/discovery/keyspace_events.go | 5 ++ go/vt/discovery/keyspace_events_test.go | 81 +++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 3 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index ede13719ac2..2a467301eaf 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -98,9 +98,6 @@ var ( // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond - // waitConsistentKeyspacesCheck is the amount of time to wait for between checks to verify the keyspace is consistent. - waitConsistentKeyspacesCheck = 100 * time.Millisecond - // HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Cache` title to create the // HTML code required to render the cache of the HealthCheck. HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache") diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index b907876df31..d477228202b 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -38,6 +38,11 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +var ( + // waitConsistentKeyspacesCheck is the amount of time to wait for between checks to verify the keyspace is consistent. + waitConsistentKeyspacesCheck = 100 * time.Millisecond +) + // KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents // for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved. // Right now this is capable of detecting the end of failovers, both planned and unplanned, diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index bcaf48b62a8..0502c4a532d 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -333,6 +333,87 @@ func TestKeyspaceEventTypes(t *testing.T) { } } +// TestWaitForConsistentKeyspaces tests the behaviour of WaitForConsistent for different scenarios. +func TestWaitForConsistentKeyspaces(t *testing.T) { + testcases := []struct { + name string + ksMap map[string]*keyspaceState + ksList []string + errExpected string + }{ + { + name: "Empty keyspace list", + ksList: nil, + ksMap: map[string]*keyspaceState{ + "ks1": {}, + }, + errExpected: "", + }, + { + name: "All keyspaces consistent", + ksList: []string{"ks1", "ks2"}, + ksMap: map[string]*keyspaceState{ + "ks1": { + consistent: true, + }, + "ks2": { + consistent: true, + }, + }, + errExpected: "", + }, + { + name: "One keyspace inconsistent", + ksList: []string{"ks1", "ks2"}, + ksMap: map[string]*keyspaceState{ + "ks1": { + consistent: true, + }, + "ks2": { + consistent: false, + }, + }, + errExpected: "context canceled", + }, + { + name: "One deleted keyspace - consistent", + ksList: []string{"ks1", "ks2"}, + ksMap: map[string]*keyspaceState{ + "ks1": { + consistent: true, + }, + "ks2": { + deleted: true, + }, + }, + errExpected: "", + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + // We create a cancelable context and immediately cancel it. + // We don't want the unit tests to wait, so we only test the first + // iteration of whether the keyspace event watcher returns + // that the keyspaces are consistent or not. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + kew := KeyspaceEventWatcher{ + keyspaces: tt.ksMap, + mu: sync.Mutex{}, + ts: &fakeTopoServer{}, + } + err := kew.WaitForConsistentKeyspaces(ctx, tt.ksList) + if tt.errExpected != "" { + require.ErrorContains(t, err, tt.errExpected) + } else { + require.NoError(t, err) + } + + }) + } +} + type fakeTopoServer struct { } From 9e04eda800b3549e237270126ecbd03e113000c7 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 27 Aug 2024 15:32:14 +0530 Subject: [PATCH 08/10] feat: fix wait function to copy the array before changing it Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index d477228202b..3a4d517f036 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -19,6 +19,7 @@ package discovery import ( "context" "fmt" + "slices" "sync" "time" @@ -715,7 +716,10 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string { } // WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent. -func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, keyspaces []string) error { +func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error { + // We don't want to change the original keyspace list that we receive so we clone it + // before we empty it elements down below. + keyspaces := slices.Clone(ksList) for { // We empty keyspaces as we find them to be consistent. allConsistent := true From 34da81982e9b6cd23d4881b498251fa641d88053 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 28 Aug 2024 09:50:48 +0530 Subject: [PATCH 09/10] refactor: rename the function and add more comments to make the code clearer Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 42 +++++++++++++++++------- go/vt/discovery/keyspace_events_test.go | 38 ++++++++++----------- go/vt/vtgate/tabletgateway.go | 7 ++-- go/vt/vtgate/tabletgateway_flaky_test.go | 10 +++--- 4 files changed, 59 insertions(+), 38 deletions(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 3a4d517f036..c50a7b279d5 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -669,32 +669,52 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar return ks.beingResharded(target.Shard) } -// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is -// that the primary tablet for that shard is not serving. This is possible during a Planned -// Reparent Shard operation. Just as the operation completes, a new primary will be elected, and +// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target. +// We check the following things before we start buffering - +// 1. The shard must have a primary. +// 2. The primary must be non-serving. +// 3. The keyspace must be marked inconsistent. +// +// This buffering is meant to kick in during a Planned Reparent Shard operation. +// As part of that operation the old primary will become non-serving. At that point +// this code should return true to start buffering requests. +// Just as the PRS operation completes, a new primary will be elected, and // it will send its own healthcheck stating that it is serving. We should buffer requests until -// that point. There are use cases where people do not run with a Primary server at all, so we must +// that point. +// +// There are use cases where people do not run with a Primary server at all, so we must // verify that we only start buffering when a primary was present, and it went not serving. // The shard state keeps track of the current primary and the last externally reparented time, which // we can use to determine that there was a serving primary which now became non serving. This is // only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will -// stop when these operations succeed. We return the tablet alias of the primary if it is serving. -func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) { +// stop when these operations succeed. We also return the tablet alias of the primary if it is serving. +func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) { if target.TabletType != topodatapb.TabletType_PRIMARY { + // We don't support buffering for any target tablet type other than the primary. return nil, false } ks := kew.getKeyspaceStatus(ctx, target.Keyspace) if ks == nil { + // If the keyspace status is nil, then the keyspace must be deleted. + // The user query is trying to access a keyspace that has been deleted. + // There is no reason to buffer this query. return nil, false } ks.mu.Lock() defer ks.mu.Unlock() if state, ok := ks.shards[target.Shard]; ok { - // The first time we receive an update for a serving primary, we set the externallyReparented value and currentPrimary values. - // These never get reset, so the last two checks checking for them being non-empty is purely for defensive reasons, so that we don't - // return that the primary is not serving when there is no primary that the keyspace event watcher has seen yet. - // The reason this function returns if the Primary is not serving and not just if it is serving, because we want to very defensive in when we say - // the primary is not serving. This function is used to start buffering and we don't want to start buffering when we don't know for sure if the primary + // As described in the function comment, we only want to start buffering when all the following conditions are met - + // 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty. + // They are set the first time the shard registers an update from a serving primary and are never cleared out after that. + // If the user has configred vtgates to wait for the primary tablet healthchecks before starting query service, this condition + // will always be true. + // 2. The primary must be non-serving. We check this by checking the serving field in the shard state. + // When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added + // for being defensive against any bugs. + // 3. The keyspace must be marked inconsistent. We check this by checking the consistent field in the keyspace state. + // + // The reason we need all the three checks is that we want to be very defensive in when we start buffering. + // We don't want to start buffering when we don't know for sure if the primary // is not serving and we will receive an update that stops buffering soon. return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil } diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index 0502c4a532d..d6bc6b96df1 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -148,11 +148,11 @@ func TestKeyspaceEventTypes(t *testing.T) { kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell) type testCase struct { - name string - kss *keyspaceState - shardToCheck string - expectResharding bool - expectPrimaryNotServing bool + name string + kss *keyspaceState + shardToCheck string + expectResharding bool + expectShouldBuffer bool } testCases := []testCase{ @@ -189,9 +189,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-", - expectResharding: true, - expectPrimaryNotServing: false, + shardToCheck: "-", + expectResharding: true, + expectShouldBuffer: false, }, { name: "two to four resharding in progress", @@ -250,9 +250,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-80", - expectResharding: true, - expectPrimaryNotServing: false, + shardToCheck: "-80", + expectResharding: true, + expectShouldBuffer: false, }, { name: "unsharded primary not serving", @@ -276,9 +276,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-", - expectResharding: false, - expectPrimaryNotServing: true, + shardToCheck: "-", + expectResharding: false, + expectShouldBuffer: true, }, { name: "sharded primary not serving", @@ -310,9 +310,9 @@ func TestKeyspaceEventTypes(t *testing.T) { }, consistent: false, }, - shardToCheck: "-80", - expectResharding: false, - expectPrimaryNotServing: true, + shardToCheck: "-80", + expectResharding: false, + expectShouldBuffer: true, }, } @@ -327,8 +327,8 @@ func TestKeyspaceEventTypes(t *testing.T) { resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target) require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding) - _, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target) - require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing) + _, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target) + require.Equal(t, shouldBuffer, tc.expectShouldBuffer, "ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer) }) } } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 4e3dd0e4bac..6e7213efa16 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -303,12 +303,13 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReshardingInProgress) continue } - primary, notServing := kev.PrimaryIsNotServing(ctx, target) - if notServing { + primary, shouldBuffer := kev.ShouldStartBufferingForTarget(ctx, target) + if shouldBuffer { err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, buffer.ClusterEventReparentInProgress) continue } - // if primary is serving, but we initially found no tablet, we're in an inconsistent state + // if the keyspace event manager doesn't think we should buffer queries, and also sees a primary tablet, + // but we initially found no tablet, we're in an inconsistent state // we then retry the entire loop if primary != nil { err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "inconsistent state detected, primary is serving but initially found no available tablet") diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go index 21107c8d30e..e74f613b682 100644 --- a/go/vt/vtgate/tabletgateway_flaky_test.go +++ b/go/vt/vtgate/tabletgateway_flaky_test.go @@ -67,7 +67,7 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) { waitForBuffering := func(enabled bool) { timer := time.NewTimer(bufferingWaitTimeout) defer timer.Stop() - for _, buffering := tg.kev.PrimaryIsNotServing(ctx, target); buffering != enabled; _, buffering = tg.kev.PrimaryIsNotServing(ctx, target) { + for _, buffering := tg.kev.ShouldStartBufferingForTarget(ctx, target); buffering != enabled; _, buffering = tg.kev.ShouldStartBufferingForTarget(ctx, target) { select { case <-timer.C: require.Fail(t, "timed out waiting for buffering of enabled: %t", enabled) @@ -213,8 +213,8 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) { hc.Broadcast(primaryTablet) require.Len(t, tg.hc.GetHealthyTabletStats(target), 0, "GetHealthyTabletStats has tablets even though it shouldn't") - _, isNotServing := tg.kev.PrimaryIsNotServing(ctx, target) - require.True(t, isNotServing) + _, shouldStartBuffering := tg.kev.ShouldStartBufferingForTarget(ctx, target) + require.True(t, shouldStartBuffering) // add a result to the sandbox connection of the new primary sbcReplica.SetResults([]*sqltypes.Result{sqlResult1}) @@ -244,8 +244,8 @@ outer: case <-timeout: require.Fail(t, "timed out - could not verify the new primary") case <-time.After(10 * time.Millisecond): - newPrimary, notServing := tg.kev.PrimaryIsNotServing(ctx, target) - if newPrimary != nil && newPrimary.Uid == 1 && !notServing { + newPrimary, shouldBuffer := tg.kev.ShouldStartBufferingForTarget(ctx, target) + if newPrimary != nil && newPrimary.Uid == 1 && !shouldBuffer { break outer } } From d9a0ea09eceb5939a2b77c19b5f5509100292390 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 29 Aug 2024 08:02:47 +0530 Subject: [PATCH 10/10] comment: fix spelling mistake in the comment Signed-off-by: Manan Gupta --- go/vt/discovery/keyspace_events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index c50a7b279d5..036d4f3ad14 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -706,7 +706,7 @@ func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Conte // As described in the function comment, we only want to start buffering when all the following conditions are met - // 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty. // They are set the first time the shard registers an update from a serving primary and are never cleared out after that. - // If the user has configred vtgates to wait for the primary tablet healthchecks before starting query service, this condition + // If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition // will always be true. // 2. The primary must be non-serving. We check this by checking the serving field in the shard state. // When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added