VReplication: Take replication lag into account in VStreamManager healthcheck result processing #15761

Merged
merged 6 commits on Apr 22, 2024
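In short, the vstream manager's healthcheck callback gains one more failure condition: if a tablet reports replication lag above the --discovery_low_replication_lag threshold, it is treated as unhealthy and the vstream is restarted on another tablet. The sketch below distills that decision logic. It is illustrative only: the function name checkStreamHealth, the explicit lowLag parameter, and the example values are made up, and the real code reads the threshold via discovery.GetLowReplicationLag() and also checks for context cancellation.

package main

import (
	"fmt"
	"time"

	querypb "vitess.io/vitess/go/vt/proto/query"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// checkStreamHealth is a simplified stand-in for the checks that
// vstream_manager.go now performs on every StreamHealthResponse.
// A non-nil error means the vstream should be restarted elsewhere.
func checkStreamHealth(shr *querypb.StreamHealthResponse, wantType topodatapb.TabletType, lowLag time.Duration) error {
	switch {
	case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
		return fmt.Errorf("health check failed")
	case shr.Target.TabletType != wantType:
		return fmt.Errorf("tablet type has changed from %s to %s", wantType, shr.Target.TabletType)
	case shr.RealtimeStats.HealthError != "":
		return fmt.Errorf("tablet is no longer healthy: %s", shr.RealtimeStats.HealthError)
	case shr.RealtimeStats.ReplicationLagSeconds > uint32(lowLag.Seconds()):
		// The new check: lag beyond --discovery_low_replication_lag means the
		// tablet is no longer a usable source for the vstream.
		return fmt.Errorf("replication lag of %d seconds exceeds %s",
			shr.RealtimeStats.ReplicationLagSeconds, lowLag)
	}
	return nil
}

func main() {
	shr := &querypb.StreamHealthResponse{
		Target:        &querypb.Target{TabletType: topodatapb.TabletType_REPLICA},
		RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 45},
	}
	// With a 30s threshold, 45 seconds of lag now triggers a restart.
	fmt.Println(checkStreamHealth(shr, topodatapb.TabletType_REPLICA, 30*time.Second))
}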
23 changes: 14 additions & 9 deletions go/vt/vtgate/vstream_manager.go
@@ -34,6 +34,7 @@ import (
 	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/srvtopo"
 	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/topoproto"
 	"vitess.io/vitess/go/vt/vterrors"
 
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -550,18 +551,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 		go func() {
 			_ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
 				var err error
-				if ctx.Err() != nil {
+				switch {
+				case ctx.Err() != nil:
 					err = fmt.Errorf("context has ended")
-				} else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
-					err = fmt.Errorf("health check failed")
-				} else if vs.tabletType != shr.Target.TabletType {
-					err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
-						vs.tabletType, shr.Target.TabletType)
-				} else if shr.RealtimeStats.HealthError != "" {
+				case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
+					err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
+				case vs.tabletType != shr.Target.TabletType:
+					err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
+						topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
+				case shr.RealtimeStats.HealthError != "":
 					err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
-						tablet.Alias, shr.RealtimeStats.HealthError)
+						topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
+				case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
+					err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
+						topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
 				}
 				if err != nil {
+					log.Warningf("Tablet state changed: %s, attempting to restart", err)
 					errCh <- err
 					return err
 				}
@@ -592,7 +598,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 		case <-ctx.Done():
 			return ctx.Err()
 		case streamErr := <-errCh:
-			log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
 			return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
 		case <-journalDone:
 			// Unreachable.
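The warning that used to be logged in the select loop is now emitted inside the healthcheck callback, right where the problem is detected, and the select loop simply wraps whatever arrives on errCh as an UNAVAILABLE error so the stream is retried on another tablet. A stripped-down sketch of that shape follows; the names watchAndReport, problems, and the package name are illustrative and not part of the real code.

package vstreamsketch

import (
	"context"

	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/vterrors"

	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// watchAndReport mirrors the pattern used in streamFromTablet: a watcher
// goroutine logs a warning and reports the first problem it sees, while the
// streaming side returns it as an UNAVAILABLE error so the caller restarts
// the vstream on another tablet.
func watchAndReport(ctx context.Context, problems <-chan error) error {
	errCh := make(chan error, 1)
	go func() {
		select {
		case <-ctx.Done():
		case err := <-problems:
			log.Warningf("Tablet state changed: %s, attempting to restart", err)
			errCh <- err
		}
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case streamErr := <-errCh:
		return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
	}
}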
125 changes: 125 additions & 0 deletions go/vt/vtgate/vstream_manager_test.go
@@ -32,12 +32,16 @@ import (
	"vitess.io/vitess/go/stats"
	"vitess.io/vitess/go/test/utils"
	"vitess.io/vitess/go/vt/discovery"
	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/logutil"
	"vitess.io/vitess/go/vt/srvtopo"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/topo/topoproto"
	"vitess.io/vitess/go/vt/vterrors"
	"vitess.io/vitess/go/vt/vttablet/sandboxconn"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	querypb "vitess.io/vitess/go/vt/proto/query"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
@@ -1559,6 +1563,127 @@ func TestKeyspaceHasBeenSharded(t *testing.T) {
}
}

// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by
// the vstream manager to confirm that we are correctly restarting the vstream when we should.
func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
	// handling in SandboxConn's implementation and then we're not actually testing the
	// production code.
	logger := logutil.NewMemoryLogger()
	log.Warningf = logger.Warningf

	cell := "aa"
	ks := "TestVStream"
	shard := "0"
	tabletType := topodatapb.TabletType_REPLICA
	_ = createSandbox(ks)
	hc := discovery.NewFakeHealthCheck(nil)
	st := getSandboxTopo(ctx, cell, ks, []string{shard})
	vsm := newTestVStreamManager(ctx, hc, st, cell)
	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: ks,
			Shard: shard,
		}},
	}
	source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
	tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
	addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet())
	target := &querypb.Target{
		Cell: cell,
		Keyspace: ks,
		Shard: shard,
		TabletType: tabletType,
	}
	highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1

	type testcase struct {
		name string
		hcRes *querypb.StreamHealthResponse
		wantErr string
	}
	testcases := []testcase{
		{
			name: "all healthy", // Will hit the context timeout
		},
		{
			name: "failure",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target: nil, // This is seen as a healthcheck stream failure
			},
			wantErr: fmt.Sprintf("health check failed on %s", tabletAlias),
		},
		{
			name: "tablet type changed",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target: &querypb.Target{
					Cell: cell,
					Keyspace: ks,
					Shard: shard,
					TabletType: topodatapb.TabletType_PRIMARY,
				},
				PrimaryTermStartTimestamp: time.Now().Unix(),
				RealtimeStats: &querypb.RealtimeStats{},
			},
			wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s",
				tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()),
		},
		{
			name: "unhealthy",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target: target,
				RealtimeStats: &querypb.RealtimeStats{
					HealthError: "unhealthy",
				},
			},
			wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias),
		},
		{
			name: "replication lag too high",
			hcRes: &querypb.StreamHealthResponse{
				TabletAlias: source.Tablet().Alias,
				Target: target,
				RealtimeStats: &querypb.RealtimeStats{
					ReplicationLagSeconds: highLag,
				},
			},
			wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided",
				tabletAlias, highLag),
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			done := make(chan struct{})
			go func() {
				sctx, cancel := context.WithTimeout(ctx, 5*time.Second)
				defer cancel()
				defer close(done)
				// SandboxConn's VStream implementation always waits for the context to timeout.
				err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error {
					require.Fail(t, "unexpected event", "Received unexpected events: %v", events)
					return nil
				})
				if tc.wantErr != "" { // Otherwise we simply expect the context to timeout
					if !strings.Contains(logger.String(), tc.wantErr) {
						require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr)
					}
				}
			}()
			if tc.wantErr != "" {
				source.SetStreamHealthResponse(tc.hcRes)
			}
			<-done
			logger.Clear()
		})
	}
}

func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
	gw := NewTabletGateway(ctx, hc, serv, cell)
	srvResolver := srvtopo.NewResolver(serv, gw, cell)
17 changes: 16 additions & 1 deletion go/vt/vttablet/sandboxconn/sandboxconn.go
@@ -133,6 +133,8 @@ type SandboxConn struct {
 	getSchemaResult []SchemaResult
 
 	parser *sqlparser.Parser
+
+	streamHealthResponse *querypb.StreamHealthResponse
 }
 
 type SchemaResult struct {
@@ -475,8 +477,21 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
 // SandboxSQRowCount is the default number of fake splits returned.
 var SandboxSQRowCount = int64(10)
 
-// StreamHealth always mocks a "healthy" result.
+// SetStreamHealthResponse sets the StreamHealthResponse to be returned in StreamHealth.
+func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) {
+	sbc.mapMu.Lock()
+	defer sbc.mapMu.Unlock()
+	sbc.streamHealthResponse = res
+}
+
+// StreamHealth always mocks a "healthy" result by default. If you want to override this behavior you
+// can call SetStreamHealthResponse.
 func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
+	sbc.mapMu.Lock()
+	defer sbc.mapMu.Unlock()
+	if sbc.streamHealthResponse != nil {
+		return callback(sbc.streamHealthResponse)
+	}
 	return nil
 }

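The new SetStreamHealthResponse hook is what lets the test above inject arbitrary health responses without re-implementing SandboxConn's VStream behavior. As a usage illustration, here is a hypothetical helper (not part of this PR) that makes a sandbox tablet report lag just above the threshold; it assumes the same fake healthcheck and sandbox setup used in vstream_manager_test.go.

package vstreamsketch

import (
	"vitess.io/vitess/go/vt/discovery"
	"vitess.io/vitess/go/vt/vttablet/sandboxconn"

	querypb "vitess.io/vitess/go/vt/proto/query"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// simulateLaggingTablet is a hypothetical test helper: it adds a REPLICA sandbox
// tablet to the fake healthcheck and injects a StreamHealthResponse whose lag is
// one second above the --discovery_low_replication_lag threshold, so the vstream
// manager should treat the tablet as unhealthy and restart the stream.
func simulateLaggingTablet(hc *discovery.FakeHealthCheck, cell, ks, shard string) *sandboxconn.SandboxConn {
	sbc := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, topodatapb.TabletType_REPLICA, true, 0, nil)
	sbc.SetStreamHealthResponse(&querypb.StreamHealthResponse{
		TabletAlias: sbc.Tablet().Alias,
		Target: &querypb.Target{
			Cell:       cell,
			Keyspace:   ks,
			Shard:      shard,
			TabletType: topodatapb.TabletType_REPLICA,
		},
		RealtimeStats: &querypb.RealtimeStats{
			ReplicationLagSeconds: uint32(discovery.GetLowReplicationLag().Seconds()) + 1,
		},
	})
	return sbc
}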