Skip to content

Commit

Permalink
Add a specialized function to recompute the health of primary tablets.
Browse files Browse the repository at this point in the history
This ensures that we can never end up in a situation where there is more
than one healthy primary tablet for a given shard.

Signed-off-by: Arthur Schreiber <[email protected]>

Co-authored-by: Daniel Joos <[email protected]>
  • Loading branch information
arthurschreiber and danieljoos committed Jul 15, 2024
1 parent 8515324 commit 9894a65
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 2 deletions.
35 changes: 33 additions & 2 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"encoding/json"
"fmt"
"hash/crc32"
"math"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -448,10 +449,16 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
continue
}
delete(ths, tabletAlias)
// delete from healthy list

healthy, ok := hc.healthy[key]
if ok && len(healthy) > 0 {
hc.recomputeHealthy(key)
if tabletType == topodata.TabletType_PRIMARY {
// If tablet type is primary, we should only have one tablet in the healthy list.
hc.recomputeHealthyPrimary(key)
} else {
// Simply recompute the list of healthy tablets for all other tablet types.
hc.recomputeHealthy(key)
}
}
}
}()
Expand Down Expand Up @@ -579,6 +586,25 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
hc.healthy[key] = FilterStatsByReplicationLag(allArray)
}

// Recompute the healthy primary tablet for the given key.
func (hc *HealthCheckImpl) recomputeHealthyPrimary(key KeyspaceShardTabletType) {
highestPrimaryTermStartTime := int64(math.MinInt64)
var newestPrimary *TabletHealth

for _, s := range hc.healthData[key] {
if s.PrimaryTermStartTime >= highestPrimaryTermStartTime {
highestPrimaryTermStartTime = s.PrimaryTermStartTime
newestPrimary = s
}
}

if newestPrimary != nil {
hc.healthy[key] = []*TabletHealth{newestPrimary}
} else {
hc.healthy[key] = []*TabletHealth{}
}
}

// Subscribe adds a listener. Used by vtgate buffer to learn about primary changes.
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
hc.subMu.Lock()
Expand Down Expand Up @@ -680,6 +706,11 @@ func (hc *HealthCheckImpl) GetHealthyTabletStats(target *query.Target) []*Tablet
var result []*TabletHealth
hc.mu.Lock()
defer hc.mu.Unlock()

if (target.TabletType == topodata.TabletType_PRIMARY) && len(hc.healthy[KeyFromTarget(target)]) > 1 {
log.Warningf("[BUG] GetHealthyTabletStats called for primary tablet type, but returning more than one primary tablet")
}

return append(result, hc.healthy[KeyFromTarget(target)]...)
}

Expand Down
119 changes: 119 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,125 @@ func TestRemoveTablet(t *testing.T) {
assert.Empty(t, a, "wrong result, expected empty list")
}

// When an external primary failover is performed,
// the demoted primary will advertise itself as a `PRIMARY`
// tablet until it recognizes that it was demoted,
// and until all in-flight operations have either finished
// (successfully or unsuccessfully, see `--shutdown_grace_period` flag).
//
// During this time, operations like `RemoveTablet` should not lead
// to multiple tablets becoming valid targets for `PRIMARY`.
func TestRemoveTabletDuringExternalReparenting(t *testing.T) {
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer("cell")
defer ts.Close()
hc := createTestHc(ts)
// close healthcheck
defer hc.Close()

firstTablet := createTestTablet(0, "cell", "a")
firstTablet.Type = topodatapb.TabletType_PRIMARY

secondTablet := createTestTablet(1, "cell", "b")
secondTablet.Type = topodatapb.TabletType_REPLICA

thirdTablet := createTestTablet(2, "cell", "c")
thirdTablet.Type = topodatapb.TabletType_REPLICA

firstTabletHealthStream := make(chan *querypb.StreamHealthResponse)
firstTabletConn := createFakeConn(firstTablet, firstTabletHealthStream)
firstTabletConn.errCh = make(chan error)

secondTabletHealthStream := make(chan *querypb.StreamHealthResponse)
secondTabletConn := createFakeConn(secondTablet, secondTabletHealthStream)
secondTabletConn.errCh = make(chan error)

thirdTabletHealthStream := make(chan *querypb.StreamHealthResponse)
thirdTabletConn := createFakeConn(thirdTablet, thirdTabletHealthStream)
thirdTabletConn.errCh = make(chan error)

resultChan := hc.Subscribe()

hc.AddTablet(firstTablet)
hc.AddTablet(secondTablet)
hc.AddTablet(thirdTablet)

<-resultChan
<-resultChan

firstTabletPrimaryTermStartTimestamp := time.Now().Unix() - 10

firstTabletHealthStream <- &querypb.StreamHealthResponse{
TabletAlias: firstTablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY},
Serving: true,

TabletExternallyReparentedTimestamp: firstTabletPrimaryTermStartTimestamp,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.5},
}

secondTabletHealthStream <- &querypb.StreamHealthResponse{
TabletAlias: secondTablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,

TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5},
}

thirdTabletHealthStream <- &querypb.StreamHealthResponse{
TabletAlias: thirdTablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,

TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5},
}

<-resultChan
<-resultChan
<-resultChan

secondTabletPrimaryTermStartTimestamp := time.Now().Unix()

// Simulate a failover
firstTabletHealthStream <- &querypb.StreamHealthResponse{
TabletAlias: firstTablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY},
Serving: true,

TabletExternallyReparentedTimestamp: firstTabletPrimaryTermStartTimestamp,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.5},
}

secondTabletHealthStream <- &querypb.StreamHealthResponse{
TabletAlias: secondTablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY},
Serving: true,

TabletExternallyReparentedTimestamp: secondTabletPrimaryTermStartTimestamp,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.5},
}

<-resultChan
<-resultChan

hc.RemoveTablet(thirdTablet)

// `secondTablet` should be the primary now
expectedTabletStats := []*TabletHealth{{
Tablet: secondTablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.5},
PrimaryTermStartTime: secondTabletPrimaryTermStartTimestamp,
}}

actualTabletStats := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY})
mustMatch(t, expectedTabletStats, actualTabletStats, "unexpected result")
}

// TestGetHealthyTablets tests the functionality of GetHealthyTabletStats.
func TestGetHealthyTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
Expand Down

0 comments on commit 9894a65

Please sign in to comment.