From fed9d3bf2a7387c6677484bdd7bd9b0add2b8d6f Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 28 Mar 2024 19:17:55 +0530 Subject: [PATCH 01/13] feat: refactor topology_failure_detection into recovery_detection table Signed-off-by: Manan Gupta --- go/vt/vtorc/config/config.go | 1 - go/vt/vtorc/db/generate_base.go | 20 +---- go/vt/vtorc/inst/analysis.go | 4 +- go/vt/vtorc/inst/analysis_dao.go | 5 +- go/vt/vtorc/inst/instance_dao.go | 6 +- go/vt/vtorc/logic/topology_recovery.go | 24 +----- go/vt/vtorc/logic/topology_recovery_dao.go | 91 ++++++---------------- go/vt/vtorc/logic/vtorc.go | 3 +- 8 files changed, 38 insertions(+), 116 deletions(-) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index ba3c41ddc61..5ef80164cea 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -44,7 +44,6 @@ const ( DiscoveryQueueMaxStatisticsSize = 120 DiscoveryCollectionRetentionSeconds = 120 UnseenInstanceForgetHours = 240 // Number of hours after which an unseen instance is forgotten - FailureDetectionPeriodBlockMinutes = 60 // The time for which an instance's failure discovery is kept "active", so as to avoid concurrent "discoveries" of the instance's failure; this precedes any recovery process, if any. ) var ( diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 94daebbf7f0..98821338582 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -24,7 +24,7 @@ var TableNames = []string{ "topology_recovery", "database_instance_topology_history", "candidate_database_instance", - "topology_failure_detection", + "recovery_detection", "blocked_topology_recovery", "database_instance_last_analysis", "database_instance_analysis_changelog", @@ -236,28 +236,19 @@ CREATE TABLE candidate_database_instance ( CREATE INDEX last_suggested_idx_candidate_database_instance ON candidate_database_instance (last_suggested) `, ` -DROP TABLE IF EXISTS topology_failure_detection +DROP TABLE IF EXISTS recovery_detection `, ` -CREATE TABLE topology_failure_detection ( +CREATE TABLE recovery_detection ( detection_id integer, alias varchar(256) NOT NULL, - in_active_period tinyint NOT NULL DEFAULT '0', - start_active_period timestamp not null default (''), - end_active_period_unixtime int NOT NULL, - processing_node_hostname varchar(128) NOT NULL, - processcing_node_token varchar(128) NOT NULL, analysis varchar(128) NOT NULL, keyspace varchar(128) NOT NULL, shard varchar(128) NOT NULL, - count_affected_replicas int NOT NULL, - is_actionable tinyint not null default 0, + detection_timestamp timestamp NOT NULL default (''), PRIMARY KEY (detection_id) )`, ` -CREATE INDEX in_active_start_period_idx_topology_failure_detection ON topology_failure_detection (in_active_period, start_active_period) - `, - ` DROP TABLE IF EXISTS blocked_topology_recovery `, ` @@ -435,7 +426,4 @@ CREATE INDEX uid_idx_topology_recovery ON topology_recovery(uid) ` CREATE INDEX recovery_uid_idx_topology_recovery_steps ON topology_recovery_steps(recovery_uid) `, - ` -CREATE UNIQUE INDEX alias_active_recoverable_uidx_topology_failure_detection ON topology_failure_detection (alias, in_active_period, end_active_period_unixtime, is_actionable) - `, } diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index 54500621cb9..328b43df0c5 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -142,9 +142,7 @@ type ReplicationAnalysis struct { CountDelayedReplicas uint CountLaggingReplicas uint IsActionableRecovery bool - ProcessingNodeHostname string - ProcessingNodeToken string - StartActivePeriod string + RecoveryId int64 GTIDMode string MinReplicaGTIDMode string MaxReplicaGTIDMode string diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 749827f006c..b348d17d45f 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -31,7 +31,6 @@ import ( "vitess.io/vitess/go/vt/vtctl/reparentutil" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" - "vitess.io/vitess/go/vt/vtorc/process" "vitess.io/vitess/go/vt/vtorc/util" "github.com/patrickmn/go-cache" @@ -302,9 +301,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna clusters := make(map[string]*clusterAnalysis) err := db.Db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { a := &ReplicationAnalysis{ - Analysis: NoProblem, - ProcessingNodeHostname: process.ThisHostname, - ProcessingNodeToken: util.ProcessToken.Hash, + Analysis: NoProblem, } tablet := &topodatapb.Tablet{} diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 35b5d11bc95..a03071c3a4f 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -119,9 +119,11 @@ func ExecDBWriteFunc(f func() error) error { } func ExpireTableData(tableName string, timestampColumn string) error { - query := fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn) writeFunc := func() error { - _, err := db.ExecVTOrc(query, config.Config.AuditPurgeDays) + _, err := db.ExecVTOrc( + fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn), + config.Config.AuditPurgeDays, + ) return err } return ExecDBWriteFunc(writeFunc) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 8a1c3af5ac9..8a9517e5e1d 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -300,19 +300,6 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl return false, nil, nil } -// checkAndExecuteFailureDetectionProcesses tries to register for failure detection and potentially executes -// failure-detection processes. -func checkAndExecuteFailureDetectionProcesses(analysisEntry *inst.ReplicationAnalysis) (detectionRegistrationSuccess bool, processesExecutionAttempted bool, err error) { - if ok, _ := AttemptFailureDetectionRegistration(analysisEntry); !ok { - if util.ClearToLog("checkAndExecuteFailureDetectionProcesses", analysisEntry.AnalyzedInstanceAlias) { - log.Infof("checkAndExecuteFailureDetectionProcesses: could not register %+v detection on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) - } - return false, false, nil - } - log.Infof("topology_recovery: detected %+v failure on %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceAlias) - return true, false, nil -} - // getCheckAndRecoverFunctionCode gets the recovery function code to use for the given analysis. func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction { switch analysisCode { @@ -500,17 +487,12 @@ func executeCheckAndRecoverFunction(analysisEntry *inst.ReplicationAnalysis) (er } // At this point we have validated there's a failure scenario for which we have a recovery path. - - // Initiate detection: - _, _, err = checkAndExecuteFailureDetectionProcesses(analysisEntry) + // Record the failure detected in the logs. + err = InsertRecoveryDetection(analysisEntry) if err != nil { - log.Errorf("executeCheckAndRecoverFunction: error on failure detection: %+v", err) + log.Errorf("executeCheckAndRecoverFunction: error on inserting recovery detection record: %+v", err) return err } - // We don't mind whether detection really executed the processes or not - // (it may have been silenced due to previous detection). We only care there's no error. - - // We're about to embark on recovery shortly... // Check for recovery being disabled globally if recoveryDisabledGlobally, err := IsRecoveryDisabled(); err != nil { diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 4a7a6c77ef1..8da0f25b6f0 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -29,83 +29,39 @@ import ( "vitess.io/vitess/go/vt/vtorc/util" ) -// AttemptFailureDetectionRegistration tries to add a failure-detection entry; if this fails that means the problem has already been detected -func AttemptFailureDetectionRegistration(analysisEntry *inst.ReplicationAnalysis) (registrationSuccessful bool, err error) { - args := sqlutils.Args( - analysisEntry.AnalyzedInstanceAlias, - process.ThisHostname, - util.ProcessToken.Hash, - string(analysisEntry.Analysis), - analysisEntry.ClusterDetails.Keyspace, - analysisEntry.ClusterDetails.Shard, - analysisEntry.CountReplicas, - analysisEntry.IsActionableRecovery, - ) - startActivePeriodHint := "now()" - if analysisEntry.StartActivePeriod != "" { - startActivePeriodHint = "?" - args = append(args, analysisEntry.StartActivePeriod) - } - - query := fmt.Sprintf(` +// InsertRecoveryDetection inserts the recovery analysis that has been detected. +func InsertRecoveryDetection(analysisEntry *inst.ReplicationAnalysis) error { + sqlResult, err := db.ExecVTOrc(` insert ignore - into topology_failure_detection ( + into recovery_detection ( alias, - in_active_period, - end_active_period_unixtime, - processing_node_hostname, - processcing_node_token, analysis, keyspace, shard, - count_affected_replicas, - is_actionable, - start_active_period + detection_timestamp ) values ( - ?, - 1, - 0, ?, ?, ?, ?, - ?, - ?, - ?, - %s - ) - `, startActivePeriodHint) - - sqlResult, err := db.ExecVTOrc(query, args...) + now() + )`, + analysisEntry.AnalyzedInstanceAlias, + string(analysisEntry.Analysis), + analysisEntry.ClusterDetails.Keyspace, + analysisEntry.ClusterDetails.Shard, + ) if err != nil { log.Error(err) - return false, err + return err } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - return false, err - } - return (rows > 0), nil -} - -// ClearActiveFailureDetections clears the "in_active_period" flag for old-enough detections, thereby allowing for -// further detections on cleared instances. -func ClearActiveFailureDetections() error { - _, err := db.ExecVTOrc(` - update topology_failure_detection set - in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() - where - in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? MINUTE - `, - config.FailureDetectionPeriodBlockMinutes, - ) + id, err := sqlResult.LastInsertId() if err != nil { log.Error(err) + return err } - return err + analysisEntry.RecoveryId = id + return nil } func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) { @@ -139,7 +95,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ?, ?, ?, - (select ifnull(max(detection_id), 0) from topology_failure_detection where alias = ?) + ? ) `, sqlutils.NilIfZero(topologyRecovery.ID), @@ -151,6 +107,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover analysisEntry.ClusterDetails.Shard, analysisEntry.CountReplicas, analysisEntry.AnalyzedInstanceAlias, + analysisEntry.RecoveryId, ) if err != nil { return nil, err @@ -548,17 +505,17 @@ func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error return err } -// ExpireFailureDetectionHistory removes old rows from the topology_failure_detection table -func ExpireFailureDetectionHistory() error { - return inst.ExpireTableData("topology_failure_detection", "start_active_period") +// ExpireRecoveryDetectionHistory removes old rows from the recovery_detection table +func ExpireRecoveryDetectionHistory() error { + return inst.ExpireTableData("recovery_detection", "detection_timestamp") } -// ExpireTopologyRecoveryHistory removes old rows from the topology_failure_detection table +// ExpireTopologyRecoveryHistory removes old rows from the topology_recovery table func ExpireTopologyRecoveryHistory() error { return inst.ExpireTableData("topology_recovery", "start_active_period") } -// ExpireTopologyRecoveryStepsHistory removes old rows from the topology_failure_detection table +// ExpireTopologyRecoveryStepsHistory removes old rows from the topology_recovery_steps table func ExpireTopologyRecoveryStepsHistory() error { return inst.ExpireTableData("topology_recovery_steps", "audit_at") } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 66c5590831b..bf71f9f1efe 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -374,7 +374,7 @@ func ContinuousDiscovery() { go inst.ExpireStaleInstanceBinlogCoordinates() go process.ExpireNodesHistory() go process.ExpireAvailableNodes() - go ExpireFailureDetectionHistory() + go ExpireRecoveryDetectionHistory() go ExpireTopologyRecoveryHistory() go ExpireTopologyRecoveryStepsHistory() } @@ -382,7 +382,6 @@ func ContinuousDiscovery() { case <-recoveryTick: go func() { if IsLeaderOrActive() { - go ClearActiveFailureDetections() go ClearActiveRecoveries() go ExpireBlockedRecoveries() go AcknowledgeCrashedRecoveries() From ccb3af4213c7874de769a5f09e0a87e0c283d031 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 28 Mar 2024 20:21:10 +0530 Subject: [PATCH 02/13] feat: remove in_active_period and store the start of the recovery instead Signed-off-by: Manan Gupta --- go/vt/vtorc/db/generate_base.go | 14 +- go/vt/vtorc/logic/topology_recovery.go | 13 +- go/vt/vtorc/logic/topology_recovery_dao.go | 161 +++----------------- go/vt/vtorc/logic/topology_recovery_test.go | 4 +- go/vt/vtorc/logic/vtorc.go | 2 - 5 files changed, 31 insertions(+), 163 deletions(-) diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 98821338582..f631b746d3c 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -172,9 +172,7 @@ DROP TABLE IF EXISTS topology_recovery CREATE TABLE topology_recovery ( recovery_id integer, alias varchar(256) NOT NULL, - in_active_period tinyint NOT NULL DEFAULT 0, - start_active_period timestamp not null default (''), - end_active_period_unixtime int, + start_recovery timestamp NOT NULL DEFAULT (''), end_recovery timestamp NULL DEFAULT NULL, processing_node_hostname varchar(128) NOT NULL, processcing_node_token varchar(128) NOT NULL, @@ -194,13 +192,7 @@ CREATE TABLE topology_recovery ( PRIMARY KEY (recovery_id) )`, ` -CREATE INDEX in_active_start_period_idx_topology_recovery ON topology_recovery (in_active_period, start_active_period) - `, - ` -CREATE INDEX start_active_period_idx_topology_recovery ON topology_recovery (start_active_period) - `, - ` -CREATE UNIQUE INDEX alias_active_period_uidx_topology_recovery ON topology_recovery (alias, in_active_period, end_active_period_unixtime) +CREATE INDEX start_recovery_idx_topology_recovery ON topology_recovery (start_recovery) `, ` DROP TABLE IF EXISTS database_instance_topology_history @@ -400,7 +392,7 @@ CREATE TABLE vitess_shard ( CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port) `, ` -CREATE INDEX keyspace_shard_in_active_idx_topology_recovery on topology_recovery (keyspace, shard, in_active_period) +CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard) `, ` CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 8a9517e5e1d..7874515a90a 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -107,7 +107,6 @@ type TopologyRecovery struct { SuccessorHostname string SuccessorPort int SuccessorAlias string - IsActive bool IsSuccessful bool AllErrors []string RecoveryStartTimestamp string @@ -187,7 +186,7 @@ func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst // recoverPrimaryHasPrimary resets the replication on the primary instance func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimaryHasPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -223,7 +222,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication return false, nil, err } - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, true, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another %v.", analysisEntry.AnalyzedInstanceAlias, recoveryName)) return false, nil, err @@ -687,7 +686,7 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry *inst.R // electNewPrimary elects a new primary while none were present before. func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil || err != nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -736,7 +735,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi // fixPrimary sets the primary as read-write. func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixPrimary.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -767,7 +766,7 @@ func fixPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r // fixReplica sets the replica as read-only and points it at the current primary. func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another fixReplica.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err @@ -808,7 +807,7 @@ func fixReplica(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (r // recoverErrantGTIDDetected changes the tablet type of a replica tablet that has errant GTIDs. func recoverErrantGTIDDetected(ctx context.Context, analysisEntry *inst.ReplicationAnalysis) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) { - topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry, false, true) + topologyRecovery, err = AttemptRecoveryRegistration(analysisEntry) if topologyRecovery == nil { _ = AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another recoverErrantGTIDDetected.", analysisEntry.AnalyzedInstanceAlias)) return false, nil, err diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 8da0f25b6f0..3112a8b6e8c 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -72,9 +72,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover recovery_id, uid, alias, - in_active_period, - start_active_period, - end_active_period_unixtime, + start_recovery, processing_node_hostname, processcing_node_token, analysis, @@ -86,9 +84,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ?, ?, ?, - 1, NOW(), - 0, ?, ?, ?, @@ -128,47 +124,22 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover } // AttemptRecoveryRegistration tries to add a recovery entry; if this fails that means recovery is already in place. -func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis, failIfFailedInstanceInActiveRecovery bool, failIfClusterInActiveRecovery bool) (*TopologyRecovery, error) { - if failIfFailedInstanceInActiveRecovery { - // Let's check if this instance has just been promoted recently and is still in active period. - // If so, we reject recovery registration to avoid flapping. - recoveries, err := ReadInActivePeriodSuccessorInstanceRecovery(analysisEntry.AnalyzedInstanceAlias) - if err != nil { - log.Error(err) - return nil, err - } - if len(recoveries) > 0 { - _ = RegisterBlockedRecoveries(analysisEntry, recoveries) - errMsg := fmt.Sprintf("AttemptRecoveryRegistration: tablet %+v has recently been promoted (by failover of %+v) and is in active period. It will not be failed over. You may acknowledge the failure on %+v (-c ack-instance-recoveries) to remove this blockage", analysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias) - log.Errorf(errMsg) - return nil, fmt.Errorf(errMsg) - } - } - if failIfClusterInActiveRecovery { - // Let's check if this cluster has just experienced a failover of the same analysis and is still in active period. - // If so, we reject recovery registration to avoid flapping. - recoveries, err := ReadInActivePeriodClusterRecovery(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, string(analysisEntry.Analysis)) - if err != nil { - log.Error(err) - return nil, err - } - if len(recoveries) > 0 { - _ = RegisterBlockedRecoveries(analysisEntry, recoveries) - errMsg := fmt.Sprintf("AttemptRecoveryRegistration: keyspace %+v shard %+v has recently experienced a failover (of %+v) and is in active period. It will not be failed over again. You may acknowledge the failure on this cluster (-c ack-cluster-recoveries) or on %+v (-c ack-instance-recoveries) to remove this blockage", analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias, recoveries[0].AnalysisEntry.AnalyzedInstanceAlias) - log.Errorf(errMsg) - return nil, fmt.Errorf(errMsg) - } +func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis) (*TopologyRecovery, error) { + // Check if there is an active recovery in progress for the cluster of the given instance. + recoveries, err := ReadActiveClusterRecoveries(analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard) + if err != nil { + log.Error(err) + return nil, err } - if !failIfFailedInstanceInActiveRecovery { - // Implicitly acknowledge this instance's possibly existing active recovery, provided they are completed. - _, _ = AcknowledgeInstanceCompletedRecoveries(analysisEntry.AnalyzedInstanceAlias, "vtorc", fmt.Sprintf("implicit acknowledge due to user invocation of recovery on same instance: %+v", analysisEntry.AnalyzedInstanceAlias)) - // The fact we only acknowledge a completed recovery solves the possible case of two DBAs simultaneously - // trying to recover the same instance at the same time + if len(recoveries) > 0 { + errMsg := fmt.Sprintf("AttemptRecoveryRegistration: Active recovery (id:%v) in the cluster %s:%s for %s", recoveries[0].ID, analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, recoveries[0].AnalysisEntry.Analysis) + log.Errorf(errMsg) + return nil, fmt.Errorf(errMsg) } topologyRecovery := NewTopologyRecovery(*analysisEntry) - topologyRecovery, err := writeTopologyRecovery(topologyRecovery) + topologyRecovery, err = writeTopologyRecovery(topologyRecovery) if err != nil { log.Error(err) return nil, err @@ -176,25 +147,6 @@ func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis, failIf return topologyRecovery, nil } -// ClearActiveRecoveries clears the "in_active_period" flag for old-enough recoveries, thereby allowing for -// further recoveries on cleared instances. -func ClearActiveRecoveries() error { - _, err := db.ExecVTOrc(` - update topology_recovery set - in_active_period = 0, - end_active_period_unixtime = UNIX_TIMESTAMP() - where - in_active_period = 1 - AND start_active_period < NOW() - INTERVAL ? SECOND - `, - config.Config.RecoveryPeriodBlockSeconds, - ) - if err != nil { - log.Error(err) - } - return err -} - // RegisterBlockedRecoveries writes down currently blocked recoveries, and indicates what recovery they are blocked on. // Recoveries are blocked thru the in_active_period flag, which comes to avoid flapping. func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*TopologyRecovery) error { @@ -291,63 +243,6 @@ func ExpireBlockedRecoveries() error { return err } -// acknowledgeRecoveries sets acknowledged* details and clears the in_active_period flags from a set of entries -func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, whereClause string, args []any) (countAcknowledgedEntries int64, err error) { - additionalSet := `` - if markEndRecovery { - additionalSet = ` - end_recovery=IFNULL(end_recovery, NOW()), - ` - } - query := fmt.Sprintf(` - update topology_recovery set - in_active_period = 0, - end_active_period_unixtime = case when end_active_period_unixtime = 0 then UNIX_TIMESTAMP() else end_active_period_unixtime end, - %s - acknowledged = 1, - acknowledged_at = NOW(), - acknowledged_by = ?, - acknowledge_comment = ? - where - acknowledged = 0 - and - %s - `, additionalSet, whereClause) - args = append(sqlutils.Args(owner, comment), args...) - sqlResult, err := db.ExecVTOrc(query, args...) - if err != nil { - log.Error(err) - return 0, err - } - rows, err := sqlResult.RowsAffected() - if err != nil { - log.Error(err) - } - return rows, err -} - -// AcknowledgeInstanceCompletedRecoveries marks active and COMPLETED recoveries for given instance as acknowledged. -// This also implied clearing their active period, which in turn enables further recoveries on those topologies -func AcknowledgeInstanceCompletedRecoveries(tabletAlias string, owner string, comment string) (countAcknowledgedEntries int64, err error) { - whereClause := ` - alias = ? - and end_recovery is not null - ` - return acknowledgeRecoveries(owner, comment, false, whereClause, sqlutils.Args(tabletAlias)) -} - -// AcknowledgeCrashedRecoveries marks recoveries whose processing nodes has crashed as acknowledged. -func AcknowledgeCrashedRecoveries() (countAcknowledgedEntries int64, err error) { - whereClause := ` - in_active_period = 1 - and end_recovery is null - and concat(processing_node_hostname, ':', processcing_node_token) not in ( - select concat(hostname, ':', token) from node_health - ) - ` - return acknowledgeRecoveries("vtorc", "detected crashed recovery", true, whereClause, sqlutils.Args()) -} - // ResolveRecovery is called on completion of a recovery process and updates the recovery status. // It does not clear the "active period" as this still takes place in order to avoid flapping. func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { @@ -378,9 +273,7 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog recovery_id, uid, alias, - (IFNULL(end_active_period_unixtime, 0) = 0) as is_active, - start_active_period, - IFNULL(end_active_period_unixtime, 0) as end_active_period_unixtime, + start_recovery, IFNULL(end_recovery, '') AS end_recovery, is_successful, processing_node_hostname, @@ -408,8 +301,7 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.ID = m.GetInt64("recovery_id") topologyRecovery.UID = m.GetString("uid") - topologyRecovery.IsActive = m.GetBool("is_active") - topologyRecovery.RecoveryStartTimestamp = m.GetString("start_active_period") + topologyRecovery.RecoveryStartTimestamp = m.GetString("start_recovery") topologyRecovery.RecoveryEndTimestamp = m.GetString("end_recovery") topologyRecovery.IsSuccessful = m.GetBool("is_successful") topologyRecovery.ProcessingNodeHostname = m.GetString("processing_node_hostname") @@ -444,27 +336,14 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog return res, err } -// ReadInActivePeriodClusterRecovery reads recoveries (possibly complete!) that are in active period for the analysis. -// (may be used to block further recoveries of the same analysis on this cluster) -func ReadInActivePeriodClusterRecovery(keyspace string, shard, analysis string) ([]*TopologyRecovery, error) { +// ReadActiveClusterRecoveries reads recoveries that are ongoing for the given cluster. +func ReadActiveClusterRecoveries(keyspace string, shard string) ([]*TopologyRecovery, error) { whereClause := ` where - in_active_period=1 + end_recovery IS NULL and keyspace=? - and shard=? - and analysis=?` - return readRecoveries(whereClause, ``, sqlutils.Args(keyspace, shard, analysis)) -} - -// ReadInActivePeriodSuccessorInstanceRecovery reads completed recoveries for a given instance, where said instance -// was promoted as result, still in active period (may be used to block further recoveries should this instance die) -func ReadInActivePeriodSuccessorInstanceRecovery(tabletAlias string) ([]*TopologyRecovery, error) { - whereClause := ` - where - in_active_period=1 - and - successor_alias=?` - return readRecoveries(whereClause, ``, sqlutils.Args(tabletAlias)) + and shard=?` + return readRecoveries(whereClause, ``, sqlutils.Args(keyspace, shard)) } // ReadRecentRecoveries reads latest recovery entries from topology_recovery @@ -512,7 +391,7 @@ func ExpireRecoveryDetectionHistory() error { // ExpireTopologyRecoveryHistory removes old rows from the topology_recovery table func ExpireTopologyRecoveryHistory() error { - return inst.ExpireTableData("topology_recovery", "start_active_period") + return inst.ExpireTableData("topology_recovery", "start_recovery") } // ExpireTopologyRecoveryStepsHistory removes old rows from the topology_recovery_steps table diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index f636a194283..e95c51e2a17 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -181,12 +181,12 @@ func TestDifferentAnalysescHaveDifferentCooldowns(t *testing.T) { defer cancel() ts = memorytopo.NewServer(ctx, "zone1") - _, err = AttemptRecoveryRegistration(&replicaAnalysisEntry, false, true) + _, err = AttemptRecoveryRegistration(&replicaAnalysisEntry) require.Nil(t, err) // even though this is another recovery on the same cluster, allow it to go through // because the analysis is different (ReplicationStopped vs DeadPrimary) - _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry, true, true) + _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry) require.Nil(t, err) } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index bf71f9f1efe..891b48b9748 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -382,9 +382,7 @@ func ContinuousDiscovery() { case <-recoveryTick: go func() { if IsLeaderOrActive() { - go ClearActiveRecoveries() go ExpireBlockedRecoveries() - go AcknowledgeCrashedRecoveries() go inst.ExpireInstanceAnalysisChangelog() go func() { From b163fcc91f522f687c61b3df41b97a85811a64aa Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 28 Mar 2024 20:23:17 +0530 Subject: [PATCH 03/13] feat: get rid of blocked recoveries Signed-off-by: Manan Gupta --- go/vt/vtorc/db/generate_base.go | 20 ---- go/vt/vtorc/logic/topology_recovery_dao.go | 96 ------------------- .../vtorc/logic/topology_recovery_dao_test.go | 34 ------- go/vt/vtorc/logic/vtorc.go | 1 - 4 files changed, 151 deletions(-) diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index f631b746d3c..2e960f20eee 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -25,7 +25,6 @@ var TableNames = []string{ "database_instance_topology_history", "candidate_database_instance", "recovery_detection", - "blocked_topology_recovery", "database_instance_last_analysis", "database_instance_analysis_changelog", "node_health_history", @@ -241,22 +240,6 @@ CREATE TABLE recovery_detection ( PRIMARY KEY (detection_id) )`, ` -DROP TABLE IF EXISTS blocked_topology_recovery -`, - ` -CREATE TABLE blocked_topology_recovery ( - alias varchar(256) NOT NULL, - keyspace varchar(128) NOT NULL, - shard varchar(128) NOT NULL, - analysis varchar(128) NOT NULL, - last_blocked_timestamp timestamp not null default (''), - blocking_recovery_id bigint, - PRIMARY KEY (alias) -)`, - ` -CREATE INDEX keyspace_shard_blocked_idx_blocked_topology_recovery ON blocked_topology_recovery (keyspace, shard, last_blocked_timestamp) - `, - ` DROP TABLE IF EXISTS database_instance_last_analysis `, ` @@ -401,9 +384,6 @@ CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recove CREATE INDEX acknowledged_idx_topology_recovery on topology_recovery (acknowledged, acknowledged_at) `, ` -CREATE INDEX last_blocked_idx_blocked_topology_recovery on blocked_topology_recovery (last_blocked_timestamp) - `, - ` CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) `, ` diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 3112a8b6e8c..8f9ae129896 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -147,102 +147,6 @@ func AttemptRecoveryRegistration(analysisEntry *inst.ReplicationAnalysis) (*Topo return topologyRecovery, nil } -// RegisterBlockedRecoveries writes down currently blocked recoveries, and indicates what recovery they are blocked on. -// Recoveries are blocked thru the in_active_period flag, which comes to avoid flapping. -func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*TopologyRecovery) error { - for _, recovery := range blockingRecoveries { - _, err := db.ExecVTOrc(` - insert - into blocked_topology_recovery ( - alias, - keyspace, - shard, - analysis, - last_blocked_timestamp, - blocking_recovery_id - ) values ( - ?, - ?, - ?, - ?, - NOW(), - ? - ) - on duplicate key update - keyspace=values(keyspace), - shard=values(shard), - analysis=values(analysis), - last_blocked_timestamp=values(last_blocked_timestamp), - blocking_recovery_id=values(blocking_recovery_id) - `, analysisEntry.AnalyzedInstanceAlias, - analysisEntry.ClusterDetails.Keyspace, - analysisEntry.ClusterDetails.Shard, - string(analysisEntry.Analysis), - recovery.ID, - ) - if err != nil { - log.Error(err) - } - } - return nil -} - -// ExpireBlockedRecoveries clears listing of blocked recoveries that are no longer actually blocked. -func ExpireBlockedRecoveries() error { - // Older recovery is acknowledged by now, hence blocked recovery should be released. - // Do NOTE that the data in blocked_topology_recovery is only used for auditing: it is NOT the data - // based on which we make automated decisions. - - query := ` - select - blocked_topology_recovery.alias - from - blocked_topology_recovery - left join topology_recovery on (blocking_recovery_id = topology_recovery.recovery_id and acknowledged = 0) - where - acknowledged is null - ` - var expiredAliases []string - err := db.QueryVTOrc(query, sqlutils.Args(), func(m sqlutils.RowMap) error { - expiredAliases = append(expiredAliases, m.GetString("alias")) - return nil - }) - - for _, expiredAlias := range expiredAliases { - _, err := db.ExecVTOrc(` - delete - from blocked_topology_recovery - where - alias = ? - `, - expiredAlias, - ) - if err != nil { - log.Error(err) - return err - } - } - - if err != nil { - log.Error(err) - return err - } - // Some oversampling, if a problem has not been noticed for some time (e.g. the server came up alive - // before action was taken), expire it. - // Recall that RegisterBlockedRecoveries continuously updates the last_blocked_timestamp column. - _, err = db.ExecVTOrc(` - delete - from blocked_topology_recovery - where - last_blocked_timestamp < NOW() - interval ? second - `, config.Config.RecoveryPollSeconds*2, - ) - if err != nil { - log.Error(err) - } - return err -} - // ResolveRecovery is called on completion of a recovery process and updates the recovery status. // It does not clear the "active period" as this still takes place in order to avoid flapping. func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index f9a9026a4a1..74c6285f060 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/require" - "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" ) @@ -66,36 +65,3 @@ func TestTopologyRecovery(t *testing.T) { require.EqualValues(t, topologyRecovery.ID, recoveries[0].ID) }) } - -// TestBlockedRecoveryInsertion tests that we are able to insert into the blocked_recovery table. -func TestBlockedRecoveryInsertion(t *testing.T) { - orcDb, err := db.OpenVTOrc() - require.NoError(t, err) - defer func() { - _, err = orcDb.Exec("delete from blocked_topology_recovery") - require.NoError(t, err) - }() - - analysisEntry := &inst.ReplicationAnalysis{ - AnalyzedInstanceAlias: "zone1-0000000100", - ClusterDetails: inst.ClusterInfo{ - Keyspace: "ks", - Shard: "0", - }, - Analysis: inst.DeadPrimaryAndSomeReplicas, - } - blockedRecovery := &TopologyRecovery{ - ID: 1, - } - err = RegisterBlockedRecoveries(analysisEntry, []*TopologyRecovery{blockedRecovery}) - require.NoError(t, err) - - totalBlockedRecoveries := 0 - err = db.QueryVTOrc("select count(*) as blocked_recoveries from blocked_topology_recovery", nil, func(rowMap sqlutils.RowMap) error { - totalBlockedRecoveries = rowMap.GetInt("blocked_recoveries") - return nil - }) - require.NoError(t, err) - // There should be 1 blocked recovery after insertion - require.Equal(t, 1, totalBlockedRecoveries) -} diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 891b48b9748..b9e5795a31f 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -382,7 +382,6 @@ func ContinuousDiscovery() { case <-recoveryTick: go func() { if IsLeaderOrActive() { - go ExpireBlockedRecoveries() go inst.ExpireInstanceAnalysisChangelog() go func() { From d482482275db4fa420af58eb0d25e36cc26697b4 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 28 Mar 2024 20:47:04 +0530 Subject: [PATCH 04/13] cleanup some unused fields for topology_recovery table Signed-off-by: Manan Gupta --- go/cmd/vtorc/cli/cli.go | 2 +- go/vt/vtorc/db/generate_base.go | 10 ------- go/vt/vtorc/logic/topology_recovery.go | 6 ---- go/vt/vtorc/logic/topology_recovery_dao.go | 30 +------------------ .../vtorc/logic/topology_recovery_dao_test.go | 2 +- 5 files changed, 3 insertions(+), 47 deletions(-) diff --git a/go/cmd/vtorc/cli/cli.go b/go/cmd/vtorc/cli/cli.go index f521ae05e57..312a6d2e67a 100644 --- a/go/cmd/vtorc/cli/cli.go +++ b/go/cmd/vtorc/cli/cli.go @@ -85,7 +85,7 @@ func run(cmd *cobra.Command, args []string) { // addStatusParts adds UI parts to the /debug/status page of VTOrc func addStatusParts() { servenv.AddStatusPart("Recent Recoveries", logic.TopologyRecoveriesTemplate, func() any { - recoveries, _ := logic.ReadRecentRecoveries(false, 0) + recoveries, _ := logic.ReadRecentRecoveries(0) return recoveries }) } diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 2e960f20eee..c0f565ce148 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -173,19 +173,12 @@ CREATE TABLE topology_recovery ( alias varchar(256) NOT NULL, start_recovery timestamp NOT NULL DEFAULT (''), end_recovery timestamp NULL DEFAULT NULL, - processing_node_hostname varchar(128) NOT NULL, - processcing_node_token varchar(128) NOT NULL, successor_alias varchar(256) DEFAULT NULL, analysis varchar(128) not null default '', keyspace varchar(128) NOT NULL, shard varchar(128) NOT NULL, - count_affected_replicas int not null default 0, is_successful TINYint NOT NULL DEFAULT 0, - acknowledged TINYint NOT NULL DEFAULT 0, - acknowledged_by varchar(128) not null default '', - acknowledge_comment text not null default '', all_errors text not null default '', - acknowledged_at TIMESTAMP NULL, last_detection_id bigint not null default 0, uid varchar(128) not null default '', PRIMARY KEY (recovery_id) @@ -381,9 +374,6 @@ CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery) `, ` -CREATE INDEX acknowledged_idx_topology_recovery on topology_recovery (acknowledged, acknowledged_at) - `, - ` CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) `, ` diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 7874515a90a..07cef3cabfb 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -111,12 +111,6 @@ type TopologyRecovery struct { AllErrors []string RecoveryStartTimestamp string RecoveryEndTimestamp string - ProcessingNodeHostname string - ProcessingNodeToken string - Acknowledged bool - AcknowledgedAt string - AcknowledgedBy string - AcknowledgedComment string LastDetectionID int64 RelatedRecoveryID int64 Type RecoveryType diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 8f9ae129896..78ff8dfbc58 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -25,8 +25,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" - "vitess.io/vitess/go/vt/vtorc/process" - "vitess.io/vitess/go/vt/vtorc/util" ) // InsertRecoveryDetection inserts the recovery analysis that has been detected. @@ -73,12 +71,9 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover uid, alias, start_recovery, - processing_node_hostname, - processcing_node_token, analysis, keyspace, shard, - count_affected_replicas, last_detection_id ) values ( ?, @@ -88,20 +83,15 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ?, ?, ?, - ?, - ?, - ?, ? ) `, sqlutils.NilIfZero(topologyRecovery.ID), topologyRecovery.UID, analysisEntry.AnalyzedInstanceAlias, - process.ThisHostname, util.ProcessToken.Hash, string(analysisEntry.Analysis), analysisEntry.ClusterDetails.Keyspace, analysisEntry.ClusterDetails.Shard, - analysisEntry.CountReplicas, analysisEntry.AnalyzedInstanceAlias, analysisEntry.RecoveryId, ) @@ -180,18 +170,11 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog start_recovery, IFNULL(end_recovery, '') AS end_recovery, is_successful, - processing_node_hostname, - processcing_node_token, ifnull(successor_alias, '') as successor_alias, analysis, keyspace, shard, - count_affected_replicas, all_errors, - acknowledged, - acknowledged_at, - acknowledged_by, - acknowledge_comment, last_detection_id from topology_recovery @@ -208,14 +191,11 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.RecoveryStartTimestamp = m.GetString("start_recovery") topologyRecovery.RecoveryEndTimestamp = m.GetString("end_recovery") topologyRecovery.IsSuccessful = m.GetBool("is_successful") - topologyRecovery.ProcessingNodeHostname = m.GetString("processing_node_hostname") - topologyRecovery.ProcessingNodeToken = m.GetString("processcing_node_token") topologyRecovery.AnalysisEntry.AnalyzedInstanceAlias = m.GetString("alias") topologyRecovery.AnalysisEntry.Analysis = inst.AnalysisCode(m.GetString("analysis")) topologyRecovery.AnalysisEntry.ClusterDetails.Keyspace = m.GetString("keyspace") topologyRecovery.AnalysisEntry.ClusterDetails.Shard = m.GetString("shard") - topologyRecovery.AnalysisEntry.CountReplicas = m.GetUint("count_affected_replicas") topologyRecovery.SuccessorAlias = m.GetString("successor_alias") @@ -223,11 +203,6 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.AllErrors = strings.Split(m.GetString("all_errors"), "\n") - topologyRecovery.Acknowledged = m.GetBool("acknowledged") - topologyRecovery.AcknowledgedAt = m.GetString("acknowledged_at") - topologyRecovery.AcknowledgedBy = m.GetString("acknowledged_by") - topologyRecovery.AcknowledgedComment = m.GetString("acknowledge_comment") - topologyRecovery.LastDetectionID = m.GetInt64("last_detection_id") res = append(res, &topologyRecovery) @@ -251,13 +226,10 @@ func ReadActiveClusterRecoveries(keyspace string, shard string) ([]*TopologyReco } // ReadRecentRecoveries reads latest recovery entries from topology_recovery -func ReadRecentRecoveries(unacknowledgedOnly bool, page int) ([]*TopologyRecovery, error) { +func ReadRecentRecoveries(page int) ([]*TopologyRecovery, error) { whereConditions := []string{} whereClause := "" var args []any - if unacknowledgedOnly { - whereConditions = append(whereConditions, `acknowledged=0`) - } if len(whereConditions) > 0 { whereClause = fmt.Sprintf("where %s", strings.Join(whereConditions, " and ")) } diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index 74c6285f060..3505f6056ef 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -58,7 +58,7 @@ func TestTopologyRecovery(t *testing.T) { }) t.Run("read recoveries", func(t *testing.T) { - recoveries, err := ReadRecentRecoveries(false, 0) + recoveries, err := ReadRecentRecoveries(0) require.NoError(t, err) require.Len(t, recoveries, 1) // Assert that the ID field matches the one that we just wrote From 75a670e0774b847900f1764638f4b00765e31fa7 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 28 Mar 2024 21:24:00 +0530 Subject: [PATCH 05/13] feat: remove uid from topology_recovery as its not needed Signed-off-by: Manan Gupta --- go/vt/vtorc/db/generate_base.go | 8 ++------ go/vt/vtorc/logic/topology_recovery.go | 18 ++++++++---------- go/vt/vtorc/logic/topology_recovery_dao.go | 13 ++++--------- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index c0f565ce148..e063634a6c2 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -180,7 +180,6 @@ CREATE TABLE topology_recovery ( is_successful TINYint NOT NULL DEFAULT 0, all_errors text not null default '', last_detection_id bigint not null default 0, - uid varchar(128) not null default '', PRIMARY KEY (recovery_id) )`, ` @@ -302,7 +301,7 @@ DROP TABLE IF EXISTS topology_recovery_steps ` CREATE TABLE topology_recovery_steps ( recovery_step_id integer, - recovery_uid varchar(128) NOT NULL, + recovery_id integer NOT NULL, audit_at timestamp not null default (''), message text NOT NULL, PRIMARY KEY (recovery_step_id) @@ -383,9 +382,6 @@ CREATE INDEX last_detection_idx_topology_recovery on topology_recovery (last_det CREATE INDEX last_seen_active_idx_node_health on node_health (last_seen_active) `, ` -CREATE INDEX uid_idx_topology_recovery ON topology_recovery(uid) - `, - ` -CREATE INDEX recovery_uid_idx_topology_recovery_steps ON topology_recovery_steps(recovery_uid) +CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) `, } diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 07cef3cabfb..8322e0c7db0 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -102,7 +102,6 @@ const ( // TopologyRecovery represents an entry in the topology_recovery table type TopologyRecovery struct { ID int64 - UID string AnalysisEntry inst.ReplicationAnalysis SuccessorHostname string SuccessorPort int @@ -118,7 +117,6 @@ type TopologyRecovery struct { func NewTopologyRecovery(replicationAnalysis inst.ReplicationAnalysis) *TopologyRecovery { topologyRecovery := &TopologyRecovery{} - topologyRecovery.UID = util.PrettyUniqueToken() topologyRecovery.AnalysisEntry = replicationAnalysis topologyRecovery.AllErrors = []string{} return topologyRecovery @@ -138,16 +136,16 @@ func (topologyRecovery *TopologyRecovery) AddErrors(errs []error) { } type TopologyRecoveryStep struct { - ID int64 - RecoveryUID string - AuditAt string - Message string + ID int64 + RecoveryID int64 + AuditAt string + Message string } -func NewTopologyRecoveryStep(uid string, message string) *TopologyRecoveryStep { +func NewTopologyRecoveryStep(id int64, message string) *TopologyRecoveryStep { return &TopologyRecoveryStep{ - RecoveryUID: uid, - Message: message, + RecoveryID: id, + Message: message, } } @@ -166,7 +164,7 @@ func AuditTopologyRecovery(topologyRecovery *TopologyRecovery, message string) e return nil } - recoveryStep := NewTopologyRecoveryStep(topologyRecovery.UID, message) + recoveryStep := NewTopologyRecoveryStep(topologyRecovery.ID, message) return writeTopologyRecoveryStep(recoveryStep) } diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 78ff8dfbc58..8ca4e8f3ccc 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -68,7 +68,6 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover insert ignore into topology_recovery ( recovery_id, - uid, alias, start_recovery, analysis, @@ -76,7 +75,6 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover shard, last_detection_id ) values ( - ?, ?, ?, NOW(), @@ -87,7 +85,6 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover ) `, sqlutils.NilIfZero(topologyRecovery.ID), - topologyRecovery.UID, analysisEntry.AnalyzedInstanceAlias, string(analysisEntry.Analysis), analysisEntry.ClusterDetails.Keyspace, @@ -147,11 +144,11 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error { all_errors = ?, end_recovery = NOW() where - uid = ? + recovery_id = ? `, topologyRecovery.IsSuccessful, topologyRecovery.SuccessorAlias, strings.Join(topologyRecovery.AllErrors, "\n"), - topologyRecovery.UID, + topologyRecovery.ID, ) if err != nil { log.Error(err) @@ -165,7 +162,6 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog query := fmt.Sprintf(` select recovery_id, - uid, alias, start_recovery, IFNULL(end_recovery, '') AS end_recovery, @@ -186,7 +182,6 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error { topologyRecovery := *NewTopologyRecovery(inst.ReplicationAnalysis{}) topologyRecovery.ID = m.GetInt64("recovery_id") - topologyRecovery.UID = m.GetString("uid") topologyRecovery.RecoveryStartTimestamp = m.GetString("start_recovery") topologyRecovery.RecoveryEndTimestamp = m.GetString("end_recovery") @@ -245,9 +240,9 @@ func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error sqlResult, err := db.ExecVTOrc(` insert ignore into topology_recovery_steps ( - recovery_step_id, recovery_uid, audit_at, message + recovery_step_id, recovery_id, audit_at, message ) values (?, ?, now(), ?) - `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryUID, topologyRecoveryStep.Message, + `, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryID, topologyRecoveryStep.Message, ) if err != nil { log.Error(err) From 9e8a9d89c75775b3f92a07d665cc213d4723f4fc Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 28 Mar 2024 21:40:43 +0530 Subject: [PATCH 06/13] refactor: rename detection id Signed-off-by: Manan Gupta --- go/vt/vtorc/db/generate_base.go | 4 ++-- go/vt/vtorc/logic/topology_recovery.go | 2 +- go/vt/vtorc/logic/topology_recovery_dao.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index e063634a6c2..fbb96ef75c0 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -179,7 +179,7 @@ CREATE TABLE topology_recovery ( shard varchar(128) NOT NULL, is_successful TINYint NOT NULL DEFAULT 0, all_errors text not null default '', - last_detection_id bigint not null default 0, + detection_id bigint not null default 0, PRIMARY KEY (recovery_id) )`, ` @@ -376,7 +376,7 @@ CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recove CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) `, ` -CREATE INDEX last_detection_idx_topology_recovery on topology_recovery (last_detection_id) +CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id) `, ` CREATE INDEX last_seen_active_idx_node_health on node_health (last_seen_active) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 8322e0c7db0..13113453c6e 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -110,7 +110,7 @@ type TopologyRecovery struct { AllErrors []string RecoveryStartTimestamp string RecoveryEndTimestamp string - LastDetectionID int64 + DetectionID int64 RelatedRecoveryID int64 Type RecoveryType } diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go index 8ca4e8f3ccc..dd5f8a96430 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao.go +++ b/go/vt/vtorc/logic/topology_recovery_dao.go @@ -73,7 +73,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover analysis, keyspace, shard, - last_detection_id + detection_id ) values ( ?, ?, @@ -171,7 +171,7 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog keyspace, shard, all_errors, - last_detection_id + detection_id from topology_recovery %s @@ -198,7 +198,7 @@ func readRecoveries(whereCondition string, limit string, args []any) ([]*Topolog topologyRecovery.AllErrors = strings.Split(m.GetString("all_errors"), "\n") - topologyRecovery.LastDetectionID = m.GetInt64("last_detection_id") + topologyRecovery.DetectionID = m.GetInt64("detection_id") res = append(res, &topologyRecovery) return nil From 707f7d5c9f6405c4bd872ac9105e842144e8c07b Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 29 Mar 2024 13:05:04 +0530 Subject: [PATCH 07/13] test: fix recovery registration test Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/topology_recovery_test.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index e95c51e2a17..f7658060b95 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -131,7 +131,7 @@ func TestElectNewPrimaryPanic(t *testing.T) { require.Error(t, err) } -func TestDifferentAnalysescHaveDifferentCooldowns(t *testing.T) { +func TestRecoveryRegistration(t *testing.T) { orcDb, err := db.OpenVTOrc() require.NoError(t, err) oldTs := ts @@ -181,13 +181,20 @@ func TestDifferentAnalysescHaveDifferentCooldowns(t *testing.T) { defer cancel() ts = memorytopo.NewServer(ctx, "zone1") - _, err = AttemptRecoveryRegistration(&replicaAnalysisEntry) - require.Nil(t, err) + tp, err := AttemptRecoveryRegistration(&replicaAnalysisEntry) + require.NoError(t, err) + + // because there is another recovery in progress for this shard, this will fail. + _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry) + require.ErrorContains(t, err, "Active recovery") - // even though this is another recovery on the same cluster, allow it to go through - // because the analysis is different (ReplicationStopped vs DeadPrimary) + // Lets say the recovery finishes after some time. + err = resolveRecovery(tp, nil) + require.NoError(t, err) + + // now this recovery registration should be successful. _, err = AttemptRecoveryRegistration(&primaryAnalysisEntry) - require.Nil(t, err) + require.NoError(t, err) } func TestGetCheckAndRecoverFunctionCode(t *testing.T) { From 2df210bfa4a9c5becde3a09a6c1e424e18721fbb Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 29 Mar 2024 13:09:47 +0530 Subject: [PATCH 08/13] feat: deprecate recovery period block duration Signed-off-by: Manan Gupta --- go/cmd/vtorc/cli/cli.go | 1 - go/flags/endtoend/vtorc.txt | 2 -- go/vt/vtorc/config/config.go | 1 + 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/go/cmd/vtorc/cli/cli.go b/go/cmd/vtorc/cli/cli.go index 312a6d2e67a..1233c1e2ac2 100644 --- a/go/cmd/vtorc/cli/cli.go +++ b/go/cmd/vtorc/cli/cli.go @@ -39,7 +39,6 @@ var ( --topo_global_root /vitess/global \ --log_dir $VTDATAROOT/tmp \ --port 15000 \ - --recovery-period-block-duration "10m" \ --instance-poll-time "1s" \ --topo-information-refresh-duration "30s" \ --alsologtostderr`, diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 1e14056460e..187426a4afa 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -10,7 +10,6 @@ vtorc \ --topo_global_root /vitess/global \ --log_dir $VTDATAROOT/tmp \ --port 15000 \ - --recovery-period-block-duration "10m" \ --instance-poll-time "1s" \ --topo-information-refresh-duration "30s" \ --alsologtostderr @@ -65,7 +64,6 @@ Flags: --prevent-cross-cell-failover Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover --purge_logs_interval duration how often try to remove old logs (default 1h0m0s) --reasonable-replication-lag duration Maximum replication lag on replicas which is deemed to be acceptable (default 10s) - --recovery-period-block-duration duration Duration for which a new recovery is blocked on an instance after running a recovery (default 30s) --recovery-poll-duration duration Timer duration on which VTOrc polls its database to run a recovery (default 1s) --remote_operation_timeout duration time to wait for a remote operation (default 15s) --security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index 5ef80164cea..402c67870ba 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -76,6 +76,7 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.BoolVar(&auditToSyslog, "audit-to-syslog", auditToSyslog, "Whether to store the audit log in the syslog") fs.DurationVar(&auditPurgeDuration, "audit-purge-duration", auditPurgeDuration, "Duration for which audit logs are held before being purged. Should be in multiples of days") fs.DurationVar(&recoveryPeriodBlockDuration, "recovery-period-block-duration", recoveryPeriodBlockDuration, "Duration for which a new recovery is blocked on an instance after running a recovery") + fs.MarkDeprecated("recovery-period-block-duration", "As of v20 this is ignored and will be removed in a future release.") fs.BoolVar(&preventCrossCellFailover, "prevent-cross-cell-failover", preventCrossCellFailover, "Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover") fs.DurationVar(&waitReplicasTimeout, "wait-replicas-timeout", waitReplicasTimeout, "Duration for which to wait for replica's to respond when issuing RPCs") fs.DurationVar(&tolerableReplicationLag, "tolerable-replication-lag", tolerableReplicationLag, "Amount of replication lag that is considered acceptable for a tablet to be eligible for promotion when Vitess makes the choice of a new primary in PRS") From c631693c19c372e63413e520f338360d3c767268 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 29 Mar 2024 13:13:12 +0530 Subject: [PATCH 09/13] docs: add summary changes for the deprecated flag Signed-off-by: Manan Gupta --- changelog/20.0/20.0.0/summary.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md index 3845f8aced5..8c09723fbc3 100644 --- a/changelog/20.0/20.0.0/summary.md +++ b/changelog/20.0/20.0.0/summary.md @@ -6,6 +6,7 @@ - **[Breaking changes](#breaking-changes)** - [`shutdown_grace_period` Default Change](#shutdown-grace-period-default) - [New `unmanaged` Flag and `disable_active_reparents` deprecation](#unmanaged-flag) + - [`recovery-period-block-duration` Flag deprecation](#recovery-block-deprecation) - [`mysqlctld` `onterm-timeout` Default Change](#mysqlctld-onterm-timeout) - [`Durabler` interface method renaming](#durabler-interface-method-renaming) - **[Query Compatibility](#query-compatibility)** @@ -40,6 +41,13 @@ New flag `--unmanaged` has been introduced in this release to make it easier to Starting this release, all unmanaged tablets should specify this flag. + +#### `recovery-period-block-duration` Flag deprecation + +The flag `--recovery-period-block-duration` has been deprecated in VTOrc from this release. Its value is ignored and will be removed in later releases. +VTOrc no longer blocks recoveries for a certain duration after a previous recovery has completed. Since VTOrc refreshes the required information after +acquiring a shard lock, blocking of recoveries is not required. + #### `mysqlctld` `onterm_timeout` Default Change The `--onterm_timeout` flag default value has changed for `mysqlctld`. It now is by default long enough to be able to wait for the default `--shutdown-wait-time` when shutting down on a `TERM` signal. From dacc719468bca081ec197870ecab6dcf2ee90752 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 1 Apr 2024 15:01:21 +0530 Subject: [PATCH 10/13] refactor: remove unused fields Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/topology_recovery.go | 6 ------ go/vt/vtorc/util/token.go | 6 ------ 2 files changed, 12 deletions(-) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 13113453c6e..4c1e4264b5d 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -35,8 +35,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/util" ) -type RecoveryType string - const ( CheckAndRecoverGenericProblemRecoveryName string = "CheckAndRecoverGenericProblem" RecoverDeadPrimaryRecoveryName string = "RecoverDeadPrimary" @@ -103,16 +101,12 @@ const ( type TopologyRecovery struct { ID int64 AnalysisEntry inst.ReplicationAnalysis - SuccessorHostname string - SuccessorPort int SuccessorAlias string IsSuccessful bool AllErrors []string RecoveryStartTimestamp string RecoveryEndTimestamp string DetectionID int64 - RelatedRecoveryID int64 - Type RecoveryType } func NewTopologyRecovery(replicationAnalysis inst.ReplicationAnalysis) *TopologyRecovery { diff --git a/go/vt/vtorc/util/token.go b/go/vt/vtorc/util/token.go index 940f7a44698..b3e61594c29 100644 --- a/go/vt/vtorc/util/token.go +++ b/go/vt/vtorc/util/token.go @@ -20,8 +20,6 @@ import ( "crypto/rand" "crypto/sha256" "encoding/hex" - "fmt" - "time" ) func toHash(input []byte) string { @@ -53,7 +51,3 @@ func NewToken() *Token { Hash: RandomHash(), } } - -func PrettyUniqueToken() string { - return fmt.Sprintf("%d:%s", time.Now().UnixNano(), NewToken().Hash) -} From 1b41c260f7e8db77c3ca80ec8b2e506fd44393e0 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 1 Apr 2024 15:52:19 +0530 Subject: [PATCH 11/13] test: add tests for Expire table functions Signed-off-by: Manan Gupta --- go/vt/vtorc/inst/instance_dao_test.go | 57 +++++++++++++++ .../vtorc/logic/topology_recovery_dao_test.go | 70 +++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index c6020ec52d8..6acc22134de 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -762,3 +762,60 @@ func TestGetDatabaseState(t *testing.T) { require.NoError(t, err) require.Contains(t, ds, `"alias": "zone1-0000000112"`) } + +func TestExpireTableData(t *testing.T) { + oldVal := config.Config.AuditPurgeDays + config.Config.AuditPurgeDays = 10 + defer func() { + config.Config.AuditPurgeDays = oldVal + }() + + tests := []struct { + name string + tableName string + insertQuery string + timestampColumn string + expectedRowCount int + }{ + { + name: "ExpireAudit", + tableName: "audit", + timestampColumn: "audit_timestamp", + expectedRowCount: 1, + insertQuery: `insert into audit (audit_id, audit_timestamp, audit_type, alias, message, keyspace, shard) values +(1, NOW() - INTERVAL 50 DAY, 'a','a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY, 'a','a','a','a','a')`, + }, + { + name: "ExpireRecoveryDetectionHistory", + tableName: "recovery_detection", + timestampColumn: "detection_timestamp", + expectedRowCount: 2, + insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values +(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), +(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + _, err := db.ExecVTOrc(tt.insertQuery) + require.NoError(t, err) + + err = ExpireTableData(tt.tableName, tt.timestampColumn) + require.NoError(t, err) + + rowsCount := 0 + err = db.QueryVTOrc(`select * from `+tt.tableName, nil, func(rowMap sqlutils.RowMap) error { + rowsCount++ + return nil + }) + require.NoError(t, err) + require.EqualValues(t, tt.expectedRowCount, rowsCount) + }) + } +} diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index 3505f6056ef..4d74475112d 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -21,6 +21,8 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" ) @@ -65,3 +67,71 @@ func TestTopologyRecovery(t *testing.T) { require.EqualValues(t, topologyRecovery.ID, recoveries[0].ID) }) } + +func TestExpireTableData(t *testing.T) { + oldVal := config.Config.AuditPurgeDays + config.Config.AuditPurgeDays = 10 + defer func() { + config.Config.AuditPurgeDays = oldVal + }() + + tests := []struct { + name string + tableName string + insertQuery string + expectedRowCount int + expireFunc func() error + }{ + { + name: "ExpireRecoveryDetectionHistory", + tableName: "recovery_detection", + expectedRowCount: 2, + insertQuery: `insert into recovery_detection (detection_id, detection_timestamp, alias, analysis, keyspace, shard) values +(1, NOW() - INTERVAL 3 DAY,'a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), +(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, + expireFunc: ExpireRecoveryDetectionHistory, + }, + { + name: "ExpireTopologyRecoveryHistory", + tableName: "topology_recovery", + expectedRowCount: 1, + insertQuery: `insert into topology_recovery (recovery_id, start_recovery, alias, analysis, keyspace, shard) values +(1, NOW() - INTERVAL 13 DAY,'a','a','a','a'), +(2, NOW() - INTERVAL 5 DAY,'a','a','a','a'), +(3, NOW() - INTERVAL 15 DAY,'a','a','a','a')`, + expireFunc: ExpireTopologyRecoveryHistory, + }, + { + name: "ExpireTopologyRecoveryStepsHistory", + tableName: "topology_recovery_steps", + expectedRowCount: 1, + insertQuery: `insert into topology_recovery_steps (recovery_step_id, audit_at, recovery_id, message) values +(1, NOW() - INTERVAL 13 DAY, 1, 'a'), +(2, NOW() - INTERVAL 5 DAY, 2, 'a'), +(3, NOW() - INTERVAL 15 DAY, 3, 'a')`, + expireFunc: ExpireTopologyRecoveryStepsHistory, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + _, err := db.ExecVTOrc(tt.insertQuery) + require.NoError(t, err) + + err = tt.expireFunc() + require.NoError(t, err) + + rowsCount := 0 + err = db.QueryVTOrc(`select * from `+tt.tableName, nil, func(rowMap sqlutils.RowMap) error { + rowsCount++ + return nil + }) + require.NoError(t, err) + require.EqualValues(t, tt.expectedRowCount, rowsCount) + }) + } +} From d9b7410c02cffe370c290ad31e806998ab750047 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 1 Apr 2024 16:11:30 +0530 Subject: [PATCH 12/13] test: add tests for insertion into the new table Signed-off-by: Manan Gupta --- .../vtorc/logic/topology_recovery_dao_test.go | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/go/vt/vtorc/logic/topology_recovery_dao_test.go b/go/vt/vtorc/logic/topology_recovery_dao_test.go index 4d74475112d..354af82e2b3 100644 --- a/go/vt/vtorc/logic/topology_recovery_dao_test.go +++ b/go/vt/vtorc/logic/topology_recovery_dao_test.go @@ -17,6 +17,7 @@ limitations under the License. package logic import ( + "strconv" "testing" "github.com/stretchr/testify/require" @@ -135,3 +136,35 @@ func TestExpireTableData(t *testing.T) { }) } } + +func TestInsertRecoveryDetection(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + ra := &inst.ReplicationAnalysis{ + AnalyzedInstanceAlias: "alias-1", + Analysis: inst.ClusterHasNoPrimary, + ClusterDetails: inst.ClusterInfo{ + Keyspace: keyspace, + Shard: shard, + }, + } + err := InsertRecoveryDetection(ra) + require.NoError(t, err) + require.NotEqual(t, 0, ra.RecoveryId) + + var rows []map[string]sqlutils.CellData + err = db.QueryVTOrc("select * from recovery_detection", nil, func(rowMap sqlutils.RowMap) error { + rows = append(rows, rowMap) + return nil + }) + require.NoError(t, err) + require.Len(t, rows, 1) + require.EqualValues(t, ra.AnalyzedInstanceAlias, rows[0]["alias"].String) + require.EqualValues(t, ra.Analysis, rows[0]["analysis"].String) + require.EqualValues(t, keyspace, rows[0]["keyspace"].String) + require.EqualValues(t, shard, rows[0]["shard"].String) + require.EqualValues(t, strconv.Itoa(int(ra.RecoveryId)), rows[0]["detection_id"].String) + require.NotEqual(t, "", rows[0]["detection_timestamp"].String) +} From 72a61377a0386ff42c7b0129a3e65bc2fc67dee2 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Tue, 2 Apr 2024 10:28:37 +0530 Subject: [PATCH 13/13] docs: fix summary changes Co-authored-by: Deepthi Sigireddi Signed-off-by: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> --- changelog/20.0/20.0.0/summary.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md index 8c09723fbc3..4665069e1fe 100644 --- a/changelog/20.0/20.0.0/summary.md +++ b/changelog/20.0/20.0.0/summary.md @@ -44,7 +44,7 @@ Starting this release, all unmanaged tablets should specify this flag. #### `recovery-period-block-duration` Flag deprecation -The flag `--recovery-period-block-duration` has been deprecated in VTOrc from this release. Its value is ignored and will be removed in later releases. +The flag `--recovery-period-block-duration` has been deprecated in VTOrc from this release. Its value is now ignored and the flag will be removed in later releases. VTOrc no longer blocks recoveries for a certain duration after a previous recovery has completed. Since VTOrc refreshes the required information after acquiring a shard lock, blocking of recoveries is not required.