From 4ef43f676331c4f6566289134f5a10af29ab7cbf Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 21 Feb 2024 11:12:30 +0530 Subject: [PATCH 1/4] feat: handle todo to close topo server Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/tablet_discovery.go | 1 - go/vt/vtorc/logic/vtorc.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index dd2e65237bf..47f52f08c6c 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -64,7 +64,6 @@ func RegisterFlags(fs *pflag.FlagSet) { // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker // channel for polling. func OpenTabletDiscovery() <-chan time.Time { - // TODO(sougou): If there's a shutdown signal, we have to close the topo. ts = topo.Open() tmc = tmclient.NewTabletManagerClient() // Clear existing cache and perform a new refresh. diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index f637956fbfd..1a02c96a158 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -129,6 +129,7 @@ func closeVTOrc() { _ = inst.AuditOperation("shutdown", "", "Triggered via SIGTERM") // wait for the locks to be released waitForLocksRelease() + ts.Close() log.Infof("VTOrc closed") } From 296005a78ebc4707593ef9105fe9749ae806e529 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 21 Feb 2024 11:52:45 +0530 Subject: [PATCH 2/4] feat: improve how VTOrc handles first discovery Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/tablet_discovery.go | 10 ++++ go/vt/vtorc/logic/tablet_discovery_test.go | 22 +++++++++ go/vt/vtorc/logic/vtorc.go | 55 ++++++++++------------ go/vt/vtorc/process/health.go | 4 +- 4 files changed, 59 insertions(+), 32 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 47f52f08c6c..6ac79ceaf1c 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" + "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vttablet/tmclient" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -70,9 +71,18 @@ func OpenTabletDiscovery() <-chan time.Time { if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) } + // We refresh all information from the topo once before we start the ticks to do it on an timer. + populateAllInformation() return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker } +// populateAllInformation initializes all the information for VTOrc to function. +func populateAllInformation() { + refreshAllInformation() + // We have completed one discovery cycle in the entirety of it. We should update the process health. + process.FirstDiscoveryCycleComplete.Store(true) +} + // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while func refreshAllTablets() { refreshTabletsUsing(func(tabletAlias string) { diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 0e8ac72fabf..f79cecf9ff5 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -34,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" + "vitess.io/vitess/go/vt/vtorc/process" ) var ( @@ -342,3 +343,24 @@ func TestGetLockAction(t *testing.T) { }) } } + +// TestProcessHealth tests that the health of the process reflects that we have run the first discovery once correctly. +func TestProcessHealth(t *testing.T) { + require.False(t, process.FirstDiscoveryCycleComplete.Load()) + originalTs := ts + defer func() { + ts = originalTs + process.FirstDiscoveryCycleComplete.Store(false) + }() + // Verify in the beginning, we have the first DiscoveredOnce field false. + health, err := process.HealthTest() + require.NoError(t, err) + require.False(t, health.DiscoveredOnce) + ts = memorytopo.NewServer(context.Background(), cell1) + populateAllInformation() + require.True(t, process.FirstDiscoveryCycleComplete.Load()) + // Verify after we populate all information, we have the first DiscoveredOnce field true. + health, err = process.HealthTest() + require.NoError(t, err) + require.True(t, health.DiscoveredOnce) +} diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 1a02c96a158..66c5590831b 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -336,8 +336,6 @@ func onHealthTick() { // nolint SA1015: using time.Tick leaks the underlying ticker func ContinuousDiscovery() { log.Infof("continuous discovery: setting up") - continuousDiscoveryStartTime := time.Now() - checkAndRecoverWaitPeriod := 3 * instancePollSecondsDuration() recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second) go handleDiscoveryRequests() @@ -352,10 +350,6 @@ func ContinuousDiscovery() { snapshotTopologiesTick = time.Tick(time.Duration(config.Config.SnapshotTopologiesIntervalHours) * time.Hour) } - runCheckAndRecoverOperationsTimeRipe := func() bool { - return time.Since(continuousDiscoveryStartTime) >= checkAndRecoverWaitPeriod - } - go func() { _ = ometrics.InitMetrics() }() @@ -401,11 +395,7 @@ func ContinuousDiscovery() { } else { return } - if runCheckAndRecoverOperationsTimeRipe() { - CheckAndRecover() - } else { - log.Infof("Waiting for %+v seconds to pass before running failure detection/recovery", checkAndRecoverWaitPeriod.Seconds()) - } + CheckAndRecover() }() } }() @@ -416,27 +406,30 @@ func ContinuousDiscovery() { } }() case <-tabletTopoTick: - // Create a wait group - var wg sync.WaitGroup + refreshAllInformation() + } + } +} - // Refresh all keyspace information. - wg.Add(1) - go func() { - defer wg.Done() - RefreshAllKeyspacesAndShards() - }() +// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks. +func refreshAllInformation() { + // Create a wait group + var wg sync.WaitGroup - // Refresh all tablets. - wg.Add(1) - go func() { - defer wg.Done() - refreshAllTablets() - }() + // Refresh all keyspace information. + wg.Add(1) + go func() { + defer wg.Done() + RefreshAllKeyspacesAndShards() + }() - // Wait for both the refreshes to complete - wg.Wait() - // We have completed one discovery cycle in the entirety of it. We should update the process health. - process.FirstDiscoveryCycleComplete.Store(true) - } - } + // Refresh all tablets. + wg.Add(1) + go func() { + defer wg.Done() + refreshAllTablets() + }() + + // Wait for both the refreshes to complete + wg.Wait() } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 22db89e1d56..a782b2edf14 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -108,7 +108,9 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { func HealthTest() (health *HealthStatus, err error) { cacheKey := util.ProcessToken.Hash if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found { - return healthStatus.(*HealthStatus), nil + health = healthStatus.(*HealthStatus) + health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() + return } health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash} From 62680ce249cd9b086723d69897018d9cb79ff846 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 21 Feb 2024 12:29:56 +0530 Subject: [PATCH 3/4] test: update test expectations Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/api/api_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 7dd5c50eefa..b16f4ebf9a2 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -37,21 +37,18 @@ func TestAPIEndpoints(t *testing.T) { utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{ PreventCrossDataCenterPrimaryFailover: true, RecoveryPeriodBlockSeconds: 5, - // The default topo refresh time is 3 seconds. We are intentionally making it slower for the test, so that we have time to verify - // the /debug/health output before and after the first refresh runs. - TopologyRefreshSeconds: 10, }, 1, "") keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] shard0 := &keyspace.Shards[0] vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] // Call API with retry to ensure VTOrc is up status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool { - return code == 0 + return code != 200 }) - // When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false. - assert.Equal(t, 500, status) + // Verify when VTOrc is healthy, it has also run the first discovery. + assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"DiscoveredOnce": false`) + assert.Contains(t, resp, `"DiscoveredOnce": true`) // find primary from topo primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) From 62f42e2f29c74103517849b877273d3dbcfc812c Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 22 Feb 2024 10:20:39 +0530 Subject: [PATCH 4/4] comments: fix code comments Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/tablet_discovery.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 6ac79ceaf1c..f08ab2c9c15 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -71,7 +71,7 @@ func OpenTabletDiscovery() <-chan time.Time { if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) } - // We refresh all information from the topo once before we start the ticks to do it on an timer. + // We refresh all information from the topo once before we start the ticks to do it on a timer. populateAllInformation() return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker } @@ -79,7 +79,7 @@ func OpenTabletDiscovery() <-chan time.Time { // populateAllInformation initializes all the information for VTOrc to function. func populateAllInformation() { refreshAllInformation() - // We have completed one discovery cycle in the entirety of it. We should update the process health. + // We have completed one full discovery cycle. We should update the process health. process.FirstDiscoveryCycleComplete.Store(true) }