diff --git a/server/embed/config.go b/server/embed/config.go index 75bcbc34152e..8e97ff2551e7 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -74,6 +74,8 @@ const ( DefaultDiscoveryKeepAliveTime = 2 * time.Second DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second + DefaultWatchdogTolerateInactiveTimeout = 10 * time.Second + DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -380,6 +382,9 @@ type Config struct { // Defaults to 0. ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"` + EnableWatchDog bool `json:"experimental-enable-watchdog"` + WatchDogTolerateInactiveTimeout time.Duration `json:"experimental-watchdog-tolerate-inactive-timeout"` + // Logger is logger options: currently only supports "zap". // "capnslog" is removed in v3.5. Logger string `json:"logger"` @@ -532,6 +537,8 @@ func NewConfig() *Config { ExperimentalCompactHashCheckEnabled: false, ExperimentalCompactHashCheckTime: time.Minute, + WatchDogTolerateInactiveTimeout: DefaultWatchdogTolerateInactiveTimeout, + V2Deprecation: config.V2_DEPR_DEFAULT, DiscoveryCfg: v3discovery.DiscoveryConfig{ @@ -750,6 +757,17 @@ func (cfg *Config) Validate() error { return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs) } + if cfg.EnableWatchDog { + if cfg.WatchDogTolerateInactiveTimeout == 0 { + return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout must be >0 (set to %s)", cfg.WatchDogTolerateInactiveTimeout) + } + + if int64(2*cfg.ElectionMs) > cfg.WatchDogTolerateInactiveTimeout.Milliseconds() { + return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout[%s] should be at least as 2 times as --election-timeout[%dms]", + cfg.WatchDogTolerateInactiveTimeout, cfg.ElectionMs) + } + } + // check this last since proxying in etcdmain may make this OK if cfg.LCUrls != nil && cfg.ACUrls == nil { return ErrUnsetAdvertiseClientURLsFlag diff --git a/server/embed/etcd.go b/server/embed/etcd.go index a03b4f1c9fe9..ba3748f5a80d 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -40,6 +40,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/verify" + "go.etcd.io/etcd/server/v3/watchdog" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" @@ -263,6 +264,12 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return e, err } } + + if cfg.EnableWatchDog { + inactiveTimeoutMs := cfg.WatchDogTolerateInactiveTimeout.Milliseconds() + watchdog.Start(e.cfg.logger, e.Server.StoppingNotify(), e.Stop, inactiveTimeoutMs) + } + e.Server.Start() if err = e.servePeers(); err != nil { @@ -400,6 +407,27 @@ func (e *Etcd) Close() { lg.Sync() }() + e.Stop() + + if e.Server != nil { + e.Server.Stop() + } + + if e.errc != nil { + close(e.errc) + } +} + +func (e *Etcd) Stop() { + fields := []zap.Field{ + zap.String("name", e.cfg.Name), + zap.String("data-dir", e.cfg.Dir), + zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), + zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + } + lg := e.GetLogger() + lg.Info("stopping etcd server", fields...) + e.closeOnce.Do(func() { close(e.stopc) }) @@ -436,11 +464,6 @@ func (e *Etcd) Close() { e.tracingExporterShutdown() } - // close rafthttp transports - if e.Server != nil { - e.Server.Stop() - } - // close all idle connections in peer handler (wait up to 1-second) for i := range e.Peers { if e.Peers[i] != nil && e.Peers[i].close != nil { @@ -449,9 +472,6 @@ func (e *Etcd) Close() { cancel() } } - if e.errc != nil { - close(e.errc) - } } func stopServers(ctx context.Context, ss *servers) { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 84763dd9a6db..1e204d9f55e3 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -284,6 +284,9 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.") fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.") + fs.BoolVar(&cfg.ec.EnableWatchDog, "experimental-enable-watchdog", cfg.ec.EnableWatchDog, "Enable watchdog to detect inactive activities.") + fs.DurationVar(&cfg.ec.WatchDogTolerateInactiveTimeout, "experimental-watchdog-tolerate-inactive-timeout", cfg.ec.WatchDogTolerateInactiveTimeout, "Maximum inactive duration of each activity that the watchdog tolerates.") + // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.") diff --git a/server/etcdserver/api/snap/snapshotter.go b/server/etcdserver/api/snap/snapshotter.go index 093ab6bc9149..ffc299bb7dee 100644 --- a/server/etcdserver/api/snap/snapshotter.go +++ b/server/etcdserver/api/snap/snapshotter.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { spath := filepath.Join(s.dir, fname) fsyncStart := time.Now() + cancel := watchdog.Register("save v2 snapshot") err = pioutil.WriteAndSyncFile(spath, d, 0666) + cancel() snapFsyncSec.Observe(time.Since(fsyncStart).Seconds()) if err != nil { diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 0d12a0868ddc..91bb7067e7db 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/server/v3/watchdog" ) type BucketID int @@ -114,7 +115,9 @@ func (t *batchTx) RUnlock() { } func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { + cancel := watchdog.Register("batchTx createBucket") _, err := t.tx.CreateBucket(bucket.Name()) + cancel() if err != nil && err != bolt.ErrBucketExists { t.backend.lg.Fatal( "failed to create a bucket", @@ -126,7 +129,9 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { } func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) { + cancel := watchdog.Register("batchTx deleteBucket") err := t.tx.DeleteBucket(bucket.Name()) + cancel() if err != nil && err != bolt.ErrBucketNotFound { t.backend.lg.Fatal( "failed to delete a bucket", @@ -161,7 +166,11 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo // this can delay the page split and reduce space usage. bucket.FillPercent = 0.9 } - if err := bucket.Put(key, value); err != nil { + + cancel := watchdog.Register("batchTx put") + err := bucket.Put(key, value) + cancel() + if err != nil { t.backend.lg.Fatal( "failed to write to a bucket", zap.Stringer("bucket-name", bucketType), @@ -216,7 +225,9 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) { zap.Stack("stack"), ) } + cancel := watchdog.Register("batchTx delete") err := bucket.Delete(key) + cancel() if err != nil { t.backend.lg.Fatal( "failed to delete a key", @@ -268,9 +279,11 @@ func (t *batchTx) commit(stop bool) { start := time.Now() + cancel := watchdog.Register("batchTx commit") // gofail: var beforeCommit struct{} err := t.tx.Commit() // gofail: var afterCommit struct{} + cancel() rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds()) spillSec.Observe(t.tx.Stats().SpillTime.Seconds()) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 7f8b25f5ddd2..6d47a9a15347 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -798,6 +799,8 @@ func (w *WAL) cut() error { } func (w *WAL) sync() error { + cancel := watchdog.Register("WAL sync") + defer cancel() if w.encoder != nil { if err := w.encoder.flush(); err != nil { return err @@ -935,12 +938,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { mustSync := raft.MustSync(st, w.state, len(ents)) // TODO(xiangli): no more reference operator + cancel := watchdog.Register("WAL saveEntry") for i := range ents { if err := w.saveEntry(&ents[i]); err != nil { + cancel() return err } } - if err := w.saveState(&st); err != nil { + cancel() + + cancel = watchdog.Register("WAL saveState") + err := w.saveState(&st) + cancel() + if err != nil { return err } @@ -972,9 +982,13 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { defer w.mu.Unlock() rec := &walpb.Record{Type: SnapshotType, Data: b} - if err := w.encoder.encode(rec); err != nil { + cancel := watchdog.Register("WAL saveSnapshot") + err := w.encoder.encode(rec) + cancel() + if err != nil { return err } + // update enti only when snapshot is ahead of last index if w.enti < e.Index { w.enti = e.Index