More aggressive refetching when full (#664)
* Immediately fetch another batch when a full set of jobs is returned

Jobs can often be stuck on queues for a long while when jobs are
sporadically enqueued in large batches. Prior to this change jobs would
be fetched every FetchPollInterval (default 1s) unless new jobs were
enqueued. This change reduces that to FetchCooldown (default 100ms) when
a full set of jobs is returned implying that there may be more in the
queue.

* Trigger fetch when a worker is freed up

Tune the client to be more aggressive about fetching when it either just
fetched a full batch of jobs, or when it skipped its previously
triggered fetch because it was already full. Rather than immediately
calling the fetch limiter as in the previous commit, note this situation
with a boolean so that we can trigger the fetch limiter _after_ a worker
slot has opened up.

This should bring more consistent throughput to poll-only mode and in
cases where there is a backlog of existing jobs but new ones aren't
being actively inserted.

This will result in increased fetch load on many installations, with the
benefit of increased throughput. As before, `FetchCooldown` still limits
how frequently these fetches can occur on each client and can be
increased to reduce the amount of fetch querying.

Finally, don't run the fetch query if there are no worker slots
available.
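The handshake described above can be sketched in a simplified, standalone form (all names here are hypothetical, not River's actual producer code): a fetch skipped for lack of slots, or a fetch that returned a full batch, sets a flag, and the next freed worker slot triggers the deferred fetch.

```go
package main

import "fmt"

// producerSketch illustrates the flag-based handshake: note the missed or
// full fetch, then trigger the limiter once a worker slot opens up.
type producerSketch struct {
	active             int
	maxWorkers         int
	fetchWhenSlotsFree bool
	triggered          int // stands in for fetch-limiter invocations
}

func (p *producerSketch) onFetchDue(batchWasFull bool) {
	if p.maxWorkers-p.active <= 0 {
		p.fetchWhenSlotsFree = true // no slots: skip the query, remember why
		return
	}
	if batchWasFull {
		p.fetchWhenSlotsFree = true // full batch: more jobs likely queued
	}
}

func (p *producerSketch) onJobFinished() {
	p.active--
	if p.fetchWhenSlotsFree {
		p.fetchWhenSlotsFree = false
		p.triggered++ // fire the deferred fetch now that a slot is open
	}
}

func main() {
	p := &producerSketch{maxWorkers: 1, active: 1}
	p.onFetchDue(false)      // fetch due while the only slot is busy
	p.onJobFinished()        // slot frees up; deferred fetch fires
	fmt.Println(p.triggered) // → 1
}
```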

---------

Co-authored-by: Chris Gaffney <[email protected]>
bgentry and gaffneyc authored Nov 2, 2024
1 parent fb9bad6 commit ce141fa
Showing 6 changed files with 38 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previously triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663).

### Fixed

- `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661).
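For installations where the extra fetch load matters, the changelog's suggestion amounts to raising `FetchCooldown` in the client config. A sketch with illustrative values (the field names are real `river.Config` options, but the numbers are arbitrary):

```go
package main

import (
	"time"

	"github.com/riverqueue/river"
)

// exampleConfig shows where FetchCooldown is tuned; 250ms is an illustrative
// value chosen to reduce fetch querying versus the 100ms default.
func exampleConfig(workers *river.Workers) *river.Config {
	return &river.Config{
		FetchCooldown:     250 * time.Millisecond, // default is 100ms
		FetchPollInterval: time.Second,            // unchanged default
		Queues:            map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
		Workers:           workers,
	}
}
```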
5 changes: 2 additions & 3 deletions client_test.go
@@ -5643,9 +5643,8 @@ func TestInsert(t *testing.T) {
AddWorker(workers, &noOpWorker{})

config := &Config{
-		FetchCooldown: 2 * time.Millisecond,
-		Queues:        map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
-		Workers:       workers,
+		Queues:  map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
+		Workers: workers,
}

client, err := NewClient(riverpgxv5.New(dbPool), config)
2 changes: 1 addition & 1 deletion example_client_from_context_dbsql_test.go
@@ -37,7 +37,7 @@ func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[Contex

// ExampleClientFromContext_databaseSQL demonstrates how to extract the River
// client from the worker context when using the [database/sql] driver.
-// ([github.com/riverqueue/river/riverdriver/riverdatabasesql])
+// ([github.com/riverqueue/river/riverdriver/riverdatabasesql]).
func ExampleClientFromContext_databaseSQL() {
ctx := context.Background()

2 changes: 1 addition & 1 deletion example_client_from_context_test.go
@@ -36,7 +36,7 @@ func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextCl

// ExampleClientFromContext_pgx demonstrates how to extract the River client
// from the worker context when using the pgx/v5 driver.
-// ([github.com/riverqueue/river/riverdriver/riverpgxv5])
+// ([github.com/riverqueue/river/riverdriver/riverpgxv5]).
func ExampleClientFromContext_pgx() {
ctx := context.Background()

8 changes: 4 additions & 4 deletions internal/util/chanutil/debounced_chan.go
@@ -6,10 +6,10 @@ import (
"time"
)

-// DebouncedChan is a function that will only be called once per cooldown
-// period, at the leading edge. If it is called again during the cooldown, the
-// subsequent calls are delayed until the cooldown period has elapsed and are
-// also coalesced into a single call.
+// DebouncedChan is a channel that will only fire once per cooldown period, at
+// the leading edge. If it is called again during the cooldown, the subsequent
+// calls are delayed until the cooldown period has elapsed and are also
+// coalesced into a single call.
type DebouncedChan struct {
c chan struct{}
cooldown time.Duration
26 changes: 26 additions & 0 deletions producer.go
@@ -159,6 +159,11 @@ type producer struct {
// main goroutine.
cancelCh chan int64

// Set to true when the producer thinks it should trigger another fetch as
// soon as slots are available. This is written and read by the main
// goroutine.
fetchWhenSlotsAreAvailable bool

// Receives completed jobs from workers. Written by completed workers, only
// read from main goroutine.
jobResultCh chan *rivertype.JobRow
@@ -457,12 +462,28 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
}
case result := <-p.jobResultCh:
p.removeActiveJob(result.ID)
if p.fetchWhenSlotsAreAvailable {
// If we missed a fetch because all worker slots were full, or if we
// fetched the maximum number of jobs on the last attempt, get a little
// more aggressive triggering the fetch limiter now that we have a slot
// available.
p.fetchWhenSlotsAreAvailable = false
fetchLimiter.Call()
}
}
}
}

func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
limit := p.maxJobsToFetch()
if limit <= 0 {
// We have no slots for new jobs, so don't bother fetching. However, since
// we knew it was time to fetch, we keep track of what happened so we can
// trigger another fetch as soon as we have open slots.
p.fetchWhenSlotsAreAvailable = true
return
}

go p.dispatchWork(workCtx, limit, fetchResultCh)

for {
@@ -472,6 +493,11 @@
p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()))
} else if len(result.jobs) > 0 {
p.startNewExecutors(workCtx, result.jobs)

// Fetch returned the maximum number of jobs that were requested,
// implying there may be more in the queue. Trigger another fetch when
// slots are available.
p.fetchWhenSlotsAreAvailable = true
}
return
case result := <-p.jobResultCh: