From 3c6f82c9e3e8a80b300713895885ca9262f84d04 Mon Sep 17 00:00:00 2001
From: Craig Peterson <192540+captncraig@users.noreply.github.com>
Date: Fri, 10 May 2024 07:26:45 -0400
Subject: [PATCH] watch remote write logs to mark component unhealthy on
 failure

---
 .../prometheus/remotewrite/remote_write.go | 30 ++++---
 .../prometheus/remotewrite/status.go       | 78 +++++++++++++++++++
 2 files changed, 96 insertions(+), 12 deletions(-)
 create mode 100644 internal/component/prometheus/remotewrite/status.go

diff --git a/internal/component/prometheus/remotewrite/remote_write.go b/internal/component/prometheus/remotewrite/remote_write.go
index 8bfc905b90..50f9bd7e15 100644
--- a/internal/component/prometheus/remotewrite/remote_write.go
+++ b/internal/component/prometheus/remotewrite/remote_write.go
@@ -53,10 +53,11 @@ type Component struct {
 	log  log.Logger
 	opts component.Options
 
-	walStore    *wal.Storage
-	remoteStore *remote.Storage
-	storage     storage.Storage
-	exited      atomic.Bool
+	walStore      *wal.Storage
+	remoteStore   *remote.Storage
+	storage       storage.Storage
+	exited        atomic.Bool
+	statusWatcher *statusWatcher
 
 	mut sync.RWMutex
 	cfg Arguments
@@ -80,9 +81,9 @@ func New(o component.Options, c Arguments) (*Component, error) {
 	if err != nil {
 		return nil, err
 	}
-
 	remoteLogger := log.With(o.Logger, "subcomponent", "rw")
-	remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)
+	statusWatcher := &statusWatcher{logger: remoteLogger}
+	remoteStore := remote.NewStorage(statusWatcher, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)
 
 	walStorage.SetNotifier(remoteStore)
 
@@ -93,11 +94,12 @@ func New(o component.Options, c Arguments) (*Component, error) {
 	ls := service.(labelstore.LabelStore)
 
 	res := &Component{
-		log:         o.Logger,
-		opts:        o,
-		walStore:    walStorage,
-		remoteStore: remoteStore,
-		storage:     storage.NewFanout(o.Logger, walStorage, remoteStore),
+		log:           o.Logger,
+		opts:          o,
+		walStore:      walStorage,
+		remoteStore:   remoteStore,
+		storage:       storage.NewFanout(o.Logger, walStorage, remoteStore),
+		statusWatcher: statusWatcher,
 	}
 	res.receiver = prometheus.NewInterceptor(
 		res.storage,
@@ -251,7 +253,7 @@ func (c *Component) truncateFrequency() time.Duration {
 // Update implements Component.
 func (c *Component) Update(newConfig component.Arguments) error {
 	cfg := newConfig.(Arguments)
-
+	c.statusWatcher.reset()
 	c.mut.Lock()
 	defer c.mut.Unlock()
 
@@ -275,3 +277,7 @@ func (c *Component) Update(newConfig component.Arguments) error {
 	c.cfg = cfg
 	return nil
 }
+
+func (c *Component) CurrentHealth() component.Health {
+	return c.statusWatcher.CurrentHealth()
+}
diff --git a/internal/component/prometheus/remotewrite/status.go b/internal/component/prometheus/remotewrite/status.go
new file mode 100644
index 0000000000..b6eacfd394
--- /dev/null
+++ b/internal/component/prometheus/remotewrite/status.go
@@ -0,0 +1,78 @@
+package remotewrite
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/alloy/internal/component"
+)
+
+type statusWatcher struct {
+	logger log.Logger
+
+	lastChange    time.Time
+	lastMessage   string
+	currentHealth component.HealthType
+
+	mut sync.Mutex
+}
+
+// Log intercepts log messages from the remote write storage to check component status.
+func (s *statusWatcher) Log(keyvals ...interface{}) error {
+
+	msg, err := "", ""
+	// look specifically for msg = "non-recoverable error"
+	for i := 0; i < len(keyvals)-1; i += 2 {
+		k := fmt.Sprint(keyvals[i])
+		v := fmt.Sprint(keyvals[i+1])
+		switch k {
+		case "msg":
+			msg = v
+		case "err":
+			err = v
+		}
+	}
+	if msg == "non-recoverable error" {
+		s.mut.Lock()
+		s.lastChange = time.Now()
+		s.lastMessage = err
+		s.currentHealth = component.HealthTypeUnhealthy
+		s.mut.Unlock()
+	}
+	// pass through to the real logger no matter what
+	return s.logger.Log(keyvals...)
+}
+
+func (s *statusWatcher) CurrentHealth() component.Health {
+	// time after which we assume we are healthy if no more errors have happened.
+	// it's hard to get a clear signal that things are fully working, so this is an ok substitute.
+	// TODO: perhaps this could be heuristically inferred from the frequency of failures, or by observing various metrics
+	// as they pass through the remote write component's append hook.
+	const resetTime = 2 * time.Minute
+	// time we allow on startup to go from unknown to healthy
+	const initTime = 10 * time.Second
+	s.mut.Lock()
+	defer s.mut.Unlock()
+	if (s.currentHealth == component.HealthTypeUnhealthy && time.Since(s.lastChange) > resetTime) ||
+		(s.currentHealth == component.HealthTypeUnknown && time.Since(s.lastChange) > initTime) {
+		s.currentHealth = component.HealthTypeHealthy
+		s.lastChange = time.Now()
+		s.lastMessage = "Healthy"
+	}
+	fmt.Println(s.currentHealth, s.lastChange)
+	return component.Health{
+		Health:     s.currentHealth,
+		Message:    s.lastMessage,
+		UpdateTime: s.lastChange,
+	}
+}
+
+func (s *statusWatcher) reset() {
+	s.mut.Lock()
+	defer s.mut.Unlock()
+	s.currentHealth = component.HealthTypeUnknown
+	s.lastChange = time.Now()
+	s.lastMessage = "reloaded"
+}
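
Not part of the patch above: a minimal sketch of a test that could exercise the new statusWatcher, assuming go-kit's log.NewNopLogger and the component.HealthType constants already referenced in the diff. The test name and the sample log lines are hypothetical; the sentinel "non-recoverable error" message is the one the watcher matches on.

package remotewrite

import (
	"testing"

	"github.com/go-kit/log"
	"github.com/grafana/alloy/internal/component"
)

// TestStatusWatcherMarksUnhealthy is a hypothetical sketch, not part of the patch.
func TestStatusWatcherMarksUnhealthy(t *testing.T) {
	s := &statusWatcher{logger: log.NewNopLogger()}

	// An ordinary log line does not mark the component unhealthy.
	_ = s.Log("msg", "Remote storage resharding", "from", 1, "to", 4)
	if h := s.CurrentHealth(); h.Health == component.HealthTypeUnhealthy {
		t.Fatalf("expected component not to be unhealthy, got %+v", h)
	}

	// The sentinel "non-recoverable error" message marks the component unhealthy
	// and surfaces the err value as the health message.
	_ = s.Log("msg", "non-recoverable error", "err", "received 400 Bad Request")
	if h := s.CurrentHealth(); h.Health != component.HealthTypeUnhealthy || h.Message != "received 400 Bad Request" {
		t.Fatalf("expected unhealthy with error message, got %+v", h)
	}
}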