Skip to content

Commit

Permalink
etcdserver: integrate watchdog with all storage read/write operations
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Mar 13, 2023
1 parent 79cce3a commit b8e2988
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 11 deletions.
18 changes: 18 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (
DefaultDiscoveryKeepAliveTime = 2 * time.Second
DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second

DefaultWatchdogTolerateInactiveTimeout = 10 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"

Expand Down Expand Up @@ -380,6 +382,9 @@ type Config struct {
// Defaults to 0.
ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"`

EnableWatchDog bool `json:"experimental-enable-watchdog"`
WatchDogTolerateInactiveTimeout time.Duration `json:"experimental-watchdog-tolerate-inactive-timeout"`

// Logger is logger options: currently only supports "zap".
// "capnslog" is removed in v3.5.
Logger string `json:"logger"`
Expand Down Expand Up @@ -532,6 +537,8 @@ func NewConfig() *Config {
ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,

WatchDogTolerateInactiveTimeout: DefaultWatchdogTolerateInactiveTimeout,

V2Deprecation: config.V2_DEPR_DEFAULT,

DiscoveryCfg: v3discovery.DiscoveryConfig{
Expand Down Expand Up @@ -750,6 +757,17 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
}

if cfg.EnableWatchDog {
if cfg.WatchDogTolerateInactiveTimeout == 0 {
return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout must be >0 (set to %s)", cfg.WatchDogTolerateInactiveTimeout)
}

if int64(2*cfg.ElectionMs) > cfg.WatchDogTolerateInactiveTimeout.Milliseconds() {
return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout[%s] should be at least as 2 times as --election-timeout[%dms]",
cfg.WatchDogTolerateInactiveTimeout, cfg.ElectionMs)
}
}

// check this last since proxying in etcdmain may make this OK
if cfg.LCUrls != nil && cfg.ACUrls == nil {
return ErrUnsetAdvertiseClientURLsFlag
Expand Down
36 changes: 28 additions & 8 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/watchdog"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
Expand Down Expand Up @@ -263,6 +264,12 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return e, err
}
}

if cfg.EnableWatchDog {
inactiveTimeoutMs := cfg.WatchDogTolerateInactiveTimeout.Milliseconds()
watchdog.Start(e.cfg.logger, e.Server.StoppingNotify(), e.Stop, inactiveTimeoutMs)
}

e.Server.Start()

if err = e.servePeers(); err != nil {
Expand Down Expand Up @@ -400,6 +407,27 @@ func (e *Etcd) Close() {
lg.Sync()
}()

e.Stop()

if e.Server != nil {
e.Server.Stop()
}

if e.errc != nil {
close(e.errc)
}
}

func (e *Etcd) Stop() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
zap.String("data-dir", e.cfg.Dir),
zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
}
lg := e.GetLogger()
lg.Info("stopping etcd server", fields...)

e.closeOnce.Do(func() {
close(e.stopc)
})
Expand Down Expand Up @@ -436,11 +464,6 @@ func (e *Etcd) Close() {
e.tracingExporterShutdown()
}

// close rafthttp transports
if e.Server != nil {
e.Server.Stop()
}

// close all idle connections in peer handler (wait up to 1-second)
for i := range e.Peers {
if e.Peers[i] != nil && e.Peers[i].close != nil {
Expand All @@ -449,9 +472,6 @@ func (e *Etcd) Close() {
cancel()
}
}
if e.errc != nil {
close(e.errc)
}
}

func stopServers(ctx context.Context, ss *servers) {
Expand Down
3 changes: 3 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.")

fs.BoolVar(&cfg.ec.EnableWatchDog, "experimental-enable-watchdog", cfg.ec.EnableWatchDog, "Enable watchdog to detect inactive activities.")
fs.DurationVar(&cfg.ec.WatchDogTolerateInactiveTimeout, "experimental-watchdog-tolerate-inactive-timeout", cfg.ec.WatchDogTolerateInactiveTimeout, "Maximum inactive duration of each activity that the watchdog tolerates.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
Expand Down
3 changes: 3 additions & 0 deletions server/etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/server/v3/watchdog"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

Expand Down Expand Up @@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
spath := filepath.Join(s.dir, fname)

fsyncStart := time.Now()
cancel := watchdog.Register("save v2 snapshot")
err = pioutil.WriteAndSyncFile(spath, d, 0666)
cancel()
snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())

if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/watchdog"
)

type BucketID int
Expand Down Expand Up @@ -114,7 +115,9 @@ func (t *batchTx) RUnlock() {
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
cancel := watchdog.Register("batchTx createBucket")
_, err := t.tx.CreateBucket(bucket.Name())
cancel()
if err != nil && err != bolt.ErrBucketExists {
t.backend.lg.Fatal(
"failed to create a bucket",
Expand All @@ -126,7 +129,9 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
cancel := watchdog.Register("batchTx deleteBucket")
err := t.tx.DeleteBucket(bucket.Name())
cancel()
if err != nil && err != bolt.ErrBucketNotFound {
t.backend.lg.Fatal(
"failed to delete a bucket",
Expand Down Expand Up @@ -161,7 +166,11 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo
// this can delay the page split and reduce space usage.
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {

cancel := watchdog.Register("batchTx put")
err := bucket.Put(key, value)
cancel()
if err != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.Stringer("bucket-name", bucketType),
Expand Down Expand Up @@ -216,7 +225,9 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
zap.Stack("stack"),
)
}
cancel := watchdog.Register("batchTx delete")
err := bucket.Delete(key)
cancel()
if err != nil {
t.backend.lg.Fatal(
"failed to delete a key",
Expand Down Expand Up @@ -268,9 +279,11 @@ func (t *batchTx) commit(stop bool) {

start := time.Now()

cancel := watchdog.Register("batchTx commit")
// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}
cancel()

rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
Expand Down
18 changes: 16 additions & 2 deletions server/storage/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/server/v3/watchdog"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

Expand Down Expand Up @@ -798,6 +799,8 @@ func (w *WAL) cut() error {
}

func (w *WAL) sync() error {
cancel := watchdog.Register("WAL sync")
defer cancel()
if w.encoder != nil {
if err := w.encoder.flush(); err != nil {
return err
Expand Down Expand Up @@ -935,12 +938,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
mustSync := raft.MustSync(st, w.state, len(ents))

// TODO(xiangli): no more reference operator
cancel := watchdog.Register("WAL saveEntry")
for i := range ents {
if err := w.saveEntry(&ents[i]); err != nil {
cancel()
return err
}
}
if err := w.saveState(&st); err != nil {
cancel()

cancel = watchdog.Register("WAL saveState")
err := w.saveState(&st)
cancel()
if err != nil {
return err
}

Expand Down Expand Up @@ -972,9 +982,13 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
defer w.mu.Unlock()

rec := &walpb.Record{Type: SnapshotType, Data: b}
if err := w.encoder.encode(rec); err != nil {
cancel := watchdog.Register("WAL saveSnapshot")
err := w.encoder.encode(rec)
cancel()
if err != nil {
return err
}

// update enti only when snapshot is ahead of last index
if w.enti < e.Index {
w.enti = e.Index
Expand Down

0 comments on commit b8e2988

Please sign in to comment.