I came up with a basic proof of concept that has greatly increased job throughput in local testing, even when increasing the fetch intervals:

```diff
diff --git a/vendor/github.com/riverqueue/river/producer.go b/vendor/github.com/riverqueue/river/producer.go
index 6d6392ad..66b35b81 100644
--- a/vendor/github.com/riverqueue/river/producer.go
+++ b/vendor/github.com/riverqueue/river/producer.go
@@ -447,7 +447,11 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
if p.paused {
continue
}
- p.innerFetchLoop(workCtx, fetchResultCh)
+ more := p.innerFetchLoop(workCtx, fetchResultCh)
+ if more {
+ fetchLimiter.Call()
+ }
+
// Ensure we can't start another fetch when fetchCtx is done, even if
// the fetchLimiter is also ready to fire:
select {
@@ -461,7 +465,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
}
}
-func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
+func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) bool {
limit := p.maxJobsToFetch()
go p.dispatchWork(workCtx, limit, fetchResultCh)
@@ -473,7 +477,7 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
} else if len(result.jobs) > 0 {
p.startNewExecutors(workCtx, result.jobs)
}
- return
+ return len(result.jobs) == limit
case result := <-p.jobResultCh:
p.removeActiveJob(result.ID)
case jobID := <-p.cancelCh:
```
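To make the intent of the diff easier to read outside of River's internals, here is a simplified, standalone sketch of the same loop shape: fetch, run, and only wait on the poll timer when the last fetch came back less than full. The function names and signatures here are invented for illustration and don't match River's actual producer.

```go
package fetchsketch

import (
	"context"
	"time"
)

// Job stands in for a fetched job row.
type Job struct{ ID int64 }

// fetchAndRun sketches the proposed behavior: after working a batch, fetch
// again immediately whenever the previous fetch returned a full `limit` jobs
// (a hint that a backlog remains), and only fall back to waiting on the poll
// timer when the batch came back short.
func fetchAndRun(
	ctx context.Context,
	limit int,
	pollInterval time.Duration,
	fetch func(ctx context.Context, limit int) []Job,
	run func(jobs []Job),
) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		jobs := fetch(ctx, limit)
		if len(jobs) > 0 {
			run(jobs)
		}

		// Full batch: assume more jobs are waiting and skip the timer.
		if len(jobs) == limit {
			continue
		}

		// Short batch: wait for the next poll tick. (The real producer also
		// wakes up early on insert notifications and respects FetchCooldown.)
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
```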
-
@gaffneyc converting this to a discussion for now, though I think your suggestion can still come out of it as a PR. First, I wanted to confirm that you saw the bench command. There are a couple of relevant config knobs here which you've discovered: FetchCooldown, FetchPollInterval, and each queue's MaxWorkers.
These three work together to tune a queue's maximum throughput and its impact on the database. Beyond that, the impact scales with the number of individual nodes you're running with these settings. Simply by turning down the intervals and increasing the number of jobs fetched, or by adding more nodes/clients, you can increase your throughput up to what your database can handle.

My initial concern with this suggestion is that it causes less predictable and more spiky load impact from River, because it means you're going to be fetching smaller batches of jobs more frequently. Of course, I can see how that would increase utilization and throughput.

Thinking further though, I think what you've proposed here is actually a nice improvement and helps address an issue I've noticed before but haven't yet spent the time to address. The behavior and throughput of the system right now is different depending on whether new jobs are actively being inserted vs. whether there's merely a large backlog that's being cleared out. That's because we use the insert notifications (LISTEN/NOTIFY) to trigger fetches as new jobs come in, whereas a pre-existing backlog only gets picked up on each FetchPollInterval tick. This problem is more impactful for clients that don't have LISTEN/NOTIFY available and rely on polling alone.

Since you already have this change locally, would you want to try running the bench command with and without it to compare?
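For reference, a minimal sketch of where those knobs live on the client config; this assumes River's Go client with the pgx v5 driver, and the durations and worker count shown are purely illustrative, not recommendations:

```go
package riverconfig

import (
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// newClient wires up the three knobs discussed above.
func newClient(dbPool *pgxpool.Pool, workers *river.Workers) (*river.Client[pgx.Tx], error) {
	return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		// Minimum amount of time allowed between fetches of new jobs.
		FetchCooldown: 100 * time.Millisecond,
		// How long to wait between periodic fetches when no insert
		// notification arrives, e.g. while draining an existing backlog.
		FetchPollInterval: 1 * time.Second,
		Queues: map[string]river.QueueConfig{
			// MaxWorkers bounds concurrency per queue and, indirectly, how
			// many jobs each fetch can bring back.
			river.QueueDefault: {MaxWorkers: 32},
		},
		Workers: workers,
	})
}
```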
-
@bgentry I think I had seen the bench command but hadn't looked into it yet. I did some digging and I'm going to include numbers and thoughts below. I ran all of the tests on my development system (Ryzen 5950X with 32 threads and 64GB of memory), which was mostly idle but not completely. None of the tests seemed to tax the system all that much. Anything labeled "before" is what is included in commit 56ddf09 and anything labeled "after" includes the change above.

The numbers mostly speak for themselves and overall I think this (or something like it) is likely a good change. I haven't tested different worker counts, but my gut is telling me River would benefit from fewer, larger machines with more workers rather than a larger number of smaller machines, to take advantage of batching and reduce queries.

Test 1: No Changes

This test was just the bench command run as-is, with no configuration changes.

Before
After
Test 2: Defaults

I took a look in the bench command to see how it configures the client. In this test I reset the fetch times to River's library defaults.

Before
After
Test 3: Defaults + Polling Only

I set the client to poll-only mode with the default fetch times. River 0.13.0 is really limited here because the throughput is always going to be worker count * fetches per second. We see a 9.85x speed up since we're fetching closer to the rate the workers can actually process, instead of waiting out each poll interval.

Before
After
Raw Data

Before - No changes
After - No changes
Before - Default Fetch Times
After - Default Fetch Times
Before - Default Fetch Times + Poll
After - Default Fetch Times + Poll
-
@gaffneyc this is super helpful, thank you! Do you want to prepare a PR with this change? I can help with getting it across the finish line as necessary, including additional test coverage.
-
Yeah, I’ll get a PR up when I get a chance. We’ve been running it in production for a couple days now and things have been looking good.
-
Fixed in #664 🚀
This discussion was converted from issue #651 on October 18, 2024 21:24.
-
We're looking at River as a replacement for an existing Redis-based worker system. We often enqueue a large batch of jobs (using InsertMany) that we want processed quickly. In testing it looks like River always waits FetchPollInterval between fetches even if there are jobs available in the queue.

The test we've set up is a single worker process with 32 workers pulling from the default queue. Connections are direct to Postgres 17. We are logging all of the jobs that are run and they each take ~25ms. We then batch-produce 5k records, and it prints batches of completions every second. If we increase the FetchPollInterval to 5 seconds, the batches start to come in every 5 seconds instead. When we drop FetchPollInterval to 100ms everything processes quickly, but our batches arrive far enough apart (small ones every minute, larger ones every hour) that using a small interval would cause a lot of unnecessary load on the database.

What knobs do we have here other than FetchPollInterval? I'm assuming we should (roughly) be matching worker counts to available cores.

Would it make sense for River to ignore the interval and do an immediate fetch when the previous fetch had a full set of records? That is, only apply FetchPollInterval when fewer records were returned than available workers.
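To make the scale of the gap concrete, here is a rough back-of-the-envelope calculation using the numbers above (32 workers, ~25ms per job, one fetch per FetchPollInterval while a backlog drains). The assumption that at most one worker-sized batch is fetched per poll is a simplification of the real producer behavior, which also reacts to insert notifications and FetchCooldown:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		workers           = 32
		jobDuration       = 25 * time.Millisecond
		fetchPollInterval = 1 * time.Second // River's default
		backlog           = 5000
	)

	// What the workers could theoretically chew through if kept busy.
	workerCapacity := float64(workers) / jobDuration.Seconds() // ≈ 1280 jobs/s

	// Rough ceiling if at most one batch of up to `workers` jobs is fetched
	// per poll interval while draining a backlog.
	pollLimited := float64(workers) / fetchPollInterval.Seconds() // ≈ 32 jobs/s

	fmt.Printf("worker capacity: ~%.0f jobs/s\n", workerCapacity)
	fmt.Printf("poll-limited ceiling: ~%.0f jobs/s\n", pollLimited)
	fmt.Printf("time to drain %d jobs at the poll-limited rate: ~%.0fs\n",
		backlog, float64(backlog)/pollLimited)
}
```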