Skip to content

Commit

Permalink
etcdserver: add watchdog to monitor inactive activities
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Mar 13, 2023
1 parent e2a5df5 commit 79cce3a
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions server/watchdog/watchdog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package watchdog

import (
"sync"
"time"

"go.uber.org/zap"
)

const (
tickMs = 100
)

var (
wdInstance *watchdog
nextActivityId uint64
)

type activity struct {
id uint64
name string
inactiveElapsed int
}

type watchdog struct {
lg *zap.Logger

ticker *time.Ticker
inactiveTimeoutTick int

mu sync.Mutex
activities map[uint64]*activity

stopC <-chan struct{}
cleanup func()
}

func (wd *watchdog) run() {
inactiveTimeout := time.Duration(wd.inactiveTimeoutTick*tickMs) * time.Millisecond
wd.lg.Info("Watchdog is running", zap.Duration("inactiveTimeout", inactiveTimeout))
for {
select {
case <-wd.ticker.C:
wd.mu.Lock()
for _, v := range wd.activities {
v.inactiveElapsed++
if v.inactiveElapsed > wd.inactiveTimeoutTick/2 {
elapsedTime := time.Duration(v.inactiveElapsed*tickMs) * time.Millisecond
wd.lg.Warn("Slow activity detected", zap.String("activity", v.name), zap.Duration("duration", elapsedTime))
if v.inactiveElapsed > wd.inactiveTimeoutTick {
wd.mu.Unlock()
wd.cleanup()
wd.lg.Panic("Inactive activity detected", zap.String("activity", v.name), zap.Duration("duration", elapsedTime))
}
}
}
wd.mu.Unlock()
case <-wd.stopC:
wd.lg.Info("Watchdog stopped")
return
}
}
}

func (wd *watchdog) register(name string) func() {
wd.mu.Lock()
defer wd.mu.Unlock()

id := nextActivityId
wd.activities[nextActivityId] = &activity{
id: id,
name: name,
inactiveElapsed: 0,
}
nextActivityId++

return func() {
wd.reset(id)
}
}

func (wd *watchdog) reset(id uint64) {
wd.mu.Lock()
defer wd.mu.Unlock()
delete(wd.activities, id)
}

func Start(lg *zap.Logger, stopC <-chan struct{}, cleanup func(), inactiveTimeoutMs int64) {
if wdInstance != nil {
wdInstance.lg.Warn("Watchdog has already been started")
return
}
wdInstance = &watchdog{
lg: lg,
stopC: stopC,
cleanup: cleanup,

inactiveTimeoutTick: int(inactiveTimeoutMs / tickMs),
activities: make(map[uint64]*activity),
ticker: time.NewTicker(tickMs * time.Millisecond),
}
go wdInstance.run()
}

func Register(name string) func() {
if wdInstance == nil {
// watchdog not enabled
return func() {}
}
return wdInstance.register(name)
}

0 comments on commit 79cce3a

Please sign in to comment.