Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prometheus.remote_write: mark component unhealthy if sending samples fails #823

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions internal/component/prometheus/remotewrite/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ type Component struct {
log log.Logger
opts component.Options

walStore *wal.Storage
remoteStore *remote.Storage
storage storage.Storage
exited atomic.Bool
walStore *wal.Storage
remoteStore *remote.Storage
storage storage.Storage
exited atomic.Bool
statusWatcher *statusWatcher

mut sync.RWMutex
cfg Arguments
Expand All @@ -80,9 +81,9 @@ func New(o component.Options, c Arguments) (*Component, error) {
if err != nil {
return nil, err
}

remoteLogger := log.With(o.Logger, "subcomponent", "rw")
remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)
statusWatcher := &statusWatcher{logger: remoteLogger}
remoteStore := remote.NewStorage(statusWatcher, o.Registerer, startTime, o.DataPath, remoteFlushDeadline, nil)

walStorage.SetNotifier(remoteStore)

Expand All @@ -93,11 +94,12 @@ func New(o component.Options, c Arguments) (*Component, error) {
ls := service.(labelstore.LabelStore)

res := &Component{
log: o.Logger,
opts: o,
walStore: walStorage,
remoteStore: remoteStore,
storage: storage.NewFanout(o.Logger, walStorage, remoteStore),
log: o.Logger,
opts: o,
walStore: walStorage,
remoteStore: remoteStore,
storage: storage.NewFanout(o.Logger, walStorage, remoteStore),
statusWatcher: statusWatcher,
}
res.receiver = prometheus.NewInterceptor(
res.storage,
Expand Down Expand Up @@ -251,7 +253,7 @@ func (c *Component) truncateFrequency() time.Duration {
// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
cfg := newConfig.(Arguments)

c.statusWatcher.reset()
c.mut.Lock()
defer c.mut.Unlock()

Expand All @@ -275,3 +277,7 @@ func (c *Component) Update(newConfig component.Arguments) error {
c.cfg = cfg
return nil
}

// CurrentHealth reports the component's health as tracked by the
// statusWatcher, which observes the remote write subsystem's log output.
func (c *Component) CurrentHealth() component.Health {
	return c.statusWatcher.CurrentHealth()
}
78 changes: 78 additions & 0 deletions internal/component/prometheus/remotewrite/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package remotewrite

import (
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
)

// statusWatcher wraps the remote write subsystem's logger and derives a
// component health status from the log records that pass through it.
type statusWatcher struct {
	logger log.Logger // underlying logger every record is forwarded to

	// Fields below are guarded by mut.
	lastChange    time.Time            // when currentHealth last changed
	lastMessage   string               // human-readable reason for the current status
	currentHealth component.HealthType

	mut sync.Mutex
}

// Log implements go-kit's log.Logger. It inspects each record emitted by the
// remote write subsystem for msg="non-recoverable error" and, when found,
// marks the component unhealthy using the accompanying "err" value as the
// status message. Every record is forwarded unchanged to the wrapped logger.
func (s *statusWatcher) Log(keyvals ...interface{}) error {
	// Scan the key/value pairs for the fields we care about. keyvals is a
	// flat [k1, v1, k2, v2, ...] list; a trailing key with no value is ignored.
	var msg, errText string
	for i := 0; i+1 < len(keyvals); i += 2 {
		switch fmt.Sprint(keyvals[i]) {
		case "msg":
			msg = fmt.Sprint(keyvals[i+1])
		case "err":
			errText = fmt.Sprint(keyvals[i+1])
		}
	}
	if msg == "non-recoverable error" {
		s.mut.Lock()
		s.lastChange = time.Now()
		s.lastMessage = errText
		s.currentHealth = component.HealthTypeUnhealthy
		s.mut.Unlock()
	}
	// Pass through to the real logger no matter what.
	return s.logger.Log(keyvals...)
}

// CurrentHealth returns the watcher's view of the component's health.
// An unhealthy status decays back to healthy after resetTime passes with no
// further errors; the initial unknown status becomes healthy after initTime.
func (s *statusWatcher) CurrentHealth() component.Health {
	// Time after which we assume we are healthy if no more errors have
	// happened. It is hard to get a clear signal that things are fully
	// working, so this is an ok substitute.
	// TODO: perhaps this could be heuristically inferred from the frequency
	// of failures, or by observing various metrics as they pass through the
	// remote write component's append hook.
	const resetTime = 2 * time.Minute
	// Time on startup we take to go from unknown to healthy.
	const initTime = 10 * time.Second

	s.mut.Lock()
	defer s.mut.Unlock()

	if (s.currentHealth == component.HealthTypeUnhealthy && time.Since(s.lastChange) > resetTime) ||
		(s.currentHealth == component.HealthTypeUnknown && time.Since(s.lastChange) > initTime) {
		s.currentHealth = component.HealthTypeHealthy
		s.lastChange = time.Now()
		s.lastMessage = "Healthy"
	}
	return component.Health{
		Health:     s.currentHealth,
		Message:    s.lastMessage,
		UpdateTime: s.lastChange,
	}
}

// reset discards any recorded status so health reporting starts over from
// the unknown state (used when the component's configuration is updated).
func (s *statusWatcher) reset() {
	s.mut.Lock()
	s.currentHealth = component.HealthTypeUnknown
	s.lastMessage = "reloaded"
	s.lastChange = time.Now()
	s.mut.Unlock()
}
Loading