Skip to content

Commit

Permalink
fix(delayed): avoid using jobId in order to schedule delayed jobs (#2587
Browse files Browse the repository at this point in the history
) (python)
  • Loading branch information
roggervalf authored Jul 11, 2024
1 parent aecac66 commit 228db2c
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 76 deletions.
2 changes: 1 addition & 1 deletion python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False):
elif delay:
keys, args = self.scripts.moveToDelayedArgs(
self.id,
round(time.time() * 1000) + delay,
round(time.time() * 1000),
token,
delay
)
Expand Down
7 changes: 1 addition & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,13 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
return (keys, args)

def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int = 0, opts: dict = {}):
max_timestamp = max(0, timestamp or 0)

if timestamp > 0:
max_timestamp = max_timestamp * 0x1000 + (convert_to_int(job_id) & 0xfff)

keys = self.getKeys(['marker', 'active', 'prioritized', 'delayed'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['meta'])
keys.append(self.keys['stalled'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
args = [self.keys[''], str(timestamp),
job_id, token, delay, "1" if opts.get("skipAttempt") else "0"]

return (keys, args)
Expand Down
11 changes: 7 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ export class Job<
} else if (delay) {
const args = this.scripts.moveToDelayedArgs(
this.id,
Date.now() + delay,
Date.now(),
token,
delay,
);
Expand Down Expand Up @@ -1076,14 +1076,17 @@ export class Job<
* @returns
*/
async moveToDelayed(timestamp: number, token?: string): Promise<void> {
const delay = timestamp - Date.now();
const now = Date.now();
const delay = timestamp - now;
const finalDelay = delay > 0 ? delay : 0;
const movedToDelayed = await this.scripts.moveToDelayed(
this.id,
timestamp,
delay > 0 ? delay : 0,
now,
finalDelay,
token,
{ skipAttempt: true },
);
this.delay = finalDelay;

return movedToDelayed;
}
Expand Down
30 changes: 2 additions & 28 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -727,18 +727,7 @@ export class Scripts {
}

private changeDelayArgs(jobId: string, delay: number): (string | number)[] {
//
// Bake in the job id first 12 bits into the timestamp
// to guarantee correct execution order of delayed jobs
// (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
//
// WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
//
let timestamp = Date.now() + delay;

if (timestamp > 0) {
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
}
const timestamp = Date.now();

const keys: (string | number)[] = [
this.queue.keys.delayed,
Expand Down Expand Up @@ -795,27 +784,13 @@ export class Scripts {
]);
}

// Note: We have an issue here with jobs using custom job ids
moveToDelayedArgs(
jobId: string,
timestamp: number,
token: string,
delay: number,
opts: MoveToDelayedOpts = {},
): (string | number)[] {
//
// Bake in the job id first 12 bits into the timestamp
// to guarantee correct execution order of delayed jobs
// (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
//
// WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
//
timestamp = Math.max(0, timestamp ?? 0);

if (timestamp > 0) {
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
}

const queueKeys = this.queue.keys;
const keys: (string | number)[] = [
queueKeys.marker,
Expand All @@ -830,8 +805,7 @@ export class Scripts {

return keys.concat([
this.queue.keys[''],
Date.now(),
JSON.stringify(timestamp),
timestamp,
jobId,
token,
delay,
Expand Down
5 changes: 2 additions & 3 deletions src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/isQueuePaused"
Expand Down Expand Up @@ -91,9 +92,7 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
repeatJobKey)

-- Compute delayed timestamp and the score.
local delayedTimestamp = (delay > 0 and (timestamp + delay)) or 0
local score = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
Expand Down
10 changes: 6 additions & 4 deletions src/commands/changeDelay-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
KEYS[4] events stream
ARGV[1] delay
ARGV[2] delayedTimestamp
ARGV[2] timestamp
ARGV[3] the id of the job
ARGV[4] job key
Expand All @@ -23,21 +23,23 @@ local rcall = redis.call

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"

if rcall("EXISTS", ARGV[4]) == 1 then
local jobId = ARGV[3]
local score = tonumber(ARGV[2])
local delayedTimestamp = (score / 0x1000)

local delay = tonumber(ARGV[1])
local score, delayedTimestamp = getDelayedScore(KEYS[1], ARGV[2], delay)

local numRemovedElements = rcall("ZREM", KEYS[1], jobId)

if numRemovedElements < 1 then
return -3
end

rcall("HSET", ARGV[4], "delay", tonumber(ARGV[1]))
rcall("HSET", ARGV[4], "delay", delay)
rcall("ZADD", KEYS[1], score, jobId)

local maxEvents = getOrSetMaxEvents(KEYS[2])
Expand Down
25 changes: 25 additions & 0 deletions src/commands/includes/getDelayedScore.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--[[
Bake in the job id first 12 bits into the timestamp
to guarantee correct execution order of delayed jobs
(up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
]]
local function getDelayedScore(delayedKey, timestamp, delay)
local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
local minScore = delayedTimestamp * 0x1000
local maxScore = (delayedTimestamp + 1 ) * 0x1000 - 1

local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
minScore, "WITHSCORES","LIMIT", 0, 1)
if #result then
local currentMaxScore = tonumber(result[2])
if currentMaxScore ~= nil then
if currentMaxScore >= maxScore then
return maxScore, delayedTimestamp
else
return currentMaxScore + 1, delayedTimestamp
end
end
end
return minScore, delayedTimestamp
end
5 changes: 2 additions & 3 deletions src/commands/includes/getNextDelayedTimestamp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ local function getNextDelayedTimestamp(delayedKey)
local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
if #result then
local nextTimestamp = tonumber(result[2])
if (nextTimestamp ~= nil) then
nextTimestamp = nextTimestamp / 0x1000
if nextTimestamp ~= nil then
return nextTimestamp / 0x1000
end
return nextTimestamp
end
end
2 changes: 1 addition & 1 deletion src/commands/includes/promoteDelayedJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-- Try to get as much as 1000 jobs at once
local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000, "LIMIT", 0, 1000)
local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)

if (#jobs > 0) then
rcall("ZREM", delayedKey, unpack(jobs))
Expand Down
26 changes: 13 additions & 13 deletions src/commands/moveToDelayed-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
ARGV[1] key prefix
ARGV[2] timestamp
ARGV[3] delayedTimestamp
ARGV[4] the id of the job
ARGV[5] queue token
ARGV[6] delay value
ARGV[7] skip attempt
ARGV[3] the id of the job
ARGV[4] queue token
ARGV[5] delay value
ARGV[6] skip attempt
Output:
0 - OK
Expand All @@ -31,32 +30,33 @@ local rcall = redis.call

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"
--- @include "includes/removeLock"

local jobKey = KEYS[5]
local metaKey = KEYS[7]
local token = ARGV[5]
local token = ARGV[4]
if rcall("EXISTS", jobKey) == 1 then
local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[4])
local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[3])
if errorCode < 0 then
return errorCode
end

local delayedKey = KEYS[4]
local jobId = ARGV[4]
local score = tonumber(ARGV[3])
local delayedTimestamp = (score / 0x1000)
local jobId = ARGV[3]
local delay = tonumber(ARGV[5])
local score, delayedTimestamp = getDelayedScore(delayedKey, ARGV[2], delay)

local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId)
if numRemovedElements < 1 then return -3 end

if ARGV[7] == "0" then
if ARGV[6] == "0" then
rcall("HINCRBY", jobKey, "atm", 1)
end
rcall("HSET", jobKey, "delay", ARGV[6])

rcall("HSET", jobKey, "delay", ARGV[5])

local maxEvents = getOrSetMaxEvents(metaKey)

Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/fixture_processor_move_to_delayed.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module.exports = function (job, token) {
if (job.attemptsStarted == 1) {
return delay(250)
.then(() => {
job.moveToDelayed(2500, token);
job.moveToDelayed(Date.now() + 2500, token);
return delay(500);
})
.then(() => {
Expand Down
18 changes: 7 additions & 11 deletions tests/test_delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,21 +510,17 @@ describe('Delayed jobs', function () {
});

const now = Date.now();
const promises: Promise<Job<any, any, string>>[] = [];
let i = 1;
for (i; i <= numJobs; i++) {
promises.push(
queue.add(
'test',
{ order: i },
{
delay: 1000,
timestamp: now,
},
),
await queue.add(
'test',
{ order: i },
{
delay: 1000,
timestamp: now,
},
);
}
await Promise.all(promises);
await processing;
await worker!.close();
});
Expand Down
2 changes: 1 addition & 1 deletion tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ function sandboxProcessTests(
const delaying = new Promise<void>((resolve, reject) => {
queueEvents.on('delayed', async ({ delay }) => {
try {
expect(Number(delay)).to.be.greaterThanOrEqual(2500);
expect(Number(delay)).to.be.lessThanOrEqual(Date.now() + 2500);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
1,
);
Expand Down

0 comments on commit 228db2c

Please sign in to comment.