From 989cce81d4a3cef88137841892b91a35dde313e7 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Mon, 13 Mar 2023 08:57:44 +0800 Subject: [PATCH] etcdserver: integrate watchdog with all storage read/write operations Signed-off-by: Benjamin Wang --- server/etcdserver/api/snap/snapshotter.go | 3 +++ server/storage/backend/batch_tx.go | 15 ++++++++++++++- server/storage/wal/wal.go | 18 ++++++++++++++++-- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/api/snap/snapshotter.go b/server/etcdserver/api/snap/snapshotter.go index 093ab6bc9149..ffc299bb7dee 100644 --- a/server/etcdserver/api/snap/snapshotter.go +++ b/server/etcdserver/api/snap/snapshotter.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { spath := filepath.Join(s.dir, fname) fsyncStart := time.Now() + cancel := watchdog.Register("save v2 snapshot") err = pioutil.WriteAndSyncFile(spath, d, 0666) + cancel() snapFsyncSec.Observe(time.Since(fsyncStart).Seconds()) if err != nil { diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 0d12a0868ddc..91bb7067e7db 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/server/v3/watchdog" ) type BucketID int @@ -114,7 +115,9 @@ func (t *batchTx) RUnlock() { } func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { + cancel := watchdog.Register("batchTx createBucket") _, err := t.tx.CreateBucket(bucket.Name()) + cancel() if err != nil && err != bolt.ErrBucketExists { t.backend.lg.Fatal( "failed to create a bucket", @@ -126,7 +129,9 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { } func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) { + cancel := watchdog.Register("batchTx deleteBucket") err := t.tx.DeleteBucket(bucket.Name()) + cancel() if err != nil && err != bolt.ErrBucketNotFound { t.backend.lg.Fatal( "failed to delete a bucket", @@ -161,7 +166,11 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo // this can delay the page split and reduce space usage. bucket.FillPercent = 0.9 } - if err := bucket.Put(key, value); err != nil { + + cancel := watchdog.Register("batchTx put") + err := bucket.Put(key, value) + cancel() + if err != nil { t.backend.lg.Fatal( "failed to write to a bucket", zap.Stringer("bucket-name", bucketType), @@ -216,7 +225,9 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) { zap.Stack("stack"), ) } + cancel := watchdog.Register("batchTx delete") err := bucket.Delete(key) + cancel() if err != nil { t.backend.lg.Fatal( "failed to delete a key", @@ -268,9 +279,11 @@ func (t *batchTx) commit(stop bool) { start := time.Now() + cancel := watchdog.Register("batchTx commit") // gofail: var beforeCommit struct{} err := t.tx.Commit() // gofail: var afterCommit struct{} + cancel() rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds()) spillSec.Observe(t.tx.Stats().SpillTime.Seconds()) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 7f8b25f5ddd2..6d47a9a15347 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -798,6 +799,8 @@ func (w *WAL) cut() error { } func (w *WAL) sync() error { + cancel := watchdog.Register("WAL sync") + defer cancel() if w.encoder != nil { if err := w.encoder.flush(); err != nil { return err @@ -935,12 +938,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { mustSync := raft.MustSync(st, w.state, len(ents)) // TODO(xiangli): no more reference operator + cancel := watchdog.Register("WAL saveEntry") for i := range ents { if err := w.saveEntry(&ents[i]); err != nil { + cancel() return err } } - if err := w.saveState(&st); err != nil { + cancel() + + cancel = watchdog.Register("WAL saveState") + err := w.saveState(&st) + cancel() + if err != nil { return err } @@ -972,9 +982,13 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { defer w.mu.Unlock() rec := &walpb.Record{Type: SnapshotType, Data: b} - if err := w.encoder.encode(rec); err != nil { + cancel := watchdog.Register("WAL saveSnapshot") + err := w.encoder.encode(rec) + cancel() + if err != nil { return err } + // update enti only when snapshot is ahead of last index if w.enti < e.Index { w.enti = e.Index