From 41431784b65ebf8aca1516881e3d3621de7dbf66 Mon Sep 17 00:00:00 2001
From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com>
Date: Mon, 22 Apr 2024 10:33:55 -0400
Subject: [PATCH] Cherry-pick 495de697cd80b522c0f4df818f9dab0b243b1a1d with
 conflicts

---
 go/vt/vtgate/vstream_manager.go           |  34 +-
 go/vt/vtgate/vstream_manager_test.go      | 420 ++++++++++++++++++++++
 go/vt/vttablet/sandboxconn/sandboxconn.go |  32 ++
 3 files changed, 477 insertions(+), 9 deletions(-)

diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go
index 154f23f3941..326497270bb 100644
--- a/go/vt/vtgate/vstream_manager.go
+++ b/go/vt/vtgate/vstream_manager.go
@@ -27,6 +27,18 @@ import (
 	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/vt/discovery"
+<<<<<<< HEAD
+=======
+	"vitess.io/vitess/go/vt/key"
+	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/servenv"
+	"vitess.io/vitess/go/vt/srvtopo"
+	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/topoproto"
+	"vitess.io/vitess/go/vt/vterrors"
+
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+>>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761))
 	querypb "vitess.io/vitess/go/vt/proto/query"
 	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/topo"
@@ -521,18 +533,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 	go func() {
 		_ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
 			var err error
-			if ctx.Err() != nil {
+			switch {
+			case ctx.Err() != nil:
 				err = fmt.Errorf("context has ended")
-			} else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
-				err = fmt.Errorf("health check failed")
-			} else if vs.tabletType != shr.Target.TabletType {
-				err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
-					vs.tabletType, shr.Target.TabletType)
-			} else if shr.RealtimeStats.HealthError != "" {
+			case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
+				err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
+			case vs.tabletType != shr.Target.TabletType:
+				err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
+					topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
+			case shr.RealtimeStats.HealthError != "":
 				err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
-					tablet.Alias, shr.RealtimeStats.HealthError)
+					topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
+			case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
+				err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
+					topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
 			}
 			if err != nil {
+				log.Warningf("Tablet state changed: %s, attempting to restart", err)
 				errCh <- err
 				return err
 			}
@@ -563,7 +580,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 		case <-ctx.Done():
 			return ctx.Err()
 		case streamErr := <-errCh:
-			log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
 			return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
 		case <-journalDone:
 			// Unreachable.
diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go
index 926af63e9ac..911ca4a8ac6 100644
--- a/go/vt/vtgate/vstream_manager_test.go
+++ b/go/vt/vtgate/vstream_manager_test.go
@@ -39,8 +39,20 @@ import (
 	"google.golang.org/protobuf/proto"

 	"vitess.io/vitess/go/vt/discovery"
+<<<<<<< HEAD
 	"vitess.io/vitess/go/vt/proto/binlogdata"
+=======
+	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/logutil"
+	"vitess.io/vitess/go/vt/srvtopo"
+	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/topoproto"
+	"vitess.io/vitess/go/vt/vterrors"
+	"vitess.io/vitess/go/vt/vttablet/sandboxconn"
+
+>>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761))
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	querypb "vitess.io/vitess/go/vt/proto/query"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/srvtopo"
 )
@@ -1187,8 +1199,416 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
 	}
 }

+<<<<<<< HEAD
 func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
 	gw := NewTabletGateway(context.Background(), hc, serv, cell)
+=======
+func TestKeyspaceHasBeenSharded(t *testing.T) {
+	ctx := utils.LeakCheckContext(t)
+
+	cell := "zone1"
+	ks := "testks"
+
+	type testcase struct {
+		name            string
+		oldshards       []string
+		newshards       []string
+		vgtid           *binlogdatapb.VGtid
+		trafficSwitched bool
+		want            bool
+		wantErr         string
+	}
+	testcases := []testcase{
+		{
+			name: "2 to 4, split both, traffic not switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: false,
+			want:            false,
+		},
+		{
+			name: "2 to 4, split both, traffic not switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: false,
+			want:            false,
+		},
+		{
+			name: "2 to 8, split both, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-20",
+				"20-40",
+				"40-60",
+				"60-80",
+				"80-a0",
+				"a0-c0",
+				"c0-e0",
+				"e0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "2 to 4, split only first shard, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-20",
+				"20-40",
+				"40-60",
+				"60-80",
+				// 80- is not being resharded.
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "4 to 2, merge both shards, traffic switched",
+			oldshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			newshards: []string{
+				"-80",
+				"80-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-40",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "40-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-c0",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "c0-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "4 to 3, merge second half, traffic not switched",
+			oldshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			newshards: []string{
+				// -40 and 40-80 are not being resharded.
+				"80-", // Merge of 80-c0 and c0-
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-40",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "40-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-c0",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "c0-",
+					},
+				},
+			},
+			trafficSwitched: false,
+			want:            false,
+		},
+		{
+			name: "4 to 3, merge second half, traffic switched",
+			oldshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			newshards: []string{
+				// -40 and 40-80 are not being resharded.
+				"80-", // Merge of 80-c0 and c0-
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-40",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "40-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-c0",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "c0-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+	}
+
+	addTablet := func(t *testing.T, ctx context.Context, host string, port int32, cell, ks, shard string, ts *topo.Server, hc *discovery.FakeHealthCheck, serving bool) {
+		tabletconn := hc.AddTestTablet(cell, host, port, ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil)
+		err := ts.CreateTablet(ctx, tabletconn.Tablet())
+		require.NoError(t, err)
+		var alias *topodatapb.TabletAlias
+		if serving {
+			alias = tabletconn.Tablet().Alias
+		}
+		_, err = ts.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
+			si.PrimaryAlias = alias
+			si.IsPrimaryServing = serving
+			return nil
+		})
+		require.NoError(t, err)
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			hc := discovery.NewFakeHealthCheck(nil)
+			_ = createSandbox(ks)
+			st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...))
+			vsm := newTestVStreamManager(ctx, hc, st, cell)
+			vs := vstream{
+				vgtid:      tc.vgtid,
+				tabletType: topodatapb.TabletType_PRIMARY,
+				optCells:   cell,
+				vsm:        vsm,
+				ts:         st.topoServer,
+			}
+			for i, shard := range tc.oldshards {
+				addTablet(t, ctx, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), cell, ks, shard, st.topoServer, hc, !tc.trafficSwitched)
+			}
+			for i, shard := range tc.newshards {
+				addTablet(t, ctx, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), cell, ks, shard, st.topoServer, hc, tc.trafficSwitched)
+			}
+			got, err := vs.keyspaceHasBeenResharded(ctx, ks)
+			if tc.wantErr != "" {
+				require.EqualError(t, err, tc.wantErr)
+			} else {
+				require.NoError(t, err)
+			}
+			require.Equal(t, tc.want, got)
+		})
+	}
+}
+
+// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by
+// the vstream manager to confirm that we are correctly restarting the vstream when we should.
+func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
+	ctx := utils.LeakCheckContext(t)
+
+	// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
+	// handling in SandboxConn's implementation and then we're not actually testing the
+	// production code.
+	logger := logutil.NewMemoryLogger()
+	log.Warningf = logger.Warningf
+
+	cell := "aa"
+	ks := "TestVStream"
+	shard := "0"
+	tabletType := topodatapb.TabletType_REPLICA
+	_ = createSandbox(ks)
+	hc := discovery.NewFakeHealthCheck(nil)
+	st := getSandboxTopo(ctx, cell, ks, []string{shard})
+	vsm := newTestVStreamManager(ctx, hc, st, cell)
+	vgtid := &binlogdatapb.VGtid{
+		ShardGtids: []*binlogdatapb.ShardGtid{{
+			Keyspace: ks,
+			Shard:    shard,
+		}},
+	}
+	source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
+	tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
+	addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet())
+	target := &querypb.Target{
+		Cell:       cell,
+		Keyspace:   ks,
+		Shard:      shard,
+		TabletType: tabletType,
+	}
+	highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1
+
+	type testcase struct {
+		name    string
+		hcRes   *querypb.StreamHealthResponse
+		wantErr string
+	}
+	testcases := []testcase{
+		{
+			name: "all healthy", // Will hit the context timeout
+		},
+		{
+			name: "failure",
+			hcRes: &querypb.StreamHealthResponse{
+				TabletAlias: source.Tablet().Alias,
+				Target:      nil, // This is seen as a healthcheck stream failure
+			},
+			wantErr: fmt.Sprintf("health check failed on %s", tabletAlias),
+		},
+		{
+			name: "tablet type changed",
+			hcRes: &querypb.StreamHealthResponse{
+				TabletAlias: source.Tablet().Alias,
+				Target: &querypb.Target{
+					Cell:       cell,
+					Keyspace:   ks,
+					Shard:      shard,
+					TabletType: topodatapb.TabletType_PRIMARY,
+				},
+				PrimaryTermStartTimestamp: time.Now().Unix(),
+				RealtimeStats:             &querypb.RealtimeStats{},
+			},
+			wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s",
+				tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()),
+		},
+		{
+			name: "unhealthy",
+			hcRes: &querypb.StreamHealthResponse{
+				TabletAlias: source.Tablet().Alias,
+				Target:      target,
+				RealtimeStats: &querypb.RealtimeStats{
+					HealthError: "unhealthy",
+				},
+			},
+			wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias),
+		},
+		{
+			name: "replication lag too high",
+			hcRes: &querypb.StreamHealthResponse{
+				TabletAlias: source.Tablet().Alias,
+				Target:      target,
+				RealtimeStats: &querypb.RealtimeStats{
+					ReplicationLagSeconds: highLag,
+				},
+			},
+			wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided",
+				tabletAlias, highLag),
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			done := make(chan struct{})
+			go func() {
+				sctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+				defer cancel()
+				defer close(done)
+				// SandboxConn's VStream implementation always waits for the context to timeout.
+				err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error {
+					require.Fail(t, "unexpected event", "Received unexpected events: %v", events)
+					return nil
+				})
+				if tc.wantErr != "" { // Otherwise we simply expect the context to timeout
+					if !strings.Contains(logger.String(), tc.wantErr) {
+						require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr)
+					}
+				}
+			}()
+			if tc.wantErr != "" {
+				source.SetStreamHealthResponse(tc.hcRes)
+			}
+			<-done
+			logger.Clear()
+		})
+	}
+}
+
+func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
+	gw := NewTabletGateway(ctx, hc, serv, cell)
+>>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761))
 	srvResolver := srvtopo.NewResolver(serv, gw, cell)
 	return newVStreamManager(srvResolver, serv, cell)
 }
diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go
index 5f7a593cbb5..97a3dbf6f09 100644
--- a/go/vt/vttablet/sandboxconn/sandboxconn.go
+++ b/go/vt/vttablet/sandboxconn/sandboxconn.go
@@ -122,7 +122,20 @@ type SandboxConn struct {

 	NotServing bool

+<<<<<<< HEAD
 	getSchemaResult []map[string]string
+=======
+	getSchemaResult []SchemaResult
+
+	parser *sqlparser.Parser
+
+	streamHealthResponse *querypb.StreamHealthResponse
+}
+
+type SchemaResult struct {
+	TablesAndViews map[string]string
+	UDFs           []*querypb.UDFInfo
+>>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761))
 }

 var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check
@@ -415,9 +428,28 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
 // SandboxSQRowCount is the default number of fake splits returned.
 var SandboxSQRowCount = int64(10)

+<<<<<<< HEAD
 // StreamHealth is not implemented.
 func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
 	return fmt.Errorf("not implemented in test")
+=======
+// SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth.
+func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) {
+	sbc.mapMu.Lock()
+	defer sbc.mapMu.Unlock()
+	sbc.streamHealthResponse = res
+}
+
+// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you
+// can call SetStreamHealthResponse.
+func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
+	sbc.mapMu.Lock()
+	defer sbc.mapMu.Unlock()
+	if sbc.streamHealthResponse != nil {
+		return callback(sbc.streamHealthResponse)
+	}
+	return nil
+>>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761))
 }

 // ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos.