diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 9194327d076..aa98f148f44 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -34,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -538,18 +539,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha go func() { _ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error { var err error - if ctx.Err() != nil { + switch { + case ctx.Err() != nil: err = fmt.Errorf("context has ended") - } else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil { - err = fmt.Errorf("health check failed") - } else if vs.tabletType != shr.Target.TabletType { - err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream", - vs.tabletType, shr.Target.TabletType) - } else if shr.RealtimeStats.HealthError != "" { + case shr == nil || shr.RealtimeStats == nil || shr.Target == nil: + err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias)) + case vs.tabletType != shr.Target.TabletType: + err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream", + topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType) + case shr.RealtimeStats.HealthError != "": err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream", - tablet.Alias, shr.RealtimeStats.HealthError) + topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError) + case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()): + err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream", + topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag()) } if err != nil { + log.Warningf("Tablet state changed: %s, attempting to restart", err) errCh <- err return err } @@ -586,7 +592,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case <-ctx.Done(): return ctx.Err() case streamErr := <-errCh: - log.Warningf("Tablet state changed: %s, attempting to restart", streamErr) return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error()) case <-journalDone: // Unreachable. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index dab397d4621..14975c703d8 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -24,26 +24,27 @@ import ( "testing" "time" - "vitess.io/vitess/go/sync2" - - "vitess.io/vitess/go/vt/topo" - - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/vttablet/sandboxconn" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/sandboxconn" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/srvtopo" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var mu sync.Mutex @@ -1333,6 +1334,124 @@ func TestVstreamCopy(t *testing.T) { require.Equal(t, "target: TestVStreamCopy.-20.primary: final error", err.Error()) } +// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by +// the vstream manager to confirm that we are correctly restarting the vstream when we should. +func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { + // Capture the vstream warning log. Otherwise we need to re-implement the vstream error + // handling in SandboxConn's implementation and then we're not actually testing the + // production code. + logger := logutil.NewMemoryLogger() + log.Warningf = logger.Warningf + + cell := "aa" + ks := "TestVStream" + shard := "0" + tabletType := topodatapb.TabletType_REPLICA + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{shard}) + vsm := newTestVStreamManager(hc, st, cell) + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: shard, + }}, + } + source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil) + tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias) + addTabletToSandboxTopo(t, st, ks, shard, source.Tablet()) + target := &querypb.Target{ + Cell: cell, + Keyspace: ks, + Shard: shard, + TabletType: tabletType, + } + highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1 + + type testcase struct { + name string + hcRes *querypb.StreamHealthResponse + wantErr string + } + testcases := []testcase{ + { + name: "all healthy", // Will hit the context timeout + }, + { + name: "failure", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: nil, // This is seen as a healthcheck stream failure + }, + wantErr: fmt.Sprintf("health check failed on %s", tabletAlias), + }, + { + name: "tablet type changed", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: &querypb.Target{ + Cell: cell, + Keyspace: ks, + Shard: shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }, + wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s", + tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()), + }, + { + name: "unhealthy", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: target, + RealtimeStats: &querypb.RealtimeStats{ + HealthError: "unhealthy", + }, + }, + wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias), + }, + { + name: "replication lag too high", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: target, + RealtimeStats: &querypb.RealtimeStats{ + ReplicationLagSeconds: highLag, + }, + }, + wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided", + tabletAlias, highLag), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + done := make(chan struct{}) + go func() { + sctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + defer close(done) + // SandboxConn's VStream implementation always waits for the context to timeout. + err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error { + require.Fail(t, "unexpected event", "Received unexpected events: %v", events) + return nil + }) + if tc.wantErr != "" { // Otherwise we simply expect the context to timeout + if !strings.Contains(logger.String(), tc.wantErr) { + require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr) + } + } + }() + if tc.wantErr != "" { + source.SetStreamHealthResponse(tc.hcRes) + } + <-done + logger.Clear() + }) + } +} + func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { gw := NewTabletGateway(context.Background(), hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index cb6e6582451..05c6b0485df 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -119,6 +119,8 @@ type SandboxConn struct { EphemeralShardErr error NotServing bool + + streamHealthResponse *querypb.StreamHealthResponse } var _ queryservice.QueryService = (*SandboxConn)(nil) // compile-time interface check @@ -406,9 +408,22 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target, // SandboxSQRowCount is the default number of fake splits returned. var SandboxSQRowCount = int64(10) -// StreamHealth is not implemented. +// SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth. +func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) { + sbc.mapMu.Lock() + defer sbc.mapMu.Unlock() + sbc.streamHealthResponse = res +} + +// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you +// can call SetStreamHealthResponse. func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { - return fmt.Errorf("not implemented in test") + sbc.mapMu.Lock() + defer sbc.mapMu.Unlock() + if sbc.streamHealthResponse != nil { + return callback(sbc.streamHealthResponse) + } + return nil } // ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos.