diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 38706a8fbee..4d72a684c0a 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,17 +27,18 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" ) // vstreamManager manages vstream requests. @@ -518,18 +519,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha go func() { _ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { var err error - if ctx.Err() != nil { + switch { + case ctx.Err() != nil: err = fmt.Errorf("context has ended") - } else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil { - err = fmt.Errorf("health check failed") - } else if vs.tabletType != shr.Target.TabletType { - err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream", - vs.tabletType, shr.Target.TabletType) - } else if shr.RealtimeStats.HealthError != "" { + case shr == nil || shr.RealtimeStats == nil || shr.Target == nil: + err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias)) + case vs.tabletType != shr.Target.TabletType: + err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream", + topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType) + case shr.RealtimeStats.HealthError != "": err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream", - tablet.Alias, shr.RealtimeStats.HealthError) + topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError) + case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()): + err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream", + topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag()) } if err != nil { + log.Warningf("Tablet state changed: %s, attempting to restart", err) errCh <- err return err } @@ -560,7 +566,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case <-ctx.Done(): return ctx.Err() case streamErr := <-errCh: - log.Warningf("Tablet state changed: %s, attempting to restart", streamErr) return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error()) case <-journalDone: // Unreachable. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 3018791964f..f609982f196 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -32,12 +32,16 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/sandboxconn" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -1192,6 +1196,127 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } +// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by +// the vstream manager to confirm that we are correctly restarting the vstream when we should. +func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + // Capture the vstream warning log. Otherwise we need to re-implement the vstream error + // handling in SandboxConn's implementation and then we're not actually testing the + // production code. + logger := logutil.NewMemoryLogger() + log.Warningf = logger.Warningf + + cell := "aa" + ks := "TestVStream" + shard := "0" + tabletType := topodatapb.TabletType_REPLICA + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{shard}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: shard, + }}, + } + source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil) + tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias) + addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet()) + target := &querypb.Target{ + Cell: cell, + Keyspace: ks, + Shard: shard, + TabletType: tabletType, + } + highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1 + + type testcase struct { + name string + hcRes *querypb.StreamHealthResponse + wantErr string + } + testcases := []testcase{ + { + name: "all healthy", // Will hit the context timeout + }, + { + name: "failure", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: nil, // This is seen as a healthcheck stream failure + }, + wantErr: fmt.Sprintf("health check failed on %s", tabletAlias), + }, + { + name: "tablet type changed", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: &querypb.Target{ + Cell: cell, + Keyspace: ks, + Shard: shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + PrimaryTermStartTimestamp: time.Now().Unix(), + RealtimeStats: &querypb.RealtimeStats{}, + }, + wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s", + tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()), + }, + { + name: "unhealthy", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: target, + RealtimeStats: &querypb.RealtimeStats{ + HealthError: "unhealthy", + }, + }, + wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias), + }, + { + name: "replication lag too high", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: target, + RealtimeStats: &querypb.RealtimeStats{ + ReplicationLagSeconds: highLag, + }, + }, + wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided", + tabletAlias, highLag), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + done := make(chan struct{}) + go func() { + sctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + defer close(done) + // SandboxConn's VStream implementation always waits for the context to timeout. + err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error { + require.Fail(t, "unexpected event", "Received unexpected events: %v", events) + return nil + }) + if tc.wantErr != "" { // Otherwise we simply expect the context to timeout + if !strings.Contains(logger.String(), tc.wantErr) { + require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr) + } + } + }() + if tc.wantErr != "" { + source.SetStreamHealthResponse(tc.hcRes) + } + <-done + logger.Clear() + }) + } +} + func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { gw := NewTabletGateway(ctx, hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index f9f1c860498..18b4f020d1d 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -126,6 +126,8 @@ type SandboxConn struct { NotServing bool getSchemaResult []map[string]string + + streamHealthResponse *querypb.StreamHealthResponse } var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check @@ -418,8 +420,21 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target, // SandboxSQRowCount is the default number of fake splits returned. var SandboxSQRowCount = int64(10) -// StreamHealth always mocks a "healthy" result. +// SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth. +func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) { + sbc.mapMu.Lock() + defer sbc.mapMu.Unlock() + sbc.streamHealthResponse = res +} + +// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you +// can call SetStreamHealthResponse. func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + sbc.mapMu.Lock() + defer sbc.mapMu.Unlock() + if sbc.streamHealthResponse != nil { + return callback(sbc.streamHealthResponse) + } return nil }