diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index d5fe38f1a..5fa54071c 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -325,7 +325,10 @@ func run() int { cancelHactx() case <-hactx.Done(): - // Nothing to do here, surrounding loop will terminate now. + if ctx.Err() != nil { + logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) + } + // Otherwise, there is nothing to do here, surrounding loop will terminate now. case <-ha.Done(): if err := ha.Err(); err != nil { logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error")) @@ -337,8 +340,6 @@ func run() int { cancelHactx() return ExitFailure - case <-ctx.Done(): - logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) cancelHactx() diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 6460ac32d..dcc7d6623 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,7 +170,7 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) @@ -218,7 +218,7 @@ func (h *HA) controller() { // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) - err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) + err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") @@ -264,11 +264,16 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // +// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly +// enforced to abort the realization logic the moment the context expires. +// // shouldLogRoutineEvents indicates if recurrent events should be logged. +// +// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one +// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens. func (h *HA) realize( ctx context.Context, s *icingaredisv1.IcingaStatus, - t *types.UnixMilli, envId types.Binary, shouldLogRoutineEvents bool, ) error { @@ -300,6 +305,7 @@ func (h *HA) realize( if errBegin != nil { return errors.Wrap(errBegin, "can't start transaction") } + defer func() { _ = tx.Rollback() }() query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock @@ -350,7 +356,7 @@ func (h *HA) realize( EnvironmentMeta: v1.EnvironmentMeta{ EnvironmentId: envId, }, - Heartbeat: *t, + Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessage())), Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, @@ -370,15 +376,40 @@ func (h *HA) realize( if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") - _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) + if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil { + return database.CantPerformQuery(err, stmt) + } - if err != nil { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment) + if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil { return database.CantPerformQuery(err, stmt) } } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "can't commit transaction") + // In general, cancellation does not work for commit and rollback. Some database drivers may support a + // context-based abort, but only if the DBMS allows it. This was also discussed in the initial PR adding + // context support to sql: https://github.com/golang/go/issues/15123#issuecomment-245882486 + // + // The following is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() - + // which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over + // to the driver's Commit method. The drivers may behave differently. For example, the used + // github.com/go-sql-driver/mysql package calls its internal exec method with a "COMMIT" query, writing and + // reading packets without honoring the context. + // + // In a nutshell, one cannot expect a Commit() call to be covered by the transaction context. Thus, an + // additional check wraps the call, ensuring that this function will be left when the context is done. + commitErrCh := make(chan error, 1) + go func() { commitErrCh <- tx.Commit() }() + + select { + case err := <-commitErrCh: + if err != nil { + return errors.Wrap(err, "can't commit transaction") + } + case <-ctx.Done(): + return ctx.Err() } return nil @@ -386,7 +417,7 @@ func (h *HA) realize( retry.Retryable, backoff.NewExponentialWithJitter(256*time.Millisecond, 3*time.Second), retry.Settings{ - // Intentionally no timeout is set, as we use a context with a deadline. + // Intentionally, no timeout is set because a context with a deadline is used and QuickContextExit is set. OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { log := h.logger.Debugw @@ -420,12 +451,6 @@ func (h *HA) realize( } if takeover != "" { - // Insert the environment after each heartbeat takeover if it does not already exist in the database - // as the environment may have changed, although this is likely to happen very rarely. - if err := h.insertEnvironment(); err != nil { - return errors.Wrap(err, "can't insert environment") - } - h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { @@ -445,18 +470,6 @@ func (h *HA) realizeLostHeartbeat() { } } -// insertEnvironment inserts the environment from the specified state into the database if it does not already exist. -func (h *HA) insertEnvironment() error { - // Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does. - stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment) - - if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil { - return database.CantPerformQuery(err, stmt) - } - - return nil -} - func (h *HA) removeInstance(ctx context.Context) { h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) // Intentionally not using h.ctx here as it's already cancelled. diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 840445a23..bcda4d47d 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -25,6 +25,7 @@ type Heartbeat struct { active bool events chan *HeartbeatMessage lastReceivedMs int64 + lastMessageMs int64 cancelCtx context.CancelFunc client *redis.Client done chan struct{} @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 { return atomic.LoadInt64(&h.lastReceivedMs) } +// LastMessage returns the last message's time in ms. +func (h *Heartbeat) LastMessage() int64 { + return atomic.LoadInt64(&h.lastMessageMs) +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. func (h *Heartbeat) Close() error { @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) { } atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) + + statsT, err := m.stats.Time() + if err != nil { + h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err)) + atomic.StoreInt64(&h.lastMessageMs, 0) + } else { + atomic.StoreInt64(&h.lastMessageMs, statsT.Time().UnixMilli()) + } + h.sendEvent(m) case <-time.After(Timeout): if h.active { @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) { } atomic.StoreInt64(&h.lastReceivedMs, 0) + atomic.StoreInt64(&h.lastMessageMs, 0) case <-ctx.Done(): return ctx.Err() }