From 18d41281b9c9b8ac52fc9bd607d9aef8337b0b9b Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Wed, 3 Apr 2024 19:40:13 +0530 Subject: [PATCH] VTOrc: Cleanup node registration and unused code (#15617) Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/api/api_test.go | 2 - .../promotionrule/promotion_rule.go | 1 - go/vt/vtorc/config/config.go | 5 - go/vt/vtorc/db/generate_base.go | 63 +------ go/vt/vtorc/inst/analysis.go | 28 --- go/vt/vtorc/inst/analysis_dao.go | 15 -- go/vt/vtorc/inst/analysis_dao_test.go | 14 +- go/vt/vtorc/inst/analysis_test.go | 48 ----- go/vt/vtorc/inst/cluster.go | 14 +- go/vt/vtorc/logic/tablet_discovery.go | 3 - go/vt/vtorc/logic/tablet_discovery_test.go | 10 +- go/vt/vtorc/logic/topology_recovery.go | 4 - go/vt/vtorc/logic/topology_recovery_dao.go | 2 - go/vt/vtorc/logic/vtorc.go | 104 ++--------- go/vt/vtorc/process/election_dao.go | 138 -------------- go/vt/vtorc/process/health.go | 150 +++------------ go/vt/vtorc/process/health_dao.go | 176 ------------------ go/vt/vtorc/process/host.go | 33 ---- go/vt/vtorc/server/api.go | 8 +- go/vt/vtorc/server/discovery.go | 3 - go/vt/vtorc/util/token.go | 53 ------ go/vt/vtorc/util/token_test.go | 28 --- 22 files changed, 55 insertions(+), 847 deletions(-) delete mode 100644 go/vt/vtorc/inst/analysis_test.go delete mode 100644 go/vt/vtorc/process/election_dao.go delete mode 100644 go/vt/vtorc/process/health_dao.go delete mode 100644 go/vt/vtorc/process/host.go delete mode 100644 go/vt/vtorc/util/token.go delete mode 100644 go/vt/vtorc/util/token_test.go diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 05b757a90ac..83b9ba73efe 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -48,7 +48,6 @@ func TestAPIEndpoints(t *testing.T) { // Verify when VTOrc is healthy, it has also run the first discovery. assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"DiscoveredOnce": true`) // find primary from topo primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) @@ -77,7 +76,6 @@ func TestAPIEndpoints(t *testing.T) { require.NoError(t, err) assert.Equal(t, 200, status) assert.Contains(t, resp, `"Healthy": true,`) - assert.Contains(t, resp, `"DiscoveredOnce": true`) }) t.Run("Liveness API", func(t *testing.T) { diff --git a/go/vt/vtctl/reparentutil/promotionrule/promotion_rule.go b/go/vt/vtctl/reparentutil/promotionrule/promotion_rule.go index c92805b8955..261c783a228 100644 --- a/go/vt/vtctl/reparentutil/promotionrule/promotion_rule.go +++ b/go/vt/vtctl/reparentutil/promotionrule/promotion_rule.go @@ -21,7 +21,6 @@ import ( ) // CandidatePromotionRule describe the promotion preference/rule for an instance. -// It maps to promotion_rule column in candidate_database_instance type CandidatePromotionRule string const ( diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index 402c67870ba..2d21e377cb6 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -27,15 +27,10 @@ import ( "vitess.io/vitess/go/vt/log" ) -const ( - LostInRecoveryDowntimeSeconds int = 60 * 60 * 24 * 365 -) - var configurationLoaded = make(chan bool) const ( HealthPollSeconds = 1 - ActiveNodeExpireSeconds = 5 AuditPageSize = 20 DebugMetricsIntervalSeconds = 10 StaleInstanceCoordinatesExpireSeconds = 60 diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index fbb96ef75c0..44bc7acd483 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -19,15 +19,12 @@ package db var TableNames = []string{ "database_instance", "audit", - "active_node", "node_health", "topology_recovery", "database_instance_topology_history", - "candidate_database_instance", "recovery_detection", "database_instance_last_analysis", "database_instance_analysis_changelog", - "node_health_history", "vtorc_db_deployments", "global_recovery_disable", "topology_recovery_steps", @@ -137,32 +134,11 @@ CREATE INDEX audit_timestamp_idx_audit ON audit (audit_timestamp) CREATE INDEX alias_idx_audit ON audit (alias, audit_timestamp) `, ` -DROP TABLE IF EXISTS active_node -`, - ` -CREATE TABLE active_node ( - anchor tinyint NOT NULL, - hostname varchar(128) NOT NULL, - token varchar(128) NOT NULL, - last_seen_active timestamp not null default (''), - first_seen_active timestamp NOT NULL DEFAULT '1971-01-01 00:00:00', - PRIMARY KEY (anchor) -)`, - ` DROP TABLE IF EXISTS node_health `, ` CREATE TABLE node_health ( - hostname varchar(128) NOT NULL, - token varchar(128) NOT NULL, - last_seen_active timestamp not null default (''), - extra_info varchar(128) not null default '', - command varchar(128) not null default '', - app_version varchar(64) NOT NULL DEFAULT "", - first_seen_active timestamp NOT NULL DEFAULT '1971-01-01 00:00:00', - db_backend varchar(255) NOT NULL DEFAULT "", - incrementing_indicator bigint not null default 0, - PRIMARY KEY (hostname, token) + last_seen_active timestamp not null default ('') )`, ` DROP TABLE IF EXISTS topology_recovery @@ -205,20 +181,6 @@ CREATE TABLE database_instance_topology_history ( CREATE INDEX keyspace_shard_idx_database_instance_topology_history ON database_instance_topology_history (snapshot_unix_timestamp, keyspace, shard) `, ` -DROP TABLE IF EXISTS candidate_database_instance -`, - ` -CREATE TABLE candidate_database_instance ( - alias varchar(256) NOT NULL, - last_suggested timestamp not null default (''), - priority TINYINT SIGNED NOT NULL DEFAULT 1, - promotion_rule text check(promotion_rule in ('must', 'prefer', 'neutral', 'prefer_not', 'must_not')) NOT NULL DEFAULT 'neutral', - PRIMARY KEY (alias) -)`, - ` -CREATE INDEX last_suggested_idx_candidate_database_instance ON candidate_database_instance (last_suggested) - `, - ` DROP TABLE IF EXISTS recovery_detection `, ` @@ -259,26 +221,6 @@ CREATE TABLE database_instance_analysis_changelog ( CREATE INDEX analysis_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (analysis_timestamp) `, ` -DROP TABLE IF EXISTS node_health_history -`, - ` -CREATE TABLE node_health_history ( - history_id integer, - hostname varchar(128) NOT NULL, - token varchar(128) NOT NULL, - first_seen_active timestamp NOT NULL, - extra_info varchar(128) NOT NULL, - command varchar(128) not null default '', - app_version varchar(64) NOT NULL DEFAULT "", - PRIMARY KEY (history_id) -)`, - ` -CREATE INDEX first_seen_active_idx_node_health_history ON node_health_history (first_seen_active) - `, - ` -CREATE UNIQUE INDEX hostname_token_idx_node_health_history ON node_health_history (hostname, token) - `, - ` DROP TABLE IF EXISTS vtorc_db_deployments `, ` @@ -379,9 +321,6 @@ CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on data CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id) `, ` -CREATE INDEX last_seen_active_idx_node_health on node_health (last_seen_active) - `, - ` CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) `, } diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index 328b43df0c5..be254cb718a 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -54,9 +54,7 @@ const ( AllPrimaryReplicasNotReplicatingOrDead AnalysisCode = "AllPrimaryReplicasNotReplicatingOrDead" LockedSemiSyncPrimaryHypothesis AnalysisCode = "LockedSemiSyncPrimaryHypothesis" LockedSemiSyncPrimary AnalysisCode = "LockedSemiSyncPrimary" - PrimaryWithoutReplicas AnalysisCode = "PrimaryWithoutReplicas" BinlogServerFailingToConnectToPrimary AnalysisCode = "BinlogServerFailingToConnectToPrimary" - GraceFulPrimaryTakeover AnalysisCode = "GracefulPrimaryTakeover" ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected" ) @@ -83,34 +81,20 @@ type ReplicationAnalysisHints struct { AuditAnalysis bool } -type AnalysisInstanceType string - -const ( - AnalysisInstanceTypePrimary AnalysisInstanceType = "primary" - AnalysisInstanceTypeCoPrimary AnalysisInstanceType = "co-primary" - AnalysisInstanceTypeIntermediatePrimary AnalysisInstanceType = "intermediate-primary" -) - // ReplicationAnalysis notes analysis on replication chain status, per instance type ReplicationAnalysis struct { - AnalyzedInstanceHostname string - AnalyzedInstancePort int AnalyzedInstanceAlias string AnalyzedInstancePrimaryAlias string TabletType topodatapb.TabletType PrimaryTimeStamp time.Time ClusterDetails ClusterInfo - AnalyzedInstanceDataCenter string - AnalyzedInstanceRegion string AnalyzedKeyspace string AnalyzedShard string // ShardPrimaryTermTimestamp is the primary term start time stored in the shard record. ShardPrimaryTermTimestamp string - AnalyzedInstancePhysicalEnvironment string AnalyzedInstanceBinlogCoordinates BinlogCoordinates IsPrimary bool IsClusterPrimary bool - IsCoPrimary bool LastCheckValid bool LastCheckPartialSuccess bool CountReplicas uint @@ -159,18 +143,6 @@ func (replicationAnalysis *ReplicationAnalysis) MarshalJSON() ([]byte, error) { return json.Marshal(i) } -// Get a string description of the analyzed instance type (primary? co-primary? intermediate-primary?) -func (replicationAnalysis *ReplicationAnalysis) GetAnalysisInstanceType() AnalysisInstanceType { - if replicationAnalysis.IsCoPrimary { - return AnalysisInstanceTypeCoPrimary - } - - if replicationAnalysis.IsPrimary { - return AnalysisInstanceTypePrimary - } - return AnalysisInstanceTypeIntermediatePrimary -} - // ValidSecondsFromSeenToLastAttemptedCheck returns the maximum allowed elapsed time // between last_attempted_check to last_checked before we consider the instance as invalid. func ValidSecondsFromSeenToLastAttemptedCheck() uint { diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index b348d17d45f..7e9cef9c30b 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -75,8 +75,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna query := ` SELECT vitess_tablet.info AS tablet_info, - vitess_tablet.hostname, - vitess_tablet.port, vitess_tablet.tablet_type, vitess_tablet.primary_timestamp, vitess_tablet.shard AS shard, @@ -87,9 +85,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna primary_instance.read_only AS read_only, MIN(primary_instance.gtid_errant) AS gtid_errant, MIN(primary_instance.alias) IS NULL AS is_invalid, - MIN(primary_instance.data_center) AS data_center, - MIN(primary_instance.region) AS region, - MIN(primary_instance.physical_environment) AS physical_environment, MIN(primary_instance.binary_log_file) AS binary_log_file, MIN(primary_instance.binary_log_pos) AS binary_log_pos, MIN(primary_tablet.info) AS primary_tablet_info, @@ -115,7 +110,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna OR substr(primary_instance.source_host, 1, 2) = '//' ) ) AS is_primary, - MIN(primary_instance.is_co_primary) AS is_co_primary, MIN(primary_instance.gtid_mode) AS gtid_mode, COUNT(replica_instance.server_id) AS count_replicas, IFNULL( @@ -172,7 +166,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna MIN( primary_instance.semi_sync_replica_enabled ) AS semi_sync_replica_enabled, - SUM(replica_instance.is_co_primary) AS count_co_primary_replicas, SUM(replica_instance.oracle_gtid) AS count_oracle_gtid_replicas, IFNULL( SUM( @@ -331,15 +324,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.ShardPrimaryTermTimestamp = m.GetString("shard_primary_term_timestamp") a.IsPrimary = m.GetBool("is_primary") - countCoPrimaryReplicas := m.GetUint("count_co_primary_replicas") - a.IsCoPrimary = m.GetBool("is_co_primary") || (countCoPrimaryReplicas > 0) - a.AnalyzedInstanceHostname = m.GetString("hostname") - a.AnalyzedInstancePort = m.GetInt("port") a.AnalyzedInstanceAlias = topoproto.TabletAliasString(tablet.Alias) a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias) - a.AnalyzedInstanceDataCenter = m.GetString("data_center") - a.AnalyzedInstanceRegion = m.GetString("region") - a.AnalyzedInstancePhysicalEnvironment = m.GetString("physical_environment") a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{ LogFile: m.GetString("binary_log_file"), LogPos: m.GetUint32("binary_log_pos"), @@ -359,7 +345,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.IsFailingToConnectToPrimary = m.GetBool("is_failing_to_connect_to_primary") a.ReplicationStopped = m.GetBool("replication_stopped") a.IsBinlogServer = m.GetBool("is_binlog_server") - a.ClusterDetails.ReadRecoveryInfo() a.ErrantGTID = m.GetString("gtid_errant") countValidOracleGTIDReplicas := m.GetUint("count_valid_oracle_gtid_replicas") diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index c1926fca089..d25072c543d 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -917,21 +917,19 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) { // TestPostProcessAnalyses tests the functionality of the postProcessAnalyses function. func TestPostProcessAnalyses(t *testing.T) { ks0 := ClusterInfo{ - Keyspace: "ks", - Shard: "0", - CountInstances: 4, + Keyspace: "ks", + Shard: "0", } ks80 := ClusterInfo{ - Keyspace: "ks", - Shard: "80-", - CountInstances: 3, + Keyspace: "ks", + Shard: "80-", } clusters := map[string]*clusterAnalysis{ getKeyspaceShardName(ks0.Keyspace, ks0.Shard): { - totalTablets: int(ks0.CountInstances), + totalTablets: 4, }, getKeyspaceShardName(ks80.Keyspace, ks80.Shard): { - totalTablets: int(ks80.CountInstances), + totalTablets: 3, }, } diff --git a/go/vt/vtorc/inst/analysis_test.go b/go/vt/vtorc/inst/analysis_test.go deleted file mode 100644 index 70849379a5e..00000000000 --- a/go/vt/vtorc/inst/analysis_test.go +++ /dev/null @@ -1,48 +0,0 @@ -/* - Copyright 2014 Outbrain Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package inst - -import ( - "testing" - - "vitess.io/vitess/go/vt/vtorc/config" - - "github.com/stretchr/testify/require" -) - -func init() { - config.MarkConfigurationLoaded() -} - -func TestGetAnalysisInstanceType(t *testing.T) { - { - analysis := &ReplicationAnalysis{} - require.Equal(t, string(analysis.GetAnalysisInstanceType()), "intermediate-primary") - } - { - analysis := &ReplicationAnalysis{IsPrimary: true} - require.Equal(t, string(analysis.GetAnalysisInstanceType()), "primary") - } - { - analysis := &ReplicationAnalysis{IsCoPrimary: true} - require.Equal(t, string(analysis.GetAnalysisInstanceType()), "co-primary") - } - { - analysis := &ReplicationAnalysis{IsPrimary: true, IsCoPrimary: true} - require.Equal(t, string(analysis.GetAnalysisInstanceType()), "co-primary") - } -} diff --git a/go/vt/vtorc/inst/cluster.go b/go/vt/vtorc/inst/cluster.go index c3a77485e74..f163885a283 100644 --- a/go/vt/vtorc/inst/cluster.go +++ b/go/vt/vtorc/inst/cluster.go @@ -18,16 +18,6 @@ package inst // ClusterInfo makes for a cluster status/info summary type ClusterInfo struct { - Keyspace string - Shard string - CountInstances uint - HeuristicLag int64 - HasAutomatedPrimaryRecovery bool - HasAutomatedIntermediatePrimaryRecovery bool -} - -// ReadRecoveryInfo -func (clusterInfo *ClusterInfo) ReadRecoveryInfo() { - clusterInfo.HasAutomatedPrimaryRecovery = true - clusterInfo.HasAutomatedIntermediatePrimaryRecovery = true + Keyspace string + Shard string } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 58c042232ee..ca20a554adb 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -90,9 +90,6 @@ func refreshAllTablets() { } func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { - if !IsLeaderOrActive() { - return - } if len(clustersToWatch) == 0 { // all known clusters ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index f79cecf9ff5..d7e645ba66d 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -353,14 +353,12 @@ func TestProcessHealth(t *testing.T) { process.FirstDiscoveryCycleComplete.Store(false) }() // Verify in the beginning, we have the first DiscoveredOnce field false. - health, err := process.HealthTest() - require.NoError(t, err) - require.False(t, health.DiscoveredOnce) + _, discoveredOnce := process.HealthTest() + require.False(t, discoveredOnce) ts = memorytopo.NewServer(context.Background(), cell1) populateAllInformation() require.True(t, process.FirstDiscoveryCycleComplete.Load()) // Verify after we populate all information, we have the first DiscoveredOnce field true. - health, err = process.HealthTest() - require.NoError(t, err) - require.True(t, health.DiscoveredOnce) + _, discoveredOnce = process.HealthTest() + require.True(t, discoveredOnce) } diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 4c1e4264b5d..e7d9735f0d4 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -198,10 +198,6 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.Replicati // runEmergencyReparentOp runs a recovery for which we have to run ERS. Here waitForAllTablets is a boolean telling ERS whether it should wait for all the tablets // or is it okay to skip 1. func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.ReplicationAnalysis, recoveryName string, waitForAllTablets bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - if !analysisEntry.ClusterDetails.HasAutomatedPrimaryRecovery { - return false, nil, nil - } - // Read the tablet information from the database to find the shard and keyspace of the tablet tablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias) if err != nil { diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index dd5f8a96430..e8af34bdad4 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -194,8 +194,6 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.SuccessorAlias = m.GetString("successor_alias") - topologyRecovery.AnalysisEntry.ClusterDetails.ReadRecoveryInfo() - topologyRecovery.AllErrors = strings.Split(m.GetString("all_errors"), "\n") topologyRecovery.DetectionID = m.GetInt64("detection_id") diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index b9e5795a31f..ffe6e73fa35 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -35,7 +35,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/discovery" "vitess.io/vitess/go/vt/vtorc/inst" ometrics "vitess.io/vitess/go/vt/vtorc/metrics" - "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vtorc/util" ) @@ -56,12 +55,8 @@ var failedDiscoveriesCounter = metrics.NewCounter() var instancePollSecondsExceededCounter = metrics.NewCounter() var discoveryQueueLengthGauge = metrics.NewGauge() var discoveryRecentCountGauge = metrics.NewGauge() -var isElectedGauge = metrics.NewGauge() -var isHealthyGauge = metrics.NewGauge() var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName) -var isElectedNode int64 - var recentDiscoveryOperationKeys *cache.Cache func init() { @@ -72,8 +67,6 @@ func init() { _ = metrics.Register("discoveries.instance_poll_seconds_exceeded", instancePollSecondsExceededCounter) _ = metrics.Register("discoveries.queue_length", discoveryQueueLengthGauge) _ = metrics.Register("discoveries.recent_count", discoveryRecentCountGauge) - _ = metrics.Register("elect.is_elected", isElectedGauge) - _ = metrics.Register("health.is_healthy", isHealthyGauge) ometrics.OnMetricsTick(func() { discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen())) @@ -84,20 +77,6 @@ func init() { } discoveryRecentCountGauge.Update(int64(recentDiscoveryOperationKeys.ItemCount())) }) - ometrics.OnMetricsTick(func() { - isElectedGauge.Update(atomic.LoadInt64(&isElectedNode)) - }) - ometrics.OnMetricsTick(func() { - isHealthyGauge.Update(atomic.LoadInt64(&process.LastContinousCheckHealthy)) - }) -} - -func IsLeader() bool { - return atomic.LoadInt64(&isElectedNode) == 1 -} - -func IsLeaderOrActive() bool { - return atomic.LoadInt64(&isElectedNode) == 1 } // used in several places @@ -161,15 +140,6 @@ func handleDiscoveryRequests() { go func() { for { tabletAlias := discoveryQueue.Consume() - // Possibly this used to be the elected node, but has - // been demoted, while still the queue is full. - if !IsLeaderOrActive() { - log.Infof("Node apparently demoted. Skipping discovery of %+v. "+ - "Remaining queue size: %+v", tabletAlias, discoveryQueue.QueueLen()) - discoveryQueue.Release(tabletAlias) - continue - } - DiscoverInstance(tabletAlias, false /* forceDiscovery */) discoveryQueue.Release(tabletAlias) } @@ -275,40 +245,11 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) { // onHealthTick handles the actions to take to discover/poll instances func onHealthTick() { - wasAlreadyElected := IsLeader() - { - myIsElectedNode, err := process.AttemptElection() - if err != nil { - log.Error(err) - } - if myIsElectedNode { - atomic.StoreInt64(&isElectedNode, 1) - } else { - atomic.StoreInt64(&isElectedNode, 0) - } - if !myIsElectedNode { - if electedNode, _, err := process.ElectedNode(); err == nil { - log.Infof("Not elected as active node; active node: %v; polling", electedNode.Hostname) - } else { - log.Infof("Not elected as active node; active node: Unable to determine: %v; polling", err) - } - } - } - if !IsLeaderOrActive() { - return - } tabletAliases, err := inst.ReadOutdatedInstanceKeys() if err != nil { log.Error(err) } - if !wasAlreadyElected { - // Just turned to be leader! - go func() { - _, _ = process.RegisterNode(process.ThisNodeHealth) - }() - } - func() { // Normally onHealthTick() shouldn't run concurrently. It is kicked by a ticker. // However it _is_ invoked inside a goroutine. I like to be safe here. @@ -367,39 +308,30 @@ func ContinuousDiscovery() { case <-caretakingTick: // Various periodic internal maintenance tasks go func() { - if IsLeaderOrActive() { - - go inst.ForgetLongUnseenInstances() - go inst.ExpireAudit() - go inst.ExpireStaleInstanceBinlogCoordinates() - go process.ExpireNodesHistory() - go process.ExpireAvailableNodes() - go ExpireRecoveryDetectionHistory() - go ExpireTopologyRecoveryHistory() - go ExpireTopologyRecoveryStepsHistory() - } + go inst.ForgetLongUnseenInstances() + go inst.ExpireAudit() + go inst.ExpireStaleInstanceBinlogCoordinates() + go ExpireRecoveryDetectionHistory() + go ExpireTopologyRecoveryHistory() + go ExpireTopologyRecoveryStepsHistory() }() case <-recoveryTick: go func() { - if IsLeaderOrActive() { - go inst.ExpireInstanceAnalysisChangelog() - - go func() { - // This function is non re-entrant (it can only be running once at any point in time) - if atomic.CompareAndSwapInt64(&recoveryEntrance, 0, 1) { - defer atomic.StoreInt64(&recoveryEntrance, 0) - } else { - return - } - CheckAndRecover() - }() - } + go inst.ExpireInstanceAnalysisChangelog() + + go func() { + // This function is non re-entrant (it can only be running once at any point in time) + if atomic.CompareAndSwapInt64(&recoveryEntrance, 0, 1) { + defer atomic.StoreInt64(&recoveryEntrance, 0) + } else { + return + } + CheckAndRecover() + }() }() case <-snapshotTopologiesTick: go func() { - if IsLeaderOrActive() { - go inst.SnapshotTopologies() - } + go inst.SnapshotTopologies() }() case <-tabletTopoTick: refreshAllInformation() diff --git a/go/vt/vtorc/process/election_dao.go b/go/vt/vtorc/process/election_dao.go deleted file mode 100644 index aa2bfbc2dee..00000000000 --- a/go/vt/vtorc/process/election_dao.go +++ /dev/null @@ -1,138 +0,0 @@ -/* - Copyright 2015 Shlomi Noach, courtesy Booking.com - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package process - -import ( - "vitess.io/vitess/go/vt/external/golib/sqlutils" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/vtorc/config" - "vitess.io/vitess/go/vt/vtorc/db" - "vitess.io/vitess/go/vt/vtorc/util" -) - -// AttemptElection tries to grab leadership (become active node) -func AttemptElection() (bool, error) { - { - sqlResult, err := db.ExecVTOrc(` - insert ignore into active_node ( - anchor, hostname, token, first_seen_active, last_seen_active - ) values ( - 1, ?, ?, now(), now() - ) - `, - ThisHostname, util.ProcessToken.Hash, - ) - if err != nil { - log.Error(err) - return false, err - } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - if rows > 0 { - // We managed to insert a row - return true, nil - } - } - { - // takeover from a node that has been inactive - sqlResult, err := db.ExecVTOrc(` - update active_node set - hostname = ?, - token = ?, - first_seen_active=now(), - last_seen_active=now() - where - anchor = 1 - and last_seen_active < (now() - interval ? second) - `, - ThisHostname, util.ProcessToken.Hash, config.ActiveNodeExpireSeconds, - ) - if err != nil { - log.Error(err) - return false, err - } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - if rows > 0 { - // We managed to update a row: overtaking a previous leader - return true, nil - } - } - { - // Update last_seen_active is this very node is already the active node - sqlResult, err := db.ExecVTOrc(` - update active_node set - last_seen_active=now() - where - anchor = 1 - and hostname = ? - and token = ? - `, - ThisHostname, util.ProcessToken.Hash, - ) - if err != nil { - log.Error(err) - return false, err - } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - if rows > 0 { - // Reaffirmed our own leadership - return true, nil - } - } - return false, nil -} - -// ElectedNode returns the details of the elected node, as well as answering the question "is this process the elected one"? -func ElectedNode() (node *NodeHealth, isElected bool, err error) { - node = &NodeHealth{} - query := ` - select - hostname, - token, - first_seen_active, - last_seen_Active - from - active_node - where - anchor = 1 - ` - err = db.QueryVTOrcRowsMap(query, func(m sqlutils.RowMap) error { - node.Hostname = m.GetString("hostname") - node.Token = m.GetString("token") - node.FirstSeenActive = m.GetString("first_seen_active") - node.LastSeenActive = m.GetString("last_seen_active") - - return nil - }) - - isElected = (node.Hostname == ThisHostname && node.Token == util.ProcessToken.Hash) - if err != nil { - log.Error(err) - } - return node, isElected, err //nolint copylocks: return copies lock value -} diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index a782b2edf14..86101d6c5c0 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -17,153 +17,47 @@ package process import ( - "sync" "sync/atomic" "time" "vitess.io/vitess/go/vt/log" - - "vitess.io/vitess/go/vt/vtorc/config" - "vitess.io/vitess/go/vt/vtorc/util" - - "github.com/patrickmn/go-cache" + "vitess.io/vitess/go/vt/vtorc/db" ) -var lastHealthCheckUnixNano int64 -var lastGoodHealthCheckUnixNano int64 -var LastContinousCheckHealthy int64 var FirstDiscoveryCycleComplete atomic.Bool -var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second) - type NodeHealth struct { - Hostname string - Token string - AppVersion string - FirstSeenActive string - LastSeenActive string - ExtraInfo string - Command string - DBBackend string - + Healthy bool LastReported time.Time - onceHistory sync.Once - onceUpdate sync.Once -} - -func NewNodeHealth() *NodeHealth { - return &NodeHealth{ - Hostname: ThisHostname, - Token: util.ProcessToken.Hash, - } } -func (nodeHealth *NodeHealth) Update() *NodeHealth { - nodeHealth.onceUpdate.Do(func() { - nodeHealth.Hostname = ThisHostname - nodeHealth.Token = util.ProcessToken.Hash - }) - nodeHealth.LastReported = time.Now() - return nodeHealth -} - -var ThisNodeHealth = NewNodeHealth() - -type HealthStatus struct { - Healthy bool - Hostname string - Token string - IsActiveNode bool - DiscoveredOnce bool - ActiveNode *NodeHealth - Error error - AvailableNodes [](*NodeHealth) - RaftLeader string - IsRaftLeader bool - RaftLeaderURI string - RaftAdvertise string - RaftHealthyMembers []string -} - -type VTOrcExecutionMode string - -const ( - VTOrcExecutionCliMode VTOrcExecutionMode = "CLIMode" - VTOrcExecutionHTTPMode VTOrcExecutionMode = "HttpMode" -) - -var continuousRegistrationOnce sync.Once - -func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { - nodeHealth.Update() - healthy, err = WriteRegisterNode(nodeHealth) - atomic.StoreInt64(&lastHealthCheckUnixNano, time.Now().UnixNano()) - if healthy { - atomic.StoreInt64(&lastGoodHealthCheckUnixNano, time.Now().UnixNano()) - } - return healthy, err -} +var ThisNodeHealth = &NodeHealth{} -// HealthTest attempts to write to the backend database and get a result -func HealthTest() (health *HealthStatus, err error) { - cacheKey := util.ProcessToken.Hash - if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found { - health = healthStatus.(*HealthStatus) - health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() - return +// writeHealthToDatabase writes to the database and returns if it was successful. +func writeHealthToDatabase() bool { + _, err := db.ExecVTOrc("delete from node_health") + if err != nil { + log.Error(err) + return false } - - health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash} - defer lastHealthCheckCache.Set(cacheKey, health, cache.DefaultExpiration) - - healthy, err := RegisterNode(ThisNodeHealth) + sqlResult, err := db.ExecVTOrc(`insert into node_health (last_seen_active) values (now())`) if err != nil { - health.Error = err log.Error(err) - return health, err + return false } - health.Healthy = healthy - health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() - - if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { - health.Error = err + rows, err := sqlResult.RowsAffected() + if err != nil { log.Error(err) - return health, err + return false } - health.AvailableNodes, _ = ReadAvailableNodes(true) - - return health, nil + return rows > 0 } -// ContinuousRegistration will continuously update the node_health -// table showing that the current process is still running. -func ContinuousRegistration(extraInfo string, command string) { - ThisNodeHealth.ExtraInfo = extraInfo - ThisNodeHealth.Command = command - continuousRegistrationOnce.Do(func() { - tickOperation := func() { - healthy, err := RegisterNode(ThisNodeHealth) - if err != nil { - log.Errorf("ContinuousRegistration: RegisterNode failed: %+v", err) - } - if healthy { - atomic.StoreInt64(&LastContinousCheckHealthy, 1) - } else { - atomic.StoreInt64(&LastContinousCheckHealthy, 0) - } - } - // First one is synchronous - tickOperation() - go func() { - registrationTick := time.Tick(config.HealthPollSeconds * time.Second) - for range registrationTick { - // We already run inside a go-routine so - // do not do this asynchronously. If we - // get stuck then we don't want to fill up - // the backend pool with connections running - // this maintenance operation. - tickOperation() - } - }() - }) +// HealthTest attempts to write to the backend database and get a result +func HealthTest() (health *NodeHealth, discoveredOnce bool) { + ThisNodeHealth.LastReported = time.Now() + discoveredOnce = FirstDiscoveryCycleComplete.Load() + ThisNodeHealth.Healthy = writeHealthToDatabase() + + return ThisNodeHealth, discoveredOnce } diff --git a/go/vt/vtorc/process/health_dao.go b/go/vt/vtorc/process/health_dao.go deleted file mode 100644 index 59ea557223d..00000000000 --- a/go/vt/vtorc/process/health_dao.go +++ /dev/null @@ -1,176 +0,0 @@ -/* - Copyright 2015 Shlomi Noach, courtesy Booking.com - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package process - -import ( - "time" - - "vitess.io/vitess/go/vt/external/golib/sqlutils" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/vtorc/config" - "vitess.io/vitess/go/vt/vtorc/db" -) - -// WriteRegisterNode writes down this node in the node_health table -func WriteRegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { - timeNow := time.Now() - reportedAgo := timeNow.Sub(nodeHealth.LastReported) - reportedSecondsAgo := int64(reportedAgo.Seconds()) - if reportedSecondsAgo > config.HealthPollSeconds*2 { - // This entry is too old. No reason to persist it; already expired. - return false, nil - } - - nodeHealth.onceHistory.Do(func() { - _, _ = db.ExecVTOrc(` - insert ignore into node_health_history - (hostname, token, first_seen_active, extra_info, command, app_version) - values - (?, ?, NOW(), ?, ?, ?) - `, - nodeHealth.Hostname, nodeHealth.Token, nodeHealth.ExtraInfo, nodeHealth.Command, - nodeHealth.AppVersion, - ) - }) - { - sqlResult, err := db.ExecVTOrc(` - update node_health set - last_seen_active = now() - interval ? second, - extra_info = case when ? != '' then ? else extra_info end, - app_version = ?, - incrementing_indicator = incrementing_indicator + 1 - where - hostname = ? - and token = ? - `, - reportedSecondsAgo, - nodeHealth.ExtraInfo, nodeHealth.ExtraInfo, - nodeHealth.AppVersion, - nodeHealth.Hostname, nodeHealth.Token, - ) - if err != nil { - log.Error(err) - return false, err - } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - if rows > 0 { - return true, nil - } - } - // Got here? The UPDATE didn't work. Row isn't there. - { - dbBackend := config.Config.SQLite3DataFile - sqlResult, err := db.ExecVTOrc(` - insert ignore into node_health - (hostname, token, first_seen_active, last_seen_active, extra_info, command, app_version, db_backend) - values ( - ?, ?, - now() - interval ? second, now() - interval ? second, - ?, ?, ?, ?) - `, - nodeHealth.Hostname, nodeHealth.Token, - reportedSecondsAgo, reportedSecondsAgo, - nodeHealth.ExtraInfo, nodeHealth.Command, - nodeHealth.AppVersion, dbBackend, - ) - if err != nil { - log.Error(err) - return false, err - } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - if rows > 0 { - return true, nil - } - } - return false, nil -} - -// ExpireAvailableNodes is an aggressive purging method to remove -// node entries who have skipped their keepalive for two times. -func ExpireAvailableNodes() { - _, err := db.ExecVTOrc(` - delete - from node_health - where - last_seen_active < now() - interval ? second - `, - config.HealthPollSeconds*5, - ) - if err != nil { - log.Errorf("ExpireAvailableNodes: failed to remove old entries: %+v", err) - } -} - -// ExpireNodesHistory cleans up the nodes history and is run by -// the vtorc active node. -func ExpireNodesHistory() error { - _, err := db.ExecVTOrc(` - delete - from node_health_history - where - first_seen_active < now() - interval ? hour - `, - config.UnseenInstanceForgetHours, - ) - if err != nil { - log.Error(err) - } - return err -} - -func ReadAvailableNodes(onlyHTTPNodes bool) (nodes [](*NodeHealth), err error) { - extraInfo := "" - if onlyHTTPNodes { - extraInfo = string(VTOrcExecutionHTTPMode) - } - query := ` - select - hostname, token, app_version, first_seen_active, last_seen_active, db_backend - from - node_health - where - last_seen_active > now() - interval ? second - and ? in (extra_info, '') - order by - hostname - ` - - err = db.QueryVTOrc(query, sqlutils.Args(config.HealthPollSeconds*2, extraInfo), func(m sqlutils.RowMap) error { - nodeHealth := &NodeHealth{ - Hostname: m.GetString("hostname"), - Token: m.GetString("token"), - AppVersion: m.GetString("app_version"), - FirstSeenActive: m.GetString("first_seen_active"), - LastSeenActive: m.GetString("last_seen_active"), - DBBackend: m.GetString("db_backend"), - } - nodes = append(nodes, nodeHealth) - return nil - }) - if err != nil { - log.Error(err) - } - return nodes, err -} diff --git a/go/vt/vtorc/process/host.go b/go/vt/vtorc/process/host.go deleted file mode 100644 index 21e3909cbdd..00000000000 --- a/go/vt/vtorc/process/host.go +++ /dev/null @@ -1,33 +0,0 @@ -/* - Copyright 2015 Shlomi Noach, courtesy Booking.com - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package process - -import ( - "os" - - "vitess.io/vitess/go/vt/log" -) - -var ThisHostname string - -func init() { - var err error - ThisHostname, err = os.Hostname() - if err != nil { - log.Fatalf("Cannot resolve self hostname; required. Aborting. %+v", err) - } -} diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index 60fdf226e95..5e9a84c0a29 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -247,14 +247,10 @@ func replicationAnalysisAPIHandler(response http.ResponseWriter, request *http.R // healthAPIHandler is the handler for the healthAPI endpoint func healthAPIHandler(response http.ResponseWriter, request *http.Request) { - health, err := process.HealthTest() - if err != nil { - http.Error(response, err.Error(), http.StatusInternalServerError) - return - } + health, discoveredOnce := process.HealthTest() code := http.StatusOK // If the process isn't healthy, or if the first discovery cycle hasn't completed, we return an internal server error. - if !health.Healthy || !health.DiscoveredOnce { + if !health.Healthy || !discoveredOnce { code = http.StatusInternalServerError } returnAsJSON(response, code, health) diff --git a/go/vt/vtorc/server/discovery.go b/go/vt/vtorc/server/discovery.go index 0e5cf5923c8..26e5c9e108e 100644 --- a/go/vt/vtorc/server/discovery.go +++ b/go/vt/vtorc/server/discovery.go @@ -19,13 +19,10 @@ package server import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtorc/logic" - "vitess.io/vitess/go/vt/vtorc/process" ) // StartVTOrcDiscovery starts VTOrc discovery serving func StartVTOrcDiscovery() { - process.ContinuousRegistration(string(process.VTOrcExecutionHTTPMode), "") - log.Info("Starting Discovery") go logic.ContinuousDiscovery() } diff --git a/go/vt/vtorc/util/token.go b/go/vt/vtorc/util/token.go deleted file mode 100644 index b3e61594c29..00000000000 --- a/go/vt/vtorc/util/token.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright 2014 Outbrain Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package util - -import ( - "crypto/rand" - "crypto/sha256" - "encoding/hex" -) - -func toHash(input []byte) string { - hasher := sha256.New() - hasher.Write(input) - return hex.EncodeToString(hasher.Sum(nil)) -} - -func getRandomData() []byte { - size := 64 - rb := make([]byte, size) - _, _ = rand.Read(rb) - return rb -} - -func RandomHash() string { - return toHash(getRandomData()) -} - -// Token is used to identify and validate requests to this service -type Token struct { - Hash string -} - -var ProcessToken = NewToken() - -func NewToken() *Token { - return &Token{ - Hash: RandomHash(), - } -} diff --git a/go/vt/vtorc/util/token_test.go b/go/vt/vtorc/util/token_test.go deleted file mode 100644 index 5e634c05f31..00000000000 --- a/go/vt/vtorc/util/token_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package util - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/vt/log" -) - -func init() { -} - -func TestNewToken(t *testing.T) { - token1 := NewToken() - - require.NotEqual(t, token1.Hash, "") - require.Equal(t, len(token1.Hash), 64) -} - -func TestNewTokenRandom(t *testing.T) { - log.Infof("test") - token1 := NewToken() - token2 := NewToken() - - // The following test can fail once in a quadrazillion eons - require.NotEqual(t, token1.Hash, token2.Hash) -}