Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve VTOrc startup flow #15315

Merged
merged 4 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,18 @@ func TestAPIEndpoints(t *testing.T) {
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
RecoveryPeriodBlockSeconds: 5,
// The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify
// the /debug/health output before and after the first refresh runs.
TopologyRefreshSeconds: 10,
}, 1, "")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
// Call API with retry to ensure VTOrc is up
status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool {
return code == 0
return code != 200
})
// When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false.
assert.Equal(t, 500, status)
// Verify when VTOrc is healthy, it has also run the first discovery.
assert.Equal(t, 200, status)
assert.Contains(t, resp, `"Healthy": true,`)
assert.Contains(t, resp, `"DiscoveredOnce": false`)
assert.Contains(t, resp, `"DiscoveredOnce": true`)

// find primary from topo
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -64,16 +65,24 @@
// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
// TODO(sougou): If there's a shutdown signal, we have to close the topo.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed this too because it was really just 1 line fix, involving adding ts.Close() to closeVTOrc

ts = topo.Open()
tmc = tmclient.NewTabletManagerClient()
// Clear existing cache and perform a new refresh.
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
}
// We refresh all information from the topo once before we start the ticks to do it on a timer.
populateAllInformation()

Check warning on line 75 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L75

Added line #L75 was not covered by tests
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// populateAllInformation initializes all the information for VTOrc to function.
func populateAllInformation() {
refreshAllInformation()
// We have completed one full discovery cycle. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(tabletAlias string) {
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
)

var (
Expand Down Expand Up @@ -342,3 +343,24 @@ func TestGetLockAction(t *testing.T) {
})
}
}

// TestProcessHealth tests that the health of the process reflects that we have run the first discovery once correctly.
func TestProcessHealth(t *testing.T) {
require.False(t, process.FirstDiscoveryCycleComplete.Load())
originalTs := ts
defer func() {
ts = originalTs
process.FirstDiscoveryCycleComplete.Store(false)
}()
// Verify in the beginning, we have the first DiscoveredOnce field false.
health, err := process.HealthTest()
require.NoError(t, err)
require.False(t, health.DiscoveredOnce)
ts = memorytopo.NewServer(context.Background(), cell1)
populateAllInformation()
require.True(t, process.FirstDiscoveryCycleComplete.Load())
// Verify after we populate all information, we have the first DiscoveredOnce field true.
health, err = process.HealthTest()
require.NoError(t, err)
require.True(t, health.DiscoveredOnce)
}
56 changes: 25 additions & 31 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
_ = inst.AuditOperation("shutdown", "", "Triggered via SIGTERM")
// wait for the locks to be released
waitForLocksRelease()
ts.Close()

Check warning on line 132 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L132

Added line #L132 was not covered by tests
log.Infof("VTOrc closed")
}

Expand Down Expand Up @@ -335,8 +336,6 @@
// nolint SA1015: using time.Tick leaks the underlying ticker
func ContinuousDiscovery() {
log.Infof("continuous discovery: setting up")
continuousDiscoveryStartTime := time.Now()
checkAndRecoverWaitPeriod := 3 * instancePollSecondsDuration()
recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second)

go handleDiscoveryRequests()
Expand All @@ -351,10 +350,6 @@
snapshotTopologiesTick = time.Tick(time.Duration(config.Config.SnapshotTopologiesIntervalHours) * time.Hour)
}

runCheckAndRecoverOperationsTimeRipe := func() bool {
return time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod
}

go func() {
_ = ometrics.InitMetrics()
}()
Expand Down Expand Up @@ -400,11 +395,7 @@
} else {
return
}
if runCheckAndRecoverOperationsTimeRipe() {
CheckAndRecover()
} else {
log.Infof("Waiting for %+v seconds to pass before running failure detection/recovery", checkAndRecoverWaitPeriod.Seconds())
}
CheckAndRecover()

Check warning on line 398 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L398

Added line #L398 was not covered by tests
}()
}
}()
Expand All @@ -415,27 +406,30 @@
}
}()
case <-tabletTopoTick:
// Create a wait group
var wg sync.WaitGroup
refreshAllInformation()

Check warning on line 409 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L409

Added line #L409 was not covered by tests
}
}
}

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()
// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation() {
// Create a wait group
var wg sync.WaitGroup

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()
// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()

// Wait for both the refreshes to complete
wg.Wait()
// We have completed one discovery cycle in the entirety of it. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}
}
// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()

// Wait for both the refreshes to complete
wg.Wait()
}
4 changes: 3 additions & 1 deletion go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) {
func HealthTest() (health *HealthStatus, err error) {
cacheKey := util.ProcessToken.Hash
if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found {
return healthStatus.(*HealthStatus), nil
health = healthStatus.(*HealthStatus)
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()
return
}

health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash}
Expand Down
Loading