Skip to content

Commit

Permalink
bug(pg): Adding test to demonstrate bug with uppercase queue names
Browse files Browse the repository at this point in the history
When the queue name is uppercase, the listener will never receive the
notification when the job is enqueued. This is done specifically with
the enqueuer and consumer being separate neoq instances (server A kicks
off a job, server B is listening to perform the work).
  • Loading branch information
elliotcourant authored and acaloiaro committed Oct 15, 2023
1 parent 84caae3 commit 3a10351
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
1 change: 1 addition & 0 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re

for {
notification, waitErr := conn.Conn().WaitForNotification(ctx)
p.logger.Debug("job notification for queue", "queue", queue, "notification", notification)
if waitErr != nil {
if errors.Is(waitErr, context.Canceled) {
return
Expand Down
91 changes: 91 additions & 0 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,94 @@ func Test_MoveJobsToDeadQueue(t *testing.T) {
t.Error("should be dead")
}
}

func TestJobEnqueuedSeparately(t *testing.T) {
connString, conn := prepareAndCleanupDB(t)
const queue = "SyncThing"
maxRetries := 5
done := make(chan bool)
defer close(done)

timeoutTimer := time.After(30 * time.Second)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
enqueuer, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString(connString),
postgres.WithSynchronousCommit(false),
neoq.WithLogLevel(logging.LogLevelDebug),
)
if err != nil {
t.Fatal(err)
}
defer enqueuer.Shutdown(ctx)

consumer, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString(connString),
postgres.WithSynchronousCommit(false),
neoq.WithLogLevel(logging.LogLevelDebug),
)
if err != nil {
t.Fatal(err)
}
defer consumer.Shutdown(ctx)
h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

go func() {
err = consumer.Start(ctx, h)
if err != nil {
t.Error(err)
}
}()

// Wait a bit more before enqueueing
time.Sleep(10 * time.Second)
deadline := time.Now().UTC().Add(5 * time.Second)
jid, e := enqueuer.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
},
Deadline: &deadline,
MaxRetries: &maxRetries,
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
}

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
}
if err != nil {
t.Error(err)
}

// ensure job has fields set correctly
var jdl time.Time
var jmxrt int

err = conn.
QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid).
Scan(&jdl, &jmxrt)
if err != nil {
t.Error(err)
}

jdl = jdl.In(time.UTC)
// dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as
// strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html
if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) {
t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113
}

if jmxrt != maxRetries {
t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113
}
}

0 comments on commit 3a10351

Please sign in to comment.