Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add created timestamp to querypb.StreamHealthResponse #16611

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
12 changes: 8 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ type HealthCheckImpl struct {
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// nowTimeFunc is used to determine the current time.
nowTimeFunc func() time.Time
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -361,6 +363,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
nowTimeFunc: func() time.Time { return time.Now() },
}
var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
Expand Down Expand Up @@ -412,10 +415,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
TabletType: tablet.Type,
}
thc := &tabletHealthCheck{
ctx: ctx,
cancelFunc: cancelFunc,
Tablet: tablet,
Target: target,
ctx: ctx,
cancelFunc: cancelFunc,
nowTimeFunc: hc.nowTimeFunc,
Tablet: tablet,
Target: target,
}

// add to our datastore
Expand Down
41 changes: 36 additions & 5 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -147,6 +148,8 @@ func TestHealthCheck(t *testing.T) {
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
conn := createFakeConn(tablet, input)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
Expand All @@ -161,6 +164,7 @@ func TestHealthCheck(t *testing.T) {
Serving: false,
Stats: nil,
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
result := <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")
Expand All @@ -172,6 +176,7 @@ func TestHealthCheck(t *testing.T) {

PrimaryTermStartTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5},
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand All @@ -181,6 +186,7 @@ func TestHealthCheck(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
// create a context with timeout and select on it and channel
mustMatch(t, want, result, "Wrong TabletHealth data")
Expand All @@ -195,6 +201,7 @@ func TestHealthCheck(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}},
}}
// we can't use assert.Equal here because of the special way we want to compare equality
Expand All @@ -208,6 +215,7 @@ func TestHealthCheck(t *testing.T) {
Serving: true,
PrimaryTermStartTimestamp: 10,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
Timestamp: protoutil.TimeToProto(now),
}
want = &TabletHealth{
Tablet: tablet,
Expand All @@ -220,6 +228,7 @@ func TestHealthCheck(t *testing.T) {
Conn: conn,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 10,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand All @@ -237,13 +246,15 @@ func TestHealthCheck(t *testing.T) {
Serving: false,
PrimaryTermStartTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.3},
Timestamp: protoutil.TimeToProto(now),
}
want = &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: false,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.3},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand All @@ -257,6 +268,7 @@ func TestHealthCheck(t *testing.T) {
Serving: true,
PrimaryTermStartTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{HealthError: "some error", ReplicationLagSeconds: 1, CpuUsage: 0.3},
Timestamp: protoutil.TimeToProto(now),
}
want = &TabletHealth{
Tablet: tablet,
Expand All @@ -265,6 +277,7 @@ func TestHealthCheck(t *testing.T) {
Stats: &querypb.RealtimeStats{HealthError: "some error", ReplicationLagSeconds: 1, CpuUsage: 0.3},
PrimaryTermStartTime: 0,
LastError: fmt.Errorf("vttablet error: some error"),
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand All @@ -291,6 +304,8 @@ func TestHealthCheckStreamError(t *testing.T) {
resultChan := hc.Subscribe()
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet)

// Immediately after AddTablet() there will be the first notification.
Expand All @@ -299,6 +314,7 @@ func TestHealthCheckStreamError(t *testing.T) {
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
result := <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")
Expand All @@ -317,6 +333,7 @@ func TestHealthCheckStreamError(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down Expand Up @@ -355,6 +372,8 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {
resultChan := hc.Subscribe()
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet)

// Immediately after AddTablet() there will be the first notification.
Expand All @@ -363,6 +382,7 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
result := <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")
Expand All @@ -381,6 +401,7 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 10,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down Expand Up @@ -426,6 +447,8 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) {
tablet2.Type = topodatapb.TabletType_REPLICA
input2 := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet2, input2)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet2)
<-resultChan

Expand Down Expand Up @@ -454,6 +477,7 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.2},
PrimaryTermStartTime: 10,
Timestamp: protoutil.TimeToProto(now),
}}
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY})
mustMatch(t, health, a, "unexpected result")
Expand All @@ -474,6 +498,7 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.2},
PrimaryTermStartTime: 20,
Timestamp: protoutil.TimeToProto(now),
}}
a = hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY})
mustMatch(t, health, a, "unexpected result")
Expand Down Expand Up @@ -544,6 +569,9 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
createFakeConn(tablet, input)
resultChan := hc.Subscribe()

