Skip to content

Commit

Permalink
Cherry-pick 495de69 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Apr 22, 2024
1 parent 8aea612 commit 4143178
Show file tree
Hide file tree
Showing 3 changed files with 477 additions and 9 deletions.
34 changes: 25 additions & 9 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ import (

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
<<<<<<< HEAD
=======
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
>>>>>>> 495de697cd (VReplication: Take replication lag into account in VStreamManager healthcheck result processing (#15761))
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -521,18 +533,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
go func() {
_ = tabletConn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
var err error
if ctx.Err() != nil {
switch {
case ctx.Err() != nil:
err = fmt.Errorf("context has ended")
} else if shr == nil || shr.RealtimeStats == nil || shr.Target == nil {
err = fmt.Errorf("health check failed")
} else if vs.tabletType != shr.Target.TabletType {
err = fmt.Errorf("tablet type has changed from %s to %s, restarting vstream",
vs.tabletType, shr.Target.TabletType)
} else if shr.RealtimeStats.HealthError != "" {
case shr == nil || shr.RealtimeStats == nil || shr.Target == nil:
err = fmt.Errorf("health check failed on %s", topoproto.TabletAliasString(tablet.Alias))
case vs.tabletType != shr.Target.TabletType:
err = fmt.Errorf("tablet %s type has changed from %s to %s, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), vs.tabletType, shr.Target.TabletType)
case shr.RealtimeStats.HealthError != "":
err = fmt.Errorf("tablet %s is no longer healthy: %s, restarting vstream",
tablet.Alias, shr.RealtimeStats.HealthError)
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.HealthError)
case shr.RealtimeStats.ReplicationLagSeconds > uint32(discovery.GetLowReplicationLag().Seconds()):
err = fmt.Errorf("tablet %s has a replication lag of %d seconds which is beyond the value provided in --discovery_low_replication_lag of %s so the tablet is no longer considered healthy, restarting vstream",
topoproto.TabletAliasString(tablet.Alias), shr.RealtimeStats.ReplicationLagSeconds, discovery.GetLowReplicationLag())
}
if err != nil {
log.Warningf("Tablet state changed: %s, attempting to restart", err)
errCh <- err
return err
}
Expand Down Expand Up @@ -563,7 +580,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case <-ctx.Done():
return ctx.Err()
case streamErr := <-errCh:
log.Warningf("Tablet state changed: %s, attempting to restart", streamErr)
return vterrors.New(vtrpcpb.Code_UNAVAILABLE, streamErr.Error())
case <-journalDone:
// Unreachable.
Expand Down
Loading

0 comments on commit 4143178

Please sign in to comment.