diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index f1a749590d8..c5bc460de09 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -555,18 +555,19 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case ctx.Err() != nil: err = fmt.Errorf("context has ended") case shr == nil || shr.RealtimeStats == nil || shr.Target == nil: - err = fmt.Errorf("health check failed") + err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias)) case vs.tabletType != shr.Target.TabletType: - err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream", - vs.tabletType, shr.Target.TabletType) + err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream", + topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType) case shr.RealtimeStats.HealthError != "": err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream", - tablet.Alias, shr.RealtimeStats.HealthError) + topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError) case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()): err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream", topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag()) } if err != nil { + log.Warningf("Tablet state changed: %s, attempting to restart", err) errCh <- err return err } @@ -597,7 +598,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case <-ctx.Done(): return ctx.Err() case streamErr := <-errCh: - log.Warningf("Tablet state changed: %s, attempting to restart", streamErr) return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error()) case <-journalDone: // Unreachable. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 6eec06a1bac..475475bb043 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -32,12 +32,16 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/sandboxconn" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -1559,6 +1563,133 @@ func TestKeyspaceHasBeenSharded(t *testing.T) { } } +// TestVStreamManagerHealthcheckResponseHandling tests the handling of healthcheck responses by +// the vstream manager to confirm that we are correctly restarting the vstream when we should. +func TestVStreamManagerHealthcheckResponseHandling(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + // Capture the vstream warning log. Otherwise we need to re-implement the vstream error + // handling in SandboxConn's implementation and then we're not actually testing the + // production code. + logger := logutil.NewMemoryLogger() + log.Warningf = logger.Warningf + + cell := "aa" + ks := "TestVStream" + shard := "0" + tabletType := topodatapb.TabletType_REPLICA + _ = createSandbox(ks) + healthChan := make(chan *discovery.TabletHealth) + hc := discovery.NewFakeHealthCheck(healthChan) + st := getSandboxTopo(ctx, cell, ks, []string{shard}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: shard, + }}, + } + source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil) + tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias) + addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet()) + target := &querypb.Target{ + Cell: cell, + Keyspace: ks, + Shard: shard, + TabletType: tabletType, + } + highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1 + + type testcase struct { + name string + hcRes *querypb.StreamHealthResponse + wantErr string + } + testcases := []testcase{ + { + name: "all healthy", // Will hit the context timeout + }, + { + name: "failure", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: nil, // This is seen as a healthcheck stream failure + }, + wantErr: fmt.Sprintf("health check failed on %s", tabletAlias), + }, + { + name: "tablet type changed", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: &querypb.Target{ + Cell: cell, + Keyspace: ks, + Shard: shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + PrimaryTermStartTimestamp: time.Now().Unix(), + RealtimeStats: &querypb.RealtimeStats{}, + }, + wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s", + tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()), + }, + { + name: "unhealthy", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: target, + RealtimeStats: &querypb.RealtimeStats{ + HealthError: "unhealthy", + }, + }, + wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias), + }, + { + name: "replication lag too high", + hcRes: &querypb.StreamHealthResponse{ + TabletAlias: source.Tablet().Alias, + Target: target, + RealtimeStats: &querypb.RealtimeStats{ + ReplicationLagSeconds: highLag, + }, + }, + wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided", + tabletAlias, highLag), + }, + } + + warningIdx := 0 + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + done := make(chan struct{}) + go func() { + sctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + defer close(done) + // SandboxConn's VStream implementation always waits for the context to timeout. + err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error { + require.Fail(t, "unexpected event", "Receieved unexpected events: %v", events) + return nil + }) + if tc.wantErr != "" { // Otherwise we simply expect the context to timeout + if len(logger.Events) < warningIdx+1 { + require.Fail(t, "expected vstream error", "vstream did not encounter a healthstream error, expected: %s", tc.wantErr) + } + loggedErr := logger.Events[warningIdx] + warningIdx++ + if loggedErr == nil || !strings.Contains(loggedErr.String(), tc.wantErr) { + require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr) + } + } + }() + if tc.wantErr != "" { + source.SetStreamHealthResponse(tc.hcRes) + } + <-done + }) + } +} + func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { gw := NewTabletGateway(ctx, hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 1f8a4027f8e..4bf45503211 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -133,6 +133,8 @@ type SandboxConn struct { getSchemaResult []SchemaResult parser *sqlparser.Parser + + streamHealthResponse *querypb.StreamHealthResponse } type SchemaResult struct { @@ -475,8 +477,20 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target, // SandboxSQRowCount is the default number of fake splits returned. var SandboxSQRowCount = int64(10) -// StreamHealth always mocks a "healthy" result. +func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) { + sbc.mapMu.Lock() + defer sbc.mapMu.Unlock() + sbc.streamHealthResponse = res +} + +// StreamHealth always mocks a "healthy" result by default. If you want to overrid this behavior you +// call SetStreamHealthResponse. func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + sbc.mapMu.Lock() + defer sbc.mapMu.Unlock() + if sbc.streamHealthResponse != nil { + return callback(sbc.streamHealthResponse) + } return nil }