diff --git a/pkg/watchdog/watchdog.go b/pkg/watchdog/watchdog.go
new file mode 100644
index 00000000000..7fb4d36fc89
--- /dev/null
+++ b/pkg/watchdog/watchdog.go
@@ -0,0 +1,168 @@
+// Copyright 2023 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package watchdog monitors registered activities and reacts when one of
+// them stays registered for longer than a configured timeout.
+package watchdog
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+// Interface defines the watchdog interface.
+type Interface interface {
+	// Register registers an activity with the given name in the watchdog.
+	// It returns a callback function which is supposed to be invoked to
+	// unregister the activity when it is done. The watchdog keeps
+	// monitoring the activity until the callback function is called.
+	Register(name string) func()
+
+	// Execute executes a function under the watchdog monitoring.
+	// It is just syntactic sugar for `Register`: the callback function
+	// is called automatically on behalf of the caller once fn returns.
+	Execute(name string, fn func())
+}
+
+// tickMs is the watchdog polling period, in milliseconds.
+const tickMs = 100
+
+// activity is one registered operation; inactiveElapsed counts the ticks
+// observed since it was registered.
+type activity struct {
+	id              uint64
+	name            string
+	inactiveElapsed int
+}
+
+// watchdog is the ticker-driven implementation of Interface. The mutex mu
+// guards nextActivityID and activities.
+type watchdog struct {
+	lg             *zap.Logger
+	nextActivityID uint64
+
+	ticker              *time.Ticker
+	inactiveTimeoutTick int
+
+	mu         sync.Mutex
+	activities map[uint64]*activity
+
+	stopC   <-chan struct{}
+	cleanup func()
+}
+
+// New creates a watchdog and starts its monitoring goroutine, which runs
+// until stopC is closed.
+//
+// inactiveTimeoutMs is rounded down to a multiple of the 100ms tick. When
+// an activity stays registered for longer than that, the watchdog calls
+// cleanup and then logs the activity at panic level through lg.
+func New(lg *zap.Logger, stopC <-chan struct{}, cleanup func(), inactiveTimeoutMs int64) Interface {
+	wd := &watchdog{
+		lg:      lg,
+		stopC:   stopC,
+		cleanup: cleanup,
+
+		inactiveTimeoutTick: int(inactiveTimeoutMs / tickMs),
+		activities:          make(map[uint64]*activity),
+		ticker:              time.NewTicker(tickMs * time.Millisecond),
+	}
+	go wd.run()
+
+	return wd
+}
+
+// run is the monitoring loop. It polls the activities on every tick and
+// exits when stopC is closed or after an expired activity was reported.
+func (wd *watchdog) run() {
+	// Release the ticker once the monitoring loop exits.
+	defer wd.ticker.Stop()
+
+	inactiveTimeout := time.Duration(wd.inactiveTimeoutTick*tickMs) * time.Millisecond
+	wd.lg.Info("Watchdog is running", zap.Duration("inactiveTimeout", inactiveTimeout))
+	for {
+		select {
+		case <-wd.ticker.C:
+			name, elapsed, expired := wd.tick()
+			if !expired {
+				continue
+			}
+			wd.cleanup()
+			wd.lg.Panic("Inactive activity detected", zap.String("activity", name), zap.Duration("duration", elapsed))
+			// Only reached when the logger is configured not to panic.
+			return
+		case <-wd.stopC:
+			wd.lg.Info("Watchdog stopped")
+			return
+		}
+	}
+}
+
+// tick advances every registered activity by one tick and warns about the
+// ones past half of the timeout. It reports the first activity found past
+// the full timeout, if any. The mutex is held only inside tick, so that
+// cleanup and the panic-level log run without it.
+func (wd *watchdog) tick() (name string, elapsed time.Duration, expired bool) {
+	wd.mu.Lock()
+	defer wd.mu.Unlock()
+
+	for _, v := range wd.activities {
+		v.inactiveElapsed++
+		if v.inactiveElapsed <= wd.inactiveTimeoutTick/2 {
+			continue
+		}
+		d := time.Duration(v.inactiveElapsed*tickMs) * time.Millisecond
+		wd.lg.Warn("Slow activity detected", zap.String("activity", v.name), zap.Duration("duration", d))
+		if v.inactiveElapsed > wd.inactiveTimeoutTick {
+			return v.name, d, true
+		}
+	}
+	return "", 0, false
+}
+
+// Register adds a new activity named name and returns the callback that
+// removes it again.
+func (wd *watchdog) Register(name string) func() {
+	wd.mu.Lock()
+	defer wd.mu.Unlock()
+
+	id := wd.nextActivityID
+	wd.activities[id] = &activity{
+		id:              id,
+		name:            name,
+		inactiveElapsed: 0,
+	}
+	wd.nextActivityID++
+
+	return func() {
+		wd.reset(id)
+	}
+}
+
+// Execute runs fn as an activity named name, unregistering it when fn
+// returns.
+func (wd *watchdog) Execute(name string, fn func()) {
+	unregister := wd.Register(name)
+	defer unregister()
+
+	fn()
+}
+
+func (wd
*watchdog) reset(id uint64) { + wd.mu.Lock() + defer wd.mu.Unlock() + delete(wd.activities, id) +} diff --git a/server/embed/config.go b/server/embed/config.go index 2f4a3416742..5c54472aa12 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -74,6 +74,8 @@ const ( DefaultDiscoveryKeepAliveTime = 2 * time.Second DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second + DefaultStorageWatchdogTimeout = 10 * time.Second + DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -380,6 +382,9 @@ type Config struct { // Defaults to 0. ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"` + EnableStorageWatchDog bool `json:"experimental-enable-storage-watchdog"` + StorageWatchDogTimeout time.Duration `json:"experimental-storage-watchdog-timeout"` + // Logger is logger options: currently only supports "zap". // "capnslog" is removed in v3.5. Logger string `json:"logger"` @@ -532,6 +537,8 @@ func NewConfig() *Config { ExperimentalCompactHashCheckEnabled: false, ExperimentalCompactHashCheckTime: time.Minute, + StorageWatchDogTimeout: DefaultStorageWatchdogTimeout, + V2Deprecation: config.V2_DEPR_DEFAULT, DiscoveryCfg: v3discovery.DiscoveryConfig{ @@ -750,6 +757,17 @@ func (cfg *Config) Validate() error { return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs) } + if cfg.EnableStorageWatchDog { + if cfg.StorageWatchDogTimeout == 0 { + return fmt.Errorf("--experimental-storage-watchdog-timeout must be >0 (set to %s)", cfg.StorageWatchDogTimeout) + } + + if int64(2*cfg.ElectionMs) > cfg.StorageWatchDogTimeout.Milliseconds() { + return fmt.Errorf("--experimental-storage-watchdog-timeout[%s] should be at least as 2 times as --election-timeout[%dms]", + cfg.StorageWatchDogTimeout, cfg.ElectionMs) + } + } + // check this last since proxying in etcdmain may make this OK if cfg.ListenClientUrls != nil && 
cfg.AdvertiseClientUrls == nil { return ErrUnsetAdvertiseClientURLsFlag diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 3b0e4db7c01..7e154a1c2d7 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -34,12 +34,14 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/debugutil" runtimeutil "go.etcd.io/etcd/pkg/v3/runtime" + pkg_watchdog "go.etcd.io/etcd/pkg/v3/watchdog" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/verify" + server_watchdog "go.etcd.io/etcd/server/v3/watchdog" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" @@ -263,6 +265,13 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return e, err } } + + if cfg.EnableStorageWatchDog { + inactiveTimeoutMs := cfg.StorageWatchDogTimeout.Milliseconds() + wd := pkg_watchdog.New(e.cfg.logger, e.Server.StoppingNotify(), e.Stop, inactiveTimeoutMs) + server_watchdog.SetStorageWatchdog(wd) + } + e.Server.Start() if err = e.servePeers(); err != nil { @@ -400,6 +409,28 @@ func (e *Etcd) Close() { lg.Sync() }() + e.Stop() + + if e.Server != nil { + e.Server.Stop() + } + + if e.errc != nil { + close(e.errc) + } +} + +func (e *Etcd) Stop() { + fields := []zap.Field{ + zap.String("name", e.cfg.Name), + zap.String("data-dir", e.cfg.Dir), + zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()), + zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()), + } + + lg := e.GetLogger() + lg.Info("stopping etcd server", fields...) 
+ e.closeOnce.Do(func() { close(e.stopc) }) @@ -436,11 +467,6 @@ func (e *Etcd) Close() { e.tracingExporterShutdown() } - // close rafthttp transports - if e.Server != nil { - e.Server.Stop() - } - // close all idle connections in peer handler (wait up to 1-second) for i := range e.Peers { if e.Peers[i] != nil && e.Peers[i].close != nil { @@ -449,9 +475,6 @@ func (e *Etcd) Close() { cancel() } } - if e.errc != nil { - close(e.errc) - } } func stopServers(ctx context.Context, ss *servers) { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 954a08727d1..520cfed6c14 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -284,6 +284,9 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.") fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.") + fs.BoolVar(&cfg.ec.EnableStorageWatchDog, "experimental-enable-storage-watchdog", cfg.ec.EnableStorageWatchDog, "Enable watchdog to detect inactive storage activities.") + fs.DurationVar(&cfg.ec.StorageWatchDogTimeout, "experimental-storage-watchdog-timeout", cfg.ec.StorageWatchDogTimeout, "Maximum inactive duration of each storage activity that the watchdog tolerates.") + // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.") diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 67b67add816..74c22a2ebf5 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -275,6 +275,10 @@ Experimental feature: Set the maximum time duration to wait for the cluster to 
be ready. --experimental-snapshot-catch-up-entries '5000' Number of entries for a slow follower to catch up after compacting the the raft storage entries. + --experimental-enable-storage-watchdog 'false' + Enable watchdog to detect inactive activities. + --experimental-storage-watchdog-timeout '10s' + Maximum inactive duration of each activity that the watchdog tolerates. Unsafe feature: --force-new-cluster 'false' diff --git a/server/etcdserver/api/snap/snapshotter.go b/server/etcdserver/api/snap/snapshotter.go index 093ab6bc914..4903c1367ac 100644 --- a/server/etcdserver/api/snap/snapshotter.go +++ b/server/etcdserver/api/snap/snapshotter.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { spath := filepath.Join(s.dir, fname) fsyncStart := time.Now() - err = pioutil.WriteAndSyncFile(spath, d, 0666) + watchdog.StorageWatchdog().Execute("save v2 snapshot", func() { + err = pioutil.WriteAndSyncFile(spath, d, 0666) + }) snapFsyncSec.Observe(time.Since(fsyncStart).Seconds()) if err != nil { diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 0d12a0868dd..a2253e3261e 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/server/v3/watchdog" ) type BucketID int @@ -114,7 +115,10 @@ func (t *batchTx) RUnlock() { } func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { - _, err := t.tx.CreateBucket(bucket.Name()) + var err error + watchdog.StorageWatchdog().Execute("batchTx createBucket", func() { + _, err = t.tx.CreateBucket(bucket.Name()) + }) if err != nil && err != bolt.ErrBucketExists { t.backend.lg.Fatal( "failed to create a bucket", 
@@ -126,7 +130,10 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { } func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) { - err := t.tx.DeleteBucket(bucket.Name()) + var err error + watchdog.StorageWatchdog().Execute("batchTx deleteBucket", func() { + err = t.tx.DeleteBucket(bucket.Name()) + }) if err != nil && err != bolt.ErrBucketNotFound { t.backend.lg.Fatal( "failed to delete a bucket", @@ -161,7 +168,12 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo // this can delay the page split and reduce space usage. bucket.FillPercent = 0.9 } - if err := bucket.Put(key, value); err != nil { + + var err error + watchdog.StorageWatchdog().Execute("batchTx put", func() { + err = bucket.Put(key, value) + }) + if err != nil { t.backend.lg.Fatal( "failed to write to a bucket", zap.Stringer("bucket-name", bucketType), @@ -216,7 +228,10 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) { zap.Stack("stack"), ) } - err := bucket.Delete(key) + var err error + watchdog.StorageWatchdog().Execute("batchTx delete", func() { + err = bucket.Delete(key) + }) if err != nil { t.backend.lg.Fatal( "failed to delete a key", @@ -268,9 +283,12 @@ func (t *batchTx) commit(stop bool) { start := time.Now() - // gofail: var beforeCommit struct{} - err := t.tx.Commit() - // gofail: var afterCommit struct{} + var err error + watchdog.StorageWatchdog().Execute("batchTx commit", func() { + // gofail: var beforeCommit struct{} + err = t.tx.Commit() + // gofail: var afterCommit struct{} + }) rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds()) spillSec.Observe(t.tx.Stats().SpillTime.Seconds()) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 7f8b25f5ddd..e9d7b30b553 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + 
"go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -798,28 +799,31 @@ func (w *WAL) cut() error { } func (w *WAL) sync() error { - if w.encoder != nil { - if err := w.encoder.flush(); err != nil { - return err + var err error + watchdog.StorageWatchdog().Execute("WAL sync", func() { + if w.encoder != nil { + if err = w.encoder.flush(); err != nil { + return + } } - } - if w.unsafeNoSync { - return nil - } + if w.unsafeNoSync { + return + } - start := time.Now() - err := fileutil.Fdatasync(w.tail().File) - - took := time.Since(start) - if took > warnSyncDuration { - w.lg.Warn( - "slow fdatasync", - zap.Duration("took", took), - zap.Duration("expected-duration", warnSyncDuration), - ) - } - walFsyncSec.Observe(took.Seconds()) + start := time.Now() + err = fileutil.Fdatasync(w.tail().File) + + took := time.Since(start) + if took > warnSyncDuration { + w.lg.Warn( + "slow fdatasync", + zap.Duration("took", took), + zap.Duration("expected-duration", warnSyncDuration), + ) + } + walFsyncSec.Observe(took.Seconds()) + }) return err } @@ -934,13 +938,22 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { mustSync := raft.MustSync(st, w.state, len(ents)) - // TODO(xiangli): no more reference operator - for i := range ents { - if err := w.saveEntry(&ents[i]); err != nil { - return err + var err error + watchdog.StorageWatchdog().Execute("WAL saveEntry", func() { + for i := range ents { + if err = w.saveEntry(&ents[i]); err != nil { + break + } } + }) + if err != nil { + return err } - if err := w.saveState(&st); err != nil { + + watchdog.StorageWatchdog().Execute("WAL saveState", func() { + err = w.saveState(&st) + }) + if err != nil { return err } @@ -972,9 +985,14 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { defer w.mu.Unlock() rec := &walpb.Record{Type: SnapshotType, Data: b} - if err := w.encoder.encode(rec); err != nil { + var err error + watchdog.StorageWatchdog().Execute("WAL saveSnapshot", func() { + 
err = w.encoder.encode(rec) + }) + if err != nil { return err } + // update enti only when snapshot is ahead of last index if w.enti < e.Index { w.enti = e.Index diff --git a/server/watchdog/watchdog.go b/server/watchdog/watchdog.go new file mode 100644 index 00000000000..de7b00923fa --- /dev/null +++ b/server/watchdog/watchdog.go @@ -0,0 +1,43 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watchdog + +import "go.etcd.io/etcd/pkg/v3/watchdog" + +var ( + storageWatchdog watchdog.Interface +) + +func SetStorageWatchdog(wd watchdog.Interface) { + storageWatchdog = wd +} + +func StorageWatchdog() watchdog.Interface { + if storageWatchdog != nil { + return storageWatchdog + } + // Storage watchdog isn't enabled. + return &NoopWatchdog{} +} + +type NoopWatchdog struct{} + +func (wd *NoopWatchdog) Register(name string) func() { + return func() {} +} + +func (wd *NoopWatchdog) Execute(name string, fn func()) { + fn() +}