Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Apr 21, 2024
1 parent f86320e commit 9c64cbd
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 6 deletions.
10 changes: 5 additions & 5 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,18 +555,19 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed")
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
vs.tabletType, shr.Target.TabletType)
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
case shr.RealtimeStats.HealthError != "":
err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
tablet.Alias, shr.RealtimeStats.HealthError)
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
errCh <- err
return err
}
Expand Down Expand Up @@ -597,7 +598,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case <-ctx.Done():
return ctx.Err()
case streamErr := <-errCh:
log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
case <-journalDone:
// Unreachable.
Expand Down
131 changes: 131 additions & 0 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -1559,6 +1563,133 @@ func TestKeyspaceHasBeenSharded(t *testing.T) {
}
}

// TestVStreamManagerHealthcheckResponseHandling tests the handling of healthcheck responses by
// the vstream manager to confirm that we are correctly restarting the vstream when we should.
func TestVStreamManagerHealthcheckResponseHandling(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
// handling in SandboxConn's implementation and then we're not actually testing the
// production code.
logger := logutil.NewMemoryLogger()
log.Warningf = logger.Warningf

cell := "aa"
ks := "TestVStream"
shard := "0"
tabletType := topodatapb.TabletType_REPLICA
_ = createSandbox(ks)
healthChan := make(chan *discovery.TabletHealth)
hc := discovery.NewFakeHealthCheck(healthChan)
st := getSandboxTopo(ctx, cell, ks, []string{shard})
vsm := newTestVStreamManager(ctx, hc, st, cell)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: shard,
}},
}
source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet())
target := &querypb.Target{
Cell: cell,
Keyspace: ks,
Shard: shard,
TabletType: tabletType,
}
highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1

type testcase struct {
name string
hcRes *querypb.StreamHealthResponse
wantErr string
}
testcases := []testcase{
{
name: "all healthy", // Will hit the context timeout
},
{
name: "failure",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: nil, // This is seen as a healthcheck stream failure
},
wantErr: fmt.Sprintf("health check failed on %s", tabletAlias),
},
{
name: "tablet type changed",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: &querypb.Target{
Cell: cell,
Keyspace: ks,
Shard: shard,
TabletType: topodatapb.TabletType_PRIMARY,
},
PrimaryTermStartTimestamp: time.Now().Unix(),
RealtimeStats: &querypb.RealtimeStats{},
},
wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s",
tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()),
},
{
name: "unhealthy",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: target,
RealtimeStats: &querypb.RealtimeStats{
HealthError: "unhealthy",
},
},
wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias),
},
{
name: "replication lag too high",
hcRes: &querypb.StreamHealthResponse{
TabletAlias: source.Tablet().Alias,
Target: target,
RealtimeStats: &querypb.RealtimeStats{
ReplicationLagSeconds: highLag,
},
},
wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided",
tabletAlias, highLag),
},
}

warningIdx := 0
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
done := make(chan struct{})
go func() {
sctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
defer close(done)
// SandboxConn's VStream implementation always waits for the context to timeout.
err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error {
require.Fail(t, "unexpected event", "Receieved unexpected events: %v", events)
return nil
})
if tc.wantErr != "" { // Otherwise we simply expect the context to timeout
if len(logger.Events) < warningIdx+1 {
require.Fail(t, "expected vstream error", "vstream did not encounter a healthstream error, expected: %s", tc.wantErr)
}
loggedErr := logger.Events[warningIdx]
warningIdx++
if loggedErr == nil || !strings.Contains(loggedErr.String(), tc.wantErr) {
require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr)
}
}
}()
if tc.wantErr != "" {
source.SetStreamHealthResponse(tc.hcRes)
}
<-done
})
}
}

func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
gw := NewTabletGateway(ctx, hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down
16 changes: 15 additions & 1 deletion go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ type SandboxConn struct {
getSchemaResult []SchemaResult

parser *sqlparser.Parser

streamHealthResponse *querypb.StreamHealthResponse
}

type SchemaResult struct {
Expand Down Expand Up @@ -475,8 +477,20 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
// SandboxSQRowCount is the default number of fake splits returned.
var SandboxSQRowCount = int64(10)

// StreamHealth always mocks a "healthy" result.
func (sbc *SandboxConn) SetStreamHealthResponse(res *querypb.StreamHealthResponse) {
sbc.mapMu.Lock()
defer sbc.mapMu.Unlock()
sbc.streamHealthResponse = res
}

// StreamHealth always mocks a "healthy" result by default. If you want to overrid this behavior you
// call SetStreamHealthResponse.
func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
sbc.mapMu.Lock()
defer sbc.mapMu.Unlock()
if sbc.streamHealthResponse != nil {
return callback(sbc.streamHealthResponse)
}
return nil
}

Expand Down

0 comments on commit 9c64cbd

Please sign in to comment.