Skip to content

Commit

Permalink
Avoid starvation of NSQ event processing (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwindower authored Aug 22, 2024
1 parent 6fe8436 commit 51532a0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
24 changes: 22 additions & 2 deletions internal/bmc/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/metal-stack/go-hal"
"github.com/nsqio/go-nsq"
Expand Down Expand Up @@ -46,13 +47,32 @@ func (b *BMCService) InitConsumer() error {
}
config.TlsV1 = true

// Deadlines for network reads and writes
config.ReadTimeout = 10 * time.Second
config.WriteTimeout = 10 * time.Second

// Duration of time between heartbeats. This must be less than ReadTimeout
config.HeartbeatInterval = 5 * time.Second

// Maximum duration when REQueueing (for doubling of deferred requeue)
config.MaxRequeueDelay = 5 * time.Second
config.DefaultRequeueDelay = 3 * time.Second

// Maximum amount of time to backoff when processing fails 0 == no backoff
config.MaxBackoffDuration = 0 * time.Second // no need for backing off, just requeue

// Maximum number of times this consumer will attempt to process a message before giving up
config.MaxAttempts = 2 // we do not try very often, if it doesn't work it's probably for a reason

// Maximum number of messages to allow in flight (concurrency knob)
config.MaxInFlight = 10 // handling 10 machines in parallel should be enough

consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config)
if err != nil {
return err
}

consumer.SetLogger(nsqLogger{log: b.log}, nsqMapLevel(b.log))

consumer.AddHandler(b)

err = consumer.ConnectToNSQD(b.mqAddress)
Expand All @@ -70,7 +90,7 @@ func (b *BMCService) HandleMessage(message *nsq.Message) error {
return err
}

b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event)
b.log.Info("got message from nsq", "topic", b.machineTopic, "event", event, "attempt", message.Attempts)

if event.Cmd.IPMI == nil {
return fmt.Errorf("event does not contain ipmi details:%v", event)
Expand Down
13 changes: 7 additions & 6 deletions internal/bmc/nsq_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ func (n nsqLogger) Output(calldepth int, s string) error {

func nsqMapLevel(log *slog.Logger) nsq.LogLevel {
ctx := context.Background()
if log.Enabled(ctx, slog.LevelDebug) {
return nsq.LogLevelDebug
}
if log.Enabled(ctx, slog.LevelInfo) {
return nsq.LogLevelInfo
}
if log.Enabled(ctx, slog.LevelError) {
return nsq.LogLevelError
}
if log.Enabled(ctx, slog.LevelWarn) {
return nsq.LogLevelWarning
}
if log.Enabled(ctx, slog.LevelInfo) {
return nsq.LogLevelInfo
}
if log.Enabled(ctx, slog.LevelDebug) {
return nsq.LogLevelDebug
}

return nsq.LogLevelInfo
}

0 comments on commit 51532a0

Please sign in to comment.