chore(log): Tweaking logging again, making everything consistent (#101)
elliotcourant authored Nov 6, 2023
1 parent e95311e commit e6ece3c
Showing 5 changed files with 104 additions and 46 deletions.
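The pattern repeated throughout this commit is replacing loosely-typed variadic key/value logging arguments with explicitly typed log/slog attributes. The sketch below contrasts the two styles; it is illustrative only — the logger construction, queue name, and job ID are made up for the example and are not taken from the repository (neoq backends receive their logger via SetLogger).

```go
package main

import (
	"log/slog"
	"os"
)

func main() {
	// Hypothetical logger setup for the example; any slog.Handler works.
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))

	queue := "example_queue" // assumed value, for illustration
	jobID := int64(42)       // assumed value, for illustration

	// Before: loosely-typed variadic key/value pairs.
	logger.Debug("adding a new job", "queue", queue, "job_id", jobID)

	// After: typed slog.Attr constructors, the style this commit standardizes on.
	logger.Debug("adding a new job",
		slog.String("queue", queue),
		slog.Int64("job_id", jobID),
	)
}
```

Typed attributes rule out malformed key/value pairs and make non-string values explicit, which is why the diff below reaches for slog.Any for errors, slog.Time for timestamps, and slog.Int64 for numeric job IDs.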
38 changes: 29 additions & 9 deletions backends/memory/memory_backend.go
@@ -81,7 +81,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
job.Status = internal.JobStatusNew
}

m.logger.Debug("adding a new job", "queue", job.Queue)
m.logger.Debug("adding a new job", slog.String("queue", job.Queue))

if qc, ok = m.queues.Load(job.Queue); !ok {
return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue)
@@ -213,12 +213,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

if ht, ok = m.handlers.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
m.logger.Error("error loading handler for queue", "queue", queue)
m.logger.Error("error loading handler for queue", slog.String("queue", queue))
return
}

if qc, ok = m.queues.Load(queue); !ok {
m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue)
m.logger.Error("error loading channel for queue", slog.String("queue", queue), slog.Any("error", handler.ErrNoHandlerForQueue))
return err
}

@@ -247,7 +247,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
return
}

m.logger.Error("job failed", "job_id", job.ID, "error", err)
m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err))

runAfter := internal.CalculateBackoff(job.Retries)
job.RunAfter = runAfter
@@ -286,11 +286,17 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) {
timeUntilRunAfter := time.Until(job.RunAfter)
if timeUntilRunAfter <= m.config.FutureJobWindow {
m.removeFutureJob(job.ID)
m.logger.Debug("dequeued job", "queue", job.Queue, "retries", job.Retries, "next_run", timeUntilRunAfter, "job_id", job.ID)
m.logger.Debug(
"dequeued job",
slog.String("queue", job.Queue),
slog.Int("retries", job.Retries),
slog.String("next_run", timeUntilRunAfter.String()),
slog.Int64("job_id", job.ID),
)
go func(j *jobs.Job) {
scheduleCh := time.After(timeUntilRunAfter)
<-scheduleCh
m.logger.Debug("loading job for queue", "queue", j.Queue)
m.logger.Debug("loading job for queue", slog.String("queue", j.Queue))
if qc, ok := m.queues.Load(j.Queue); ok {
queueChan = qc.(chan *jobs.Job)
queueChan <- j
@@ -314,14 +320,23 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) {
func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) {
ctx = withJobContext(ctx, job)

m.logger.Debug("handling job", "status", job.Status, "retries", job.Retries, "job_id", job.ID)
m.logger.Debug(
"handling job",
slog.String("status", job.Status),
slog.Int("retries", job.Retries),
slog.Int64("job_id", job.ID),
)

if job.Status != internal.JobStatusNew {
job.Retries++
}

if job.Deadline != nil && job.Deadline.UTC().Before(time.Now().UTC()) {
m.logger.Debug("job deadline is in the past, skipping", "deadline", job.Deadline, "job_id", job.ID)
m.logger.Debug(
"job deadline is in the past, skipping",
slog.Time("deadline", *job.Deadline),
slog.Int64("job_id", job.ID),
)
err = jobs.ErrJobExceededDeadline
return
}
@@ -337,7 +352,12 @@ }
}

if job.MaxRetries != nil && job.Retries >= *job.MaxRetries {
m.logger.Debug("job exceeded max retries", "retries", job.Retries, "max_retries", *job.MaxRetries, "job_id", job.ID)
m.logger.Debug(
"job exceeded max retries",
slog.Int("retries", job.Retries),
slog.Int("max_retries", *job.MaxRetries),
slog.Int64("job_id", job.ID),
)
err = jobs.ErrJobExceededMaxRetries
}

96 changes: 67 additions & 29 deletions backends/postgres/postgres_backend.go
@@ -227,7 +227,7 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
func (p *PgBackend) initializeDB() (err error) {
migrations, err := iofs.New(migrationsFS, "migrations")
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}

@@ -238,7 +238,7 @@ func (p *PgBackend) initializeDB() (err error) {
var pgxCfg *pgx.ConnConfig
pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}

@@ -256,15 +256,15 @@ func (p *PgBackend) initializeDB() (err error) {
sslMode)
m, err := migrate.NewWithSourceInstance("iofs", migrations, pqConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}
// We don't need the migration tooling to hold its connections to the DB once it has been completed.
defer m.Close()

err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}

Expand All @@ -278,17 +278,17 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
return
}

p.logger.Debug("enqueueing job payload", "queue", job.Queue, slog.Any("job_payload", job.Payload))
p.logger.Debug("enqueueing job payload", slog.String("queue", job.Queue), slog.Any("job_payload", job.Payload))

p.logger.Debug("acquiring new connection from connection pool", "queue", job.Queue)
p.logger.Debug("acquiring new connection from connection pool", slog.String("queue", job.Queue))
conn, err := p.pool.Acquire(ctx)
if err != nil {
err = fmt.Errorf("error acquiring connection: %w", err)
return
}
defer conn.Release()

p.logger.Debug("beginning new transaction to enqueue job", "queue", job.Queue)
p.logger.Debug("beginning new transaction to enqueue job", slog.String("queue", job.Queue))
tx, err := conn.Begin(ctx)
if err != nil {
err = fmt.Errorf("error creating transaction: %w", err)
@@ -307,7 +307,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
return
}
}
p.logger.Error("error enqueueing job", "queue", job.Queue, "error", err)
p.logger.Error("error enqueueing job", slog.String("queue", job.Queue), slog.Any("error", err))
err = fmt.Errorf("error enqueuing job: %w", err)
}

Expand All @@ -316,14 +316,19 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
err = fmt.Errorf("error committing transaction: %w", err)
return
}
p.logger.Debug("job added to queue:", "queue", job.Queue, "job_id", jobID)
p.logger.Debug("job added to queue:", slog.String("queue", job.Queue), slog.String("job_id", jobID))

// add future jobs to the future job list
if job.RunAfter.After(time.Now().UTC()) {
p.mu.Lock()
p.futureJobs[jobID] = job
p.mu.Unlock()
p.logger.Debug("added job to future jobs list", "queue", job.Queue, "job_id", jobID, "run_after", job.RunAfter)
p.logger.Debug(
"added job to future jobs list",
slog.String("queue", job.Queue),
slog.String("job_id", jobID),
slog.Time("run_after", job.RunAfter),
)
}

return jobID, nil
@@ -333,15 +338,15 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
ctx, cancel := context.WithCancel(ctx)

p.logger.Debug("starting job processing", "queue", h.Queue)
p.logger.Debug("starting job processing", slog.String("queue", h.Queue))
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.handlers[h.Queue] = h
p.mu.Unlock()

err = p.start(ctx, h)
if err != nil {
p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err)
p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err))
return
}
return
@@ -353,13 +358,23 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) {
cd, err := crondescriptor.NewCronDescriptor(cronSpec)
if err != nil {
p.logger.Error("error creating cron descriptor", "queue", h.Queue, "cronspec", cronSpec, "error", err)
p.logger.Error(
"error creating cron descriptor",
slog.String("queue", h.Queue),
slog.String("cronspec", cronSpec),
slog.Any("error", err),
)
return fmt.Errorf("error creating cron descriptor: %w", err)
}

cdStr, err := cd.GetDescription(crondescriptor.Full)
if err != nil {
p.logger.Error("error getting cron descriptor", "queue", h.Queue, "descriptor", crondescriptor.Full, "error", err)
p.logger.Error(
"error getting cron descriptor",
slog.String("queue", h.Queue),
slog.Any("descriptor", crondescriptor.Full),
slog.Any("error", err),
)
return fmt.Errorf("error getting cron description: %w", err)
}

@@ -382,7 +397,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
return
}

p.logger.Error("error queueing cron job", "queue", h.Queue, "error", err)
p.logger.Error("error queueing cron job", slog.String("queue", h.Queue), slog.Any("error", err))
}
}); err != nil {
return fmt.Errorf("error adding cron: %w", err)
@@ -398,7 +413,7 @@ func (p *PgBackend) SetLogger(logger logging.Logger) {

// Shutdown shuts this backend down
func (p *PgBackend) Shutdown(ctx context.Context) {
p.logger.Debug("starting shutdown.")
p.logger.Debug("starting shutdown")
for queue := range p.handlers {
p.announceJob(ctx, queue, shutdownJobID)
}
@@ -427,7 +442,7 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
return
}

p.logger.Debug("adding job to the queue", "queue", j.Queue)
p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue))
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
@@ -474,7 +489,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
errMsg := ""

if jobErr != nil {
p.logger.Error("job failed", "job_error", jobErr)
p.logger.Error("job failed", slog.Any("job_error", jobErr))
status = internal.JobStatusFailed
errMsg = jobErr.Error()
}
@@ -558,7 +573,12 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
continue
}

p.logger.Error("job failed", "queue", h.Queue, "error", err, "job_id", jobID)
p.logger.Error(
"job failed",
slog.String("queue", h.Queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)

continue
}
@@ -574,7 +594,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) {
rows, err := p.pool.Query(ctx, FutureJobQuery, queue)
if err != nil {
p.logger.Error("failed to fetch future jobs list", "queue", queue, "error", err)
p.logger.Error("failed to fetch future jobs list", slog.String("queue", queue), slog.Any("error", err))
return
}

@@ -666,7 +686,11 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan

conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to listen for pending queue items", "queue", queue, "error", err)
p.logger.Error(
"failed to acquire database connection to listen for pending queue items",
slog.String("queue", queue),
slog.Any("error", err),
)
return
}

@@ -680,7 +704,12 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
break
}

p.logger.Error("failed to fetch pending job", "queue", queue, "error", err, "job_id", jobID)
p.logger.Error(
"failed to fetch pending job",
slog.String("queue", queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)
} else {
jobsCh <- jobID
}
@@ -716,7 +745,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
err = jobs.ErrJobExceededDeadline
p.logger.Debug("job deadline is in the past, skipping", "queue", h.Queue, "job_id", job.ID)
p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", h.Queue), slog.Int64("job_id", job.ID))
err = p.updateJob(ctx, err)
return
}
@@ -744,7 +773,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, "queue", h.Queue, "error", err, "job_id", job.ID)
p.logger.Error(errMsg, slog.String("queue", h.Queue), slog.Any("error", err), slog.Int64("job_id", job.ID))
return fmt.Errorf("%s %w", errMsg, err)
}

@@ -762,7 +791,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
go func(ctx context.Context) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new listener connection", "queue", queue, "error", err)
p.logger.Error("unable to acquire new listener connection", slog.String("queue", queue), slog.Any("error", err))
return
}
defer p.release(ctx, conn, queue)
@@ -771,7 +800,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
_, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", "queue", queue, "error", err)
p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err))
return
}

@@ -780,13 +809,18 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re

for {
notification, waitErr := conn.Conn().WaitForNotification(ctx)
p.logger.Debug("job notification for queue", "queue", queue, "notification", notification, "err", err)
p.logger.Debug(
"job notification for queue",
slog.String("queue", queue),
slog.Any("notification", notification),
slog.Any("err", err),
)
if waitErr != nil {
if errors.Is(waitErr, context.Canceled) {
return
}

p.logger.Error("failed to wait for notification", "queue", queue, "error", waitErr)
p.logger.Error("failed to wait for notification", slog.String("queue", queue), slog.Any("error", waitErr))
continue
}

@@ -810,7 +844,11 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
return
}

p.logger.Error("unable to reset connection config before release", "queue", queue, "error", err)
p.logger.Error(
"unable to reset connection config before release",
slog.String("queue", queue),
slog.Any("error", err),
)
}

conn.Release()