diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index e63ffcc3547..958ad8ce5a3 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -12,6 +12,7 @@
- **[Support for More Efficient JSON Replication](#efficient-json-replication)**
- **[Support for LAST_INSERT_ID(x)](#last-insert-id)**
- **[Support for Maximum Idle Connections in the Pool](#max-idle-connections)**
+ - **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)**
@@ -100,6 +101,11 @@ You can control idle connection retention for the query server’s query pool, s
This feature ensures that, during traffic spikes, idle connections are available for faster responses, while minimizing overhead in low-traffic periods by limiting the number of idle connections retained. It helps strike a balance between performance, efficiency, and cost.
+### <a id="stall-disk-recovery"></a>Stalled Disk Recovery in VTOrc
+VTOrc can now identify and recover from stalled disk errors. The vttablets periodically check that the disk is writable and report the result to VTOrc as part of the full status response. If the disk is not writable on the primary tablet, VTOrc will attempt to recover the cluster by reparenting to a different primary. This is useful in scenarios where the disk is stalled and the primary vttablet is unable to accept writes because of it.
+
+To opt into this feature, the `--enable-stalled-disk-primary-recovery` flag must be specified on VTOrc, and the `--stalled-disk-write-dir` flag must be specified on the vttablets. The `--stalled-disk-write-interval` and `--stalled-disk-write-timeout` flags can be used to configure the polling interval and timeout, respectively.
+
## Minor Changes
#### VTTablet Flags
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 052c19ecaae..a6140263ce6 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -328,6 +328,9 @@ Flags:
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
--srv_topo_cache_ttl duration how long to use cached entries for topology (default 1s)
--srv_topo_timeout duration topo server timeout (default 5s)
+ --stalled-disk-write-dir string if provided, tablet will attempt to write a file to this directory to check if the disk is stalled
+ --stalled-disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s)
+ --stalled-disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s)
--start_mysql Should vtcombo also start mysql
--stats_backend string The name of the registered push-based monitoring/stats backend to use
--stats_combine_dimensions string List of dimensions to be combined into a single "all" value in exported stats vars
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt
index c2799a72dc1..b4ec79996f4 100644
--- a/go/flags/endtoend/vtorc.txt
+++ b/go/flags/endtoend/vtorc.txt
@@ -33,6 +33,7 @@ Flags:
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--emit_stats If set, emit stats to push-based monitoring and stats backends
+ --enable-stalled-disk-primary-recovery Whether VTOrc should be analyzing and recovering stalled disk primary failures
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index e2b0c30db7f..44e242c5ae5 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -328,6 +328,9 @@ Flags:
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
--srv_topo_cache_ttl duration how long to use cached entries for topology (default 1s)
--srv_topo_timeout duration topo server timeout (default 5s)
+ --stalled-disk-write-dir string if provided, tablet will attempt to write a file to this directory to check if the disk is stalled
+ --stalled-disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s)
+ --stalled-disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s)
--stats_backend string The name of the registered push-based monitoring/stats backend to use
--stats_combine_dimensions string List of dimensions to be combined into a single "all" value in exported stats vars
--stats_common_tags strings Comma-separated list of common tags for the stats backend. It provides both label and values. Example: label1:value1,label2:value2
diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go
index cafff5acce8..6a4b40503e1 100644
--- a/go/vt/vtorc/config/config.go
+++ b/go/vt/vtorc/config/config.go
@@ -174,6 +174,15 @@ var (
Dynamic: true,
},
)
+
+ enableStalledDiskPrimaryRecovery = viperutil.Configure(
+ "enable-stalled-disk-primary-recovery",
+ viperutil.Options[bool]{
+ FlagName: "enable-stalled-disk-primary-recovery",
+ Default: false,
+ Dynamic: true,
+ },
+ )
)
func init() {
@@ -197,6 +206,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.Duration("recovery-poll-duration", recoveryPollDuration.Default(), "Timer duration on which VTOrc polls its database to run a recovery")
fs.Bool("allow-emergency-reparent", ersEnabled.Default(), "Whether VTOrc should be allowed to run emergency reparent operation when it detects a dead primary")
fs.Bool("change-tablets-with-errant-gtid-to-drained", convertTabletsWithErrantGTIDs.Default(), "Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED")
+ fs.Bool("enable-stalled-disk-primary-recovery", enableStalledDiskPrimaryRecovery.Default(), "Whether VTOrc should be analyzing and recovering stalled disk primary failures")
viperutil.BindFlags(fs,
instancePollTime,
@@ -214,6 +224,7 @@ func registerFlags(fs *pflag.FlagSet) {
recoveryPollDuration,
ersEnabled,
convertTabletsWithErrantGTIDs,
+ enableStalledDiskPrimaryRecovery,
)
}
@@ -332,6 +343,11 @@ func SetConvertTabletWithErrantGTIDs(val bool) {
convertTabletsWithErrantGTIDs.Set(val)
}
+// GetStalledDiskPrimaryRecovery reports whether VTOrc is allowed to check for and recover from stalled disk problems on the primary.
+func GetStalledDiskPrimaryRecovery() bool {
+ return enableStalledDiskPrimaryRecovery.Get()
+}
+
// MarkConfigurationLoaded is called once configuration has first been loaded.
// Listeners on ConfigurationLoaded will get a notification
func MarkConfigurationLoaded() {
diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go
index 21375fb8eb3..f4a62577ee8 100644
--- a/go/vt/vtorc/db/generate_base.go
+++ b/go/vt/vtorc/db/generate_base.go
@@ -105,6 +105,7 @@ CREATE TABLE database_instance (
semi_sync_primary_status TINYint NOT NULL DEFAULT 0,
semi_sync_replica_status TINYint NOT NULL DEFAULT 0,
semi_sync_primary_clients int NOT NULL DEFAULT 0,
+ stalled_disk TINYint NOT NULL DEFAULT 0,
PRIMARY KEY (alias)
)`,
`
diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go
index fa2e1a4ec95..06435b4a6d7 100644
--- a/go/vt/vtorc/inst/analysis.go
+++ b/go/vt/vtorc/inst/analysis.go
@@ -56,6 +56,7 @@ const (
LockedSemiSyncPrimaryHypothesis AnalysisCode = "LockedSemiSyncPrimaryHypothesis"
LockedSemiSyncPrimary AnalysisCode = "LockedSemiSyncPrimary"
ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected"
+ StalledDiskPrimary AnalysisCode = "StalledDiskPrimary"
)
type StructureAnalysisCode string
@@ -129,6 +130,7 @@ type ReplicationAnalysis struct {
MaxReplicaGTIDMode string
MaxReplicaGTIDErrant string
IsReadOnly bool
+ IsStalledDisk bool
}
func (replicationAnalysis *ReplicationAnalysis) MarshalJSON() ([]byte, error) {
diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go
index fc91c28b021..a1a92baa20e 100644
--- a/go/vt/vtorc/inst/analysis_dao.go
+++ b/go/vt/vtorc/inst/analysis_dao.go
@@ -79,7 +79,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
vitess_keyspace.durability_policy AS durability_policy,
vitess_shard.primary_timestamp AS shard_primary_term_timestamp,
primary_instance.read_only AS read_only,
- MIN(primary_instance.gtid_errant) AS gtid_errant,
+ MIN(primary_instance.gtid_errant) AS gtid_errant,
MIN(primary_instance.alias) IS NULL AS is_invalid,
MIN(primary_instance.binary_log_file) AS binary_log_file,
MIN(primary_instance.binary_log_pos) AS binary_log_pos,
@@ -233,7 +233,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
COUNT(
DISTINCT case when replica_instance.log_bin
AND replica_instance.log_replica_updates then replica_instance.major_version else NULL end
- ) AS count_distinct_logging_major_versions
+ ) AS count_distinct_logging_major_versions,
+ primary_instance.stalled_disk != 0 AS is_stalled_disk
FROM
vitess_tablet
JOIN vitess_keyspace ON (
@@ -354,6 +355,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.HeartbeatInterval = m.GetFloat64("heartbeat_interval")
a.IsReadOnly = m.GetUint("read_only") == 1
+ a.IsStalledDisk = m.GetBool("is_stalled_disk")
if !a.LastCheckValid {
analysisMessage := fmt.Sprintf("analysis: Alias: %+v, Keyspace: %+v, Shard: %+v, IsPrimary: %+v, LastCheckValid: %+v, LastCheckPartialSuccess: %+v, CountReplicas: %+v, CountValidReplicas: %+v, CountValidReplicatingReplicas: %+v, CountLaggingReplicas: %+v, CountDelayedReplicas: %+v",
@@ -401,6 +403,11 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
} else if isInvalid {
a.Analysis = InvalidReplica
a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown"
+ } else if a.IsClusterPrimary && !a.LastCheckValid && a.IsStalledDisk {
+ a.Analysis = StalledDiskPrimary
+ a.Description = "Primary has a stalled disk"
+ ca.hasClusterwideAction = true
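+ // This case must stay ahead of the dead-primary cases below, which share the same unreachable-primary precondition.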
} else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
a.Analysis = DeadPrimaryWithoutReplicas
a.Description = "Primary cannot be reached by vtorc and has no replica"
diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go
index ae4f7279403..c04938e378c 100644
--- a/go/vt/vtorc/inst/analysis_dao_test.go
+++ b/go/vt/vtorc/inst/analysis_dao_test.go
@@ -34,10 +34,10 @@ var (
// The initialSQL is a set of insert commands copied from a dump of an actual running VTOrc instances. The relevant insert commands are here.
// This is a dump taken from a test running 4 tablets, zone1-101 is the primary, zone1-100 is a replica, zone1-112 is a rdonly and zone2-200 is a cross-cell replica.
initialSQL = []string{
- `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0);`,
- `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0);`,
- `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2);`,
- `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false);`,
+ `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false);`,
+ `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false);`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
@@ -96,6 +96,29 @@ func TestGetReplicationAnalysisDecision(t *testing.T) {
keyspaceWanted: "ks",
shardWanted: "0",
codeWanted: PrimaryTabletDeleted,
+ }, {
+ name: "StalledDiskPrimary",
+ info: []*test.InfoForRecoveryAnalysis{{
+ TabletInfo: &topodatapb.Tablet{
+ Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100},
+ Hostname: "localhost",
+ Keyspace: "ks",
+ Shard: "0",
+ Type: topodatapb.TabletType_PRIMARY,
+ MysqlHostname: "localhost",
+ MysqlPort: 6709,
+ },
+ DurabilityPolicy: "none",
+ LastCheckValid: 0,
+ CountReplicas: 4,
+ CountValidReplicas: 4,
+ CountValidReplicatingReplicas: 0,
+ IsPrimary: 1,
+ IsStalledDisk: 1,
+ }},
+ keyspaceWanted: "ks",
+ shardWanted: "0",
+ codeWanted: StalledDiskPrimary,
}, {
name: "DeadPrimary",
info: []*test.InfoForRecoveryAnalysis{{
diff --git a/go/vt/vtorc/inst/instance.go b/go/vt/vtorc/inst/instance.go
index fef1e90acce..b7b097bb14d 100644
--- a/go/vt/vtorc/inst/instance.go
+++ b/go/vt/vtorc/inst/instance.go
@@ -91,6 +91,7 @@ type Instance struct {
IsUpToDate bool
IsRecentlyChecked bool
SecondsSinceLastSeen sql.NullInt64
+ StalledDisk bool
AllowTLS bool
diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go
index 66aef7c8a78..6fde60bcf36 100644
--- a/go/vt/vtorc/inst/instance_dao.go
+++ b/go/vt/vtorc/inst/instance_dao.go
@@ -175,6 +175,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
var tablet *topodatapb.Tablet
var fs *replicationdatapb.FullStatus
readingStartTime := time.Now()
+ stalledDisk := false
instance := NewInstance()
instanceFound := false
partialSuccess := false
@@ -205,6 +206,9 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
fs, err = fullStatus(tabletAlias)
if err != nil {
+ if config.GetStalledDiskPrimaryRecovery() && strings.Contains(err.Error(), "stalled disk") {
+ stalledDisk = true
+ }
goto Cleanup
}
partialSuccess = true // We at least managed to read something from the server.
@@ -381,9 +385,10 @@ Cleanup:
// Something is wrong, could be network-wise. Record that we
// tried to check the instance. last_attempted_check is also
- // updated on success by writeInstance.
+ // updated on success by writeInstance. If the reason is a
+ // stalled disk, we can record that as well.
latency.Start("backend")
- _ = UpdateInstanceLastChecked(tabletAlias, partialSuccess)
+ _ = UpdateInstanceLastChecked(tabletAlias, partialSuccess, stalledDisk)
latency.Stop("backend")
return nil, err
}
@@ -874,6 +879,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
"semi_sync_primary_clients",
"semi_sync_replica_status",
"last_discovery_latency",
+ "stalled_disk",
}
values := make([]string, len(columns))
@@ -953,6 +959,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool,
args = append(args, instance.SemiSyncPrimaryClients)
args = append(args, instance.SemiSyncReplicaStatus)
args = append(args, instance.LastDiscoveryLatency.Nanoseconds())
+ args = append(args, instance.StalledDisk)
}
sql, err := mkInsert("database_instance", columns, values, len(instances), insertIgnore)
@@ -998,16 +1005,18 @@ func WriteInstance(instance *Instance, instanceWasActuallyFound bool, lastError
// UpdateInstanceLastChecked updates the last_check timestamp in the vtorc backed database
// for a given instance
-func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error {
+func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool, stalledDisk bool) error {
writeFunc := func() error {
_, err := db.ExecVTOrc(`UPDATE database_instance
SET
last_checked = DATETIME('now'),
- last_check_partial_success = ?
+ last_check_partial_success = ?,
+ stalled_disk = ?
WHERE
alias = ?
`,
partialSuccess,
+ stalledDisk,
tabletAlias,
)
if err != nil {
diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go
index 1a14041450c..235c9b2664e 100644
--- a/go/vt/vtorc/inst/instance_dao_test.go
+++ b/go/vt/vtorc/inst/instance_dao_test.go
@@ -64,13 +64,13 @@ func TestMkInsertSingle(t *testing.T) {
version, major_version, version_comment, binlog_server, read_only, binlog_format,
binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval,
replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant,
- source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen)
+ source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, stalled_disk, last_seen)
VALUES
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
`
a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT,
FULL, false, false, , 0, , 0, 0, 0,
- false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,`
+ false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,`
sql1, args1, err := mkInsertForInstances(instances[:1], false, true)
require.NoError(t, err)
@@ -87,16 +87,16 @@ func TestMkInsertThree(t *testing.T) {
version, major_version, version_comment, binlog_server, read_only, binlog_format,
binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval,
replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant,
- source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen)
+ source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, stalled_disk, last_seen)
VALUES
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
- (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')),
+ (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now'))
`
a3 := `
- zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
- zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
- zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
+ zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
+ zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
+ zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,
`
sql3, args3, err := mkInsertForInstances(instances[:3], true, true)
@@ -483,9 +483,9 @@ func TestReadOutdatedInstanceKeys(t *testing.T) {
tabletAliases, err := ReadOutdatedInstanceKeys()
- errInDataCollection := db.QueryVTOrcRowsMap(`select alias,
-last_checked,
-last_attempted_check,
+ errInDataCollection := db.QueryVTOrcRowsMap(`select alias,
+last_checked,
+last_attempted_check,
ROUND((JULIANDAY(DATETIME('now')) - JULIANDAY(last_checked)) * 86400) AS difference,
last_attempted_check <= last_checked as use1,
last_checked < DATETIME('now', '-1500 second') as is_outdated1,
@@ -507,22 +507,32 @@ func TestUpdateInstanceLastChecked(t *testing.T) {
name string
tabletAlias string
partialSuccess bool
+ stalledDisk bool
conditionToCheck string
}{
{
name: "Verify updated last checked",
tabletAlias: "zone1-0000000100",
partialSuccess: false,
- conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false",
+ stalledDisk: false,
+ conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and stalled_disk = false",
}, {
name: "Verify partial success",
tabletAlias: "zone1-0000000100",
partialSuccess: true,
- conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = true",
+ stalledDisk: false,
+ conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true and stalled_disk = false",
+ }, {
+ name: "Verify stalled disk",
+ tabletAlias: "zone1-0000000100",
+ partialSuccess: false,
+ stalledDisk: true,
+ conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and stalled_disk = true",
}, {
name: "Verify no error on unknown tablet",
tabletAlias: "unknown tablet",
partialSuccess: true,
+ stalledDisk: true,
},
}
@@ -537,7 +547,7 @@ func TestUpdateInstanceLastChecked(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- err := UpdateInstanceLastChecked(tt.tabletAlias, tt.partialSuccess)
+ err := UpdateInstanceLastChecked(tt.tabletAlias, tt.partialSuccess, tt.stalledDisk)
require.NoError(t, err)
if tt.conditionToCheck != "" {
diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go
index 0d0bbff5b53..1bf69098b65 100644
--- a/go/vt/vtorc/logic/topology_recovery.go
+++ b/go/vt/vtorc/logic/topology_recovery.go
@@ -285,7 +285,7 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl
func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction {
switch analysisCode {
// primary
- case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas:
+ case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.StalledDiskPrimary:
// If ERS is disabled, we have no way of repairing the cluster.
if !config.ERSEnabled() {
log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode)
diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go
index f7658060b95..e539468dda4 100644
--- a/go/vt/vtorc/logic/topology_recovery_test.go
+++ b/go/vt/vtorc/logic/topology_recovery_test.go
@@ -42,6 +42,11 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) {
prevAnalysisCode: inst.DeadPrimary,
newAnalysisCode: inst.DeadPrimaryAndSomeReplicas,
shouldBeEqual: true,
+ }, {
+ // DeadPrimary and StalledDiskPrimary have the same recovery
+ prevAnalysisCode: inst.DeadPrimary,
+ newAnalysisCode: inst.StalledDiskPrimary,
+ shouldBeEqual: true,
}, {
// DeadPrimary and PrimaryTabletDeleted are different recoveries.
prevAnalysisCode: inst.DeadPrimary,
@@ -215,6 +220,16 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) {
ersEnabled: false,
analysisCode: inst.DeadPrimary,
wantRecoveryFunction: noRecoveryFunc,
+ }, {
+ name: "StalledDiskPrimary with ERS enabled",
+ ersEnabled: true,
+ analysisCode: inst.StalledDiskPrimary,
+ wantRecoveryFunction: recoverDeadPrimaryFunc,
+ }, {
+ name: "StalledDiskPrimary with ERS disabled",
+ ersEnabled: false,
+ analysisCode: inst.StalledDiskPrimary,
+ wantRecoveryFunction: noRecoveryFunc,
}, {
name: "PrimaryTabletDeleted with ERS enabled",
ersEnabled: true,
diff --git a/go/vt/vtorc/test/recovery_analysis.go b/go/vt/vtorc/test/recovery_analysis.go
index 218a679bdb0..3a0bdb70b03 100644
--- a/go/vt/vtorc/test/recovery_analysis.go
+++ b/go/vt/vtorc/test/recovery_analysis.go
@@ -80,6 +80,7 @@ type InfoForRecoveryAnalysis struct {
MaxReplicaGTIDMode string
MaxReplicaGTIDErrant string
ReadOnly uint
+ IsStalledDisk uint
}
func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap {
@@ -145,6 +146,7 @@ func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap {
rowMap["semi_sync_replica_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncReplicaEnabled), Valid: true}
res, _ := prototext.Marshal(info.TabletInfo)
rowMap["tablet_info"] = sqlutils.CellData{String: string(res), Valid: true}
+ rowMap["is_stalled_disk"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.IsStalledDisk), Valid: true}
return rowMap
}
diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor.go b/go/vt/vttablet/tabletmanager/disk_health_monitor.go
new file mode 100644
index 00000000000..e35bc662a12
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/disk_health_monitor.go
@@ -0,0 +1,131 @@
+package tabletmanager
+
+import (
+ "context"
+ "os"
+ "path"
+ "strconv"
+ "sync"
+ "time"
+)
+
+// DiskHealthMonitor is implemented by types that can report whether a disk is
+// stalled or rejecting writes.
+type DiskHealthMonitor interface {
+ // IsDiskStalled returns true if the disk is stalled or rejecting writes.
+ IsDiskStalled() bool
+}
+
+// newDiskHealthMonitor picks the monitor implementation from the
+// stalledDiskWriteDir setting: a polling monitor driven by attemptFileWrite when
+// a directory is configured, and a no-op monitor when it is empty.
+func newDiskHealthMonitor(ctx context.Context) DiskHealthMonitor {
+	if stalledDiskWriteDir != "" {
+		return newPollingDiskHealthMonitor(ctx, attemptFileWrite, stalledDiskWriteInterval, stalledDiskWriteTimeout)
+	}
+
+	return newNoopDiskHealthMonitor()
+}
+
+// writeFunction is a single write probe; a nil error means the write succeeded.
+type writeFunction func() error
+
+// attemptFileWrite performs one disk write probe: it creates (or truncates) the
+// ".stalled_disk_check" file inside stalledDiskWriteDir, writes the current Unix
+// time in nanoseconds to it, and syncs the file to stable storage.
+//
+// It returns the first error encountered, or the error from closing the file.
+// The file is closed on every path so that repeated failing probes do not leak
+// file descriptors.
+func attemptFileWrite() error {
+	file, err := os.Create(path.Join(stalledDiskWriteDir, ".stalled_disk_check"))
+	if err != nil {
+		return err
+	}
+	if _, err = file.WriteString(strconv.FormatInt(time.Now().UnixNano(), 10)); err != nil {
+		// Best-effort close: the write error is the one worth reporting.
+		_ = file.Close()
+		return err
+	}
+	if err = file.Sync(); err != nil {
+		// Best-effort close: the sync error is the one worth reporting.
+		_ = file.Close()
+		return err
+	}
+	return file.Close()
+}
+
+// pollingDiskHealthMonitor is a DiskHealthMonitor that periodically runs
+// writeFunc in the background and records whether the latest attempt failed or
+// timed out.
+type pollingDiskHealthMonitor struct {
+ // stalledMutex guards stalled.
+ stalledMutex sync.RWMutex
+ stalled bool
+ // writeInProgressMutex guards writeInProgress.
+ writeInProgressMutex sync.RWMutex
+ // writeInProgress is set around each writeFunc call so that poll does not
+ // start a second write while one is still running.
+ writeInProgress bool
+ // writeFunc is the write probe invoked on each poll.
+ writeFunc writeFunction
+ // pollingInterval is how long poll waits before each write attempt.
+ pollingInterval time.Duration
+ // writeTimeout is how long poll waits for writeFunc before marking the disk stalled.
+ writeTimeout time.Duration
+}
+
+// Compile-time check that pollingDiskHealthMonitor implements DiskHealthMonitor.
+var _ DiskHealthMonitor = &pollingDiskHealthMonitor{}
+
+// newPollingDiskHealthMonitor builds a monitor that initially reports "not
+// stalled" and launches its polling goroutine, which exits once it observes
+// that ctx is done.
+func newPollingDiskHealthMonitor(ctx context.Context, writeFunc writeFunction, pollingInterval, writeTimeout time.Duration) *pollingDiskHealthMonitor {
+	// The mutexes and boolean flags are left at their zero values
+	// (unlocked / false).
+	monitor := &pollingDiskHealthMonitor{
+		writeFunc:       writeFunc,
+		pollingInterval: pollingInterval,
+		writeTimeout:    writeTimeout,
+	}
+	go monitor.poll(ctx)
+	return monitor
+}
+
+// poll is the monitor's background loop. After each pollingInterval it starts
+// one write probe (unless the previous probe is still running) and waits up to
+// writeTimeout for the result:
+//   - the probe returns nil: the disk is marked healthy
+//   - the probe returns an error: the disk is marked stalled
+//   - the probe times out: the disk is marked stalled; the probe keeps running
+//     and no new probe is started until it returns
+//
+// The loop exits when ctx is done, including while it is waiting on a probe.
+func (fs *pollingDiskHealthMonitor) poll(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(fs.pollingInterval):
+			if fs.isWriteInProgress() {
+				continue
+			}
+
+			// Raise the flag before the goroutine is scheduled so a later
+			// iteration can never mistake a running probe for an idle one.
+			fs.setIsWriteInProgress(true)
+			// Buffered so a probe that finishes after the timeout can still
+			// deliver its result and exit instead of blocking forever.
+			ch := make(chan error, 1)
+			go func() {
+				err := fs.writeFunc()
+				fs.setIsWriteInProgress(false)
+				ch <- err
+			}()
+
+			select {
+			case <-ctx.Done():
+				// Do not outlive cancellation by up to writeTimeout.
+				return
+			case <-time.After(fs.writeTimeout):
+				fs.setIsDiskStalled(true)
+			case err := <-ch:
+				fs.setIsDiskStalled(err != nil)
+			}
+		}
+	}
+}
+
+// IsDiskStalled returns the stall verdict most recently recorded by the polling
+// loop. It is false until a poll records otherwise.
+func (fs *pollingDiskHealthMonitor) IsDiskStalled() bool {
+	fs.stalledMutex.RLock()
+	isStalled := fs.stalled
+	fs.stalledMutex.RUnlock()
+	return isStalled
+}
+
+// setIsDiskStalled stores the given stall verdict while holding stalledMutex.
+func (fs *pollingDiskHealthMonitor) setIsDiskStalled(isStalled bool) {
+	fs.stalledMutex.Lock()
+	fs.stalled = isStalled
+	fs.stalledMutex.Unlock()
+}
+
+// isWriteInProgress reads the writeInProgress flag while holding a read lock
+// on writeInProgressMutex.
+func (fs *pollingDiskHealthMonitor) isWriteInProgress() bool {
+	fs.writeInProgressMutex.RLock()
+	inProgress := fs.writeInProgress
+	fs.writeInProgressMutex.RUnlock()
+	return inProgress
+}
+
+// setIsWriteInProgress stores the writeInProgress flag while holding
+// writeInProgressMutex.
+func (fs *pollingDiskHealthMonitor) setIsWriteInProgress(isInProgress bool) {
+	fs.writeInProgressMutex.Lock()
+	fs.writeInProgress = isInProgress
+	fs.writeInProgressMutex.Unlock()
+}
+
+// noopDiskHealthMonitor is a DiskHealthMonitor that performs no disk checks and
+// never reports a stall.
+type noopDiskHealthMonitor struct{}
+
+// Compile-time check that noopDiskHealthMonitor implements DiskHealthMonitor.
+var _ DiskHealthMonitor = (*noopDiskHealthMonitor)(nil)
+
+// newNoopDiskHealthMonitor returns a monitor whose IsDiskStalled is always false.
+func newNoopDiskHealthMonitor() DiskHealthMonitor {
+	return new(noopDiskHealthMonitor)
+}
+
+// IsDiskStalled always returns false.
+func (*noopDiskHealthMonitor) IsDiskStalled() bool {
+	return false
+}
diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go b/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go
new file mode 100644
index 00000000000..fcb3b0ed258
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go
@@ -0,0 +1,103 @@
+package tabletmanager
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+)
+
+// TestDiskHealthMonitor_noStall runs the monitor against a mock writer whose
+// writes all succeed after 10ms (inside the 25ms timeout) and expects exactly
+// five writes to have started after 300ms, with no stall reported.
+func TestDiskHealthMonitor_noStall(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	writer := &sequencedMockWriter{}
+	monitor := newPollingDiskHealthMonitor(ctx, writer.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond)
+
+	// Each cycle is the 50ms interval plus the ~10ms write.
+	time.Sleep(300 * time.Millisecond)
+
+	calls := writer.getTotalCreateCalls()
+	if calls != 5 {
+		t.Fatalf("expected 5 calls to createFile, got %d", calls)
+	}
+	if monitor.IsDiskStalled() {
+		t.Fatalf("expected isStalled to be false")
+	}
+}
+
+// TestDiskHealthMonitor_stallAndRecover feeds the monitor one fast write and
+// then one 300ms write (far beyond the 25ms timeout). It expects a stall to be
+// reported while the slow write is pending, and the stall to clear once later
+// writes succeed again.
+func TestDiskHealthMonitor_stallAndRecover(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	writes := []writeFunction{
+		delayedWriteFunction(10*time.Millisecond, nil),
+		delayedWriteFunction(300*time.Millisecond, nil),
+	}
+	writer := &sequencedMockWriter{sequencedWriteFunctions: writes}
+	monitor := newPollingDiskHealthMonitor(ctx, writer.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond)
+
+	// Phase 1: the slow second write is still running, so no third write has
+	// been started and the disk must be reported as stalled.
+	time.Sleep(300 * time.Millisecond)
+	callsDuringStall := writer.getTotalCreateCalls()
+	if callsDuringStall != 2 {
+		t.Fatalf("expected 2 calls to createFile, got %d", callsDuringStall)
+	}
+	if !monitor.IsDiskStalled() {
+		t.Fatalf("expected isStalled to be true")
+	}
+
+	// Phase 2: the slow write has returned and default (fast) writes resumed.
+	time.Sleep(300 * time.Millisecond)
+	callsAfterRecovery := writer.getTotalCreateCalls()
+	if callsAfterRecovery < 5 {
+		t.Fatalf("expected at least 5 calls to createFile, got %d", callsAfterRecovery)
+	}
+	if monitor.IsDiskStalled() {
+		t.Fatalf("expected isStalled to be false")
+	}
+}
+
+// TestDiskHealthMonitor_errorIsStall uses a writer that returns an error after
+// 10ms on every call and expects the monitor to keep polling (five writes
+// started after 300ms) while reporting the disk as stalled.
+func TestDiskHealthMonitor_errorIsStall(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	failingWrite := delayedWriteFunction(10*time.Millisecond, errors.New("test error"))
+	writer := &sequencedMockWriter{defaultWriteFunction: failingWrite}
+	monitor := newPollingDiskHealthMonitor(ctx, writer.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond)
+
+	time.Sleep(300 * time.Millisecond)
+
+	calls := writer.getTotalCreateCalls()
+	if calls != 5 {
+		t.Fatalf("expected 5 calls to createFile, got %d", calls)
+	}
+	if !monitor.IsDiskStalled() {
+		t.Fatalf("expected isStalled to be true")
+	}
+}
+
+// sequencedMockWriter is a test double for the monitor's write function. The
+// i-th call to mockWriteFunction runs sequencedWriteFunctions[i]; once those are
+// exhausted it falls back to defaultWriteFunction, or to a 10ms successful
+// write if defaultWriteFunction is nil.
+type sequencedMockWriter struct {
+ defaultWriteFunction writeFunction
+ sequencedWriteFunctions []writeFunction
+
+ // totalCreateCalls counts calls to mockWriteFunction; it is accessed under
+ // totalCreateCallsMutex.
+ totalCreateCalls int
+ totalCreateCallsMutex sync.RWMutex
+}
+
+// mockWriteFunction records the call and then runs the write function that
+// corresponds to this call's position: the scripted function at that index if
+// one exists, otherwise defaultWriteFunction, otherwise a 10ms successful write.
+func (smw *sequencedMockWriter) mockWriteFunction() error {
+	callIndex := smw.getTotalCreateCalls()
+	smw.incrementTotalCreateCalls()
+
+	switch {
+	case callIndex < len(smw.sequencedWriteFunctions):
+		return smw.sequencedWriteFunctions[callIndex]()
+	case smw.defaultWriteFunction != nil:
+		return smw.defaultWriteFunction()
+	default:
+		return delayedWriteFunction(10*time.Millisecond, nil)()
+	}
+}
+
+// incrementTotalCreateCalls adds one to the call counter while holding
+// totalCreateCallsMutex.
+func (smw *sequencedMockWriter) incrementTotalCreateCalls() {
+	smw.totalCreateCallsMutex.Lock()
+	smw.totalCreateCalls++
+	smw.totalCreateCallsMutex.Unlock()
+}
+
+// getTotalCreateCalls returns the number of mockWriteFunction calls recorded so
+// far, read under totalCreateCallsMutex.
+func (smw *sequencedMockWriter) getTotalCreateCalls() int {
+	smw.totalCreateCallsMutex.RLock()
+	calls := smw.totalCreateCalls
+	smw.totalCreateCallsMutex.RUnlock()
+	return calls
+}
+
+// delayedWriteFunction returns a writeFunction that sleeps for delay and then
+// returns err.
+func delayedWriteFunction(delay time.Duration, err error) writeFunction {
+	slowWrite := func() error {
+		time.Sleep(delay)
+		return err
+	}
+	return slowWrite
+}
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go
index 47794e92b9a..bce172b52d4 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -18,6 +18,7 @@ package tabletmanager
import (
"context"
+ "errors"
"fmt"
"runtime"
"strings"
@@ -60,6 +61,13 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
+
+ // Return error if the disk is stalled or rejecting writes.
+ // Noop by default, must be enabled with the flag "stalled-disk-write-dir".
+ if tm.dhMonitor.IsDiskStalled() {
+ return nil, errors.New("stalled disk")
+ }
+
// Server ID - "select @@global.server_id"
serverID, err := tm.MysqlDaemon.GetServerID(ctx)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go
index fbef04de357..6da165ab330 100644
--- a/go/vt/vttablet/tabletmanager/tm_init.go
+++ b/go/vt/vttablet/tabletmanager/tm_init.go
@@ -95,8 +95,11 @@ var (
skipBuildInfoTags = "/.*/"
initTags flagutil.StringMapValue
- initTimeout = 1 * time.Minute
- mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
+ initTimeout = 1 * time.Minute
+ mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
+ stalledDiskWriteDir = ""
+ stalledDiskWriteTimeout = 30 * time.Second
+ stalledDiskWriteInterval = 5 * time.Second
)
func registerInitFlags(fs *pflag.FlagSet) {
@@ -109,6 +112,9 @@ func registerInitFlags(fs *pflag.FlagSet) {
fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet")
fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.")
fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.")
+ fs.StringVar(&stalledDiskWriteDir, "stalled-disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled")
+ fs.DurationVar(&stalledDiskWriteTimeout, "stalled-disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled")
+ fs.DurationVar(&stalledDiskWriteInterval, "stalled-disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled")
}
var (
@@ -164,6 +170,7 @@ type TabletManager struct {
VREngine *vreplication.Engine
VDiffEngine *vdiff.Engine
Env *vtenv.Environment
+ dhMonitor DiskHealthMonitor
// tmc is used to run an RPC against other vttablets.
tmc tmclient.TabletManagerClient
@@ -372,6 +379,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl
tm.tmc = tmclient.NewTabletManagerClient()
tm.tmState = newTMState(tm, tablet)
tm.actionSema = semaphore.NewWeighted(1)
+ tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx)
tm._waitForGrantsComplete = make(chan struct{})
tm.baseTabletType = tablet.Type