now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }

hc.AddTablet(tablet)

// Immediately after AddTablet() there will be the first notification.
Expand All @@ -565,12 +593,12 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
}
want = &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},

Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down Expand Up @@ -610,6 +638,8 @@ func TestHealthCheckTimeout(t *testing.T) {
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
resultChan := hc.Subscribe()
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet)
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Expand All @@ -635,6 +665,7 @@ func TestHealthCheckTimeout(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down
12 changes: 9 additions & 3 deletions go/vt/discovery/tablet_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (
"bytes"
"encoding/json"
"strings"

"vitess.io/vitess/go/vt/vttablet/queryservice"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/vttablet/queryservice"
)

// TabletHealth represents simple tablet health data that is returned to users of healthcheck.
Expand All @@ -40,6 +42,7 @@ type TabletHealth struct {
PrimaryTermStartTime int64
LastError error
Serving bool
Timestamp *vttime.Time
}

func (th *TabletHealth) MarshalJSON() ([]byte, error) {
Expand All @@ -50,13 +53,15 @@ func (th *TabletHealth) MarshalJSON() ([]byte, error) {
PrimaryTermStartTime int64
Stats *query.RealtimeStats
LastError error
Timestamp time.Time
}{
Tablet: th.Tablet,
Target: th.Target,
Serving: th.Serving,
PrimaryTermStartTime: th.PrimaryTermStartTime,
Stats: th.Stats,
LastError: th.LastError,
Timestamp: protoutil.TimeFromProto(th.Timestamp),
})
}

Expand All @@ -69,7 +74,8 @@ func (th *TabletHealth) DeepEqual(other *TabletHealth) bool {
th.PrimaryTermStartTime == other.PrimaryTermStartTime &&
proto.Equal(th.Stats, other.Stats) &&
((th.LastError == nil && other.LastError == nil) ||
(th.LastError != nil && other.LastError != nil && th.LastError.Error() == other.LastError.Error()))
(th.LastError != nil && other.LastError != nil && th.LastError.Error() == other.LastError.Error())) &&
proto.Equal(th.Timestamp, other.Timestamp)
}

// GetTabletHostPort formats a tablet host port address.
Expand Down
21 changes: 16 additions & 5 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ import (
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// tabletHealthCheck maintains the health status of a tablet. A map of this
Expand Down Expand Up @@ -68,9 +69,13 @@ type tabletHealthCheck struct {
// LastError is the error we last saw when trying to get the
// tablet's healthcheck.
LastError error
// Timestamp represents the time the healthcheck data was produced.
Timestamp *vttime.Time
// possibly delete both these
loggedServingState bool
lastResponseTimestamp time.Time // timestamp of the last healthcheck response
// nowTimeFunc provides the current time
nowTimeFunc func() time.Time
}

// String is defined because we want to print a []*tabletHealthCheck array nicely.
Expand All @@ -93,6 +98,7 @@ func (thc *tabletHealthCheck) SimpleCopy() *TabletHealth {
LastError: thc.LastError,
PrimaryTermStartTime: thc.PrimaryTermStartTime,
Serving: thc.Serving,
Timestamp: thc.Timestamp,
}
}

Expand All @@ -119,6 +125,10 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
thc.loggedServingState = true
}
thc.Serving = serving

if thc.nowTimeFunc != nil {
thc.Timestamp = protoutil.TimeToProto(thc.nowTimeFunc())
}
}

// stream streams healthcheck responses to callback.
Expand Down Expand Up @@ -192,6 +202,7 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
thc.PrimaryTermStartTime = shr.PrimaryTermStartTimestamp
thc.Stats = shr.RealtimeStats
thc.LastError = healthErr
thc.Timestamp = shr.Timestamp
reason := "healthCheck update"
if healthErr != nil {
reason = "healthCheck update error: " + healthErr.Error()
Expand Down
Loading