-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add counts
and add_new_item
methods, without tests
#4
Changes from 44 commits
1260925
f4bd9c2
28cd1c9
b101df6
19989b1
50069e9
8abf3ed
1aa44b3
61ac4a2
67b5c70
0255224
1d8fe42
dc99a48
08c47ca
5bf622e
d139f55
7c5fc9b
e6db6b1
0dfd47a
292f428
2529278
f07d603
d804ee0
89e156c
1cd515e
b34629a
c3c40f5
29b4235
8523175
61a5052
612b776
0212ee8
14d9392
6de4de9
522c5a6
ccb6f6e
04dbdbd
a953fce
9b4dfc5
07e11a3
37478c6
5cf7f41
6d41f55
4f5349d
2b81138
7a7294b
994b09f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,12 +3,12 @@ | |
* @description A work queue backed by a redis database. | ||
*/ | ||
|
||
import Redis, {ChainableCommander} from 'ioredis'; | ||
import {v4 as uuidv4} from 'uuid'; | ||
import {Item} from './Item'; | ||
import {KeyPrefix} from './KeyPrefix'; | ||
import Redis, { ChainableCommander } from 'ioredis'; | ||
import { v4 as uuidv4 } from 'uuid'; | ||
import { Item } from './Item'; | ||
import { KeyPrefix } from './KeyPrefix'; | ||
|
||
export {KeyPrefix, Item}; | ||
export { KeyPrefix, Item }; | ||
|
||
/** | ||
* A work queue backed by a redis database. | ||
|
@@ -35,10 +35,11 @@ export class WorkQueue { | |
|
||
/** | ||
* Add an item to the work queue. This adds the redis commands onto the pipeline passed. | ||
* | ||
* Use `WorkQueue.addItem` if you don't want to pass a pipeline directly. | ||
* Add the item data. | ||
* @param {Pipeline} pipeline The pipeline that the data will be executed. | ||
* @param {Item} item The Item which will be set in the Redis with the key of this.itemDataKey.of(item.id). | ||
* | ||
* @param {Pipeline} pipeline The pipeline that the commands to add the item will be pushed to. | ||
* @param {Item} item The Item to be added. | ||
*/ | ||
addItemToPipeline(pipeline: ChainableCommander, item: Item) { | ||
// NOTE: it's important that the data is added first, otherwise someone before the data is ready. | ||
|
@@ -48,11 +49,12 @@ export class WorkQueue { | |
} | ||
|
||
/** | ||
* Add an item to the work queue. | ||
* Add an item to the work queue. See `addNewItem` to avoid adding duplicate items. | ||
* | ||
* This creates a pipeline and executes it on the database. | ||
* | ||
* @param {Redis} db The Redis Connection. | ||
* @param item The item that will be executed using the method addItemToPipeline. | ||
* @param item The item to be added. | ||
*/ | ||
async addItem(db: Redis, item: Item): Promise<void> { | ||
const pipeline = db.pipeline(); | ||
|
@@ -61,7 +63,47 @@ export class WorkQueue { | |
} | ||
|
||
/** | ||
* This is used to get the length of the Main Queue. | ||
* Adds an item to the work queue only if an item with the same ID doesn't already exist. | ||
* | ||
* This method uses WATCH to add the item atomically. The db client passed must not be used by | ||
* anything else while this method is running. | ||
* | ||
* Returns a boolean indicating if the item was added or not. The item is only not added if it | ||
* already exists or if an error (other than a transaction error, which triggers a retry) occurs. | ||
* | ||
* @param {Redis} db The Redis Connection, this must not be used by anything else while this method is running. | ||
* @param item The item that will be added, only if an item doesn't already exist with the same ID. | ||
* @returns {boolean} returns true for success (item added to the queue), false if the item is in one of the queues. | ||
*/ | ||
async addNewItem(db: Redis, item: Item): Promise<boolean> { | ||
for (;;) { | ||
try { | ||
await db.watch(this.mainQueueKey, this.processingKey); | ||
|
||
const isItemInProcessingKey = await db.lpos(this.processingKey, item.id); | ||
const isItemInMainQueueKey = await db.lpos(this.mainQueueKey, item.id); | ||
if (isItemInProcessingKey !== null || isItemInMainQueueKey !== null) { | ||
await db.unwatch(); | ||
return false; | ||
} | ||
|
||
const transaction = db.multi(); | ||
this.addItemToPipeline(transaction, item); | ||
const results = await transaction.exec(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Oxygen588 what does the return value look like in the success and failure case? I'm not sure what it would be, so it would be nice to have a comment! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, to clarify what I mean, I don't know what the value of |
||
if (!results) { | ||
continue; | ||
} | ||
return true; | ||
} catch (err) { | ||
await db.unwatch(); | ||
throw err; | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Return the length of the work queue (not including items being processed, see | ||
* `WorkQueue.processing` or `WorkQueue.counts` to get both). | ||
* | ||
* @param {Redis} db The Redis Connection. | ||
* @returns {Promise<number>} Return the length of the work queue (not including items being processed, see `WorkQueue.processing()`). | ||
|
@@ -71,7 +113,7 @@ export class WorkQueue { | |
} | ||
|
||
/** | ||
* This is used to get the lenght of the Processing Queue. | ||
* This is used to get the number of items currently being processed. | ||
* | ||
* @param {Redis} db The Redis Connection. | ||
* @returns {Promise<number>} The number of items being processed. | ||
|
@@ -80,6 +122,22 @@ export class WorkQueue { | |
return db.llen(this.processingKey); | ||
} | ||
|
||
/** | ||
* Returns the queue length, and number of items currently being processed, atomically. | ||
* | ||
* @param {Redis} db The Redis Connection. | ||
* @returns {Promise<[number, number]>} Return the length of main queue and processing queue, respectively. | ||
*/ | ||
async counts(db: Redis): Promise<[number, number]> { | ||
const multi = db.multi(); | ||
multi.llen(this.mainQueueKey); | ||
multi.llen(this.processingKey); | ||
const result = (await multi.exec()) as Array<[Error | null, number]>; | ||
const queueLength = result[0][1]; | ||
const processingLength = result[1][1]; | ||
return [queueLength, processingLength]; | ||
} | ||
|
||
/** | ||
* This method can be used to check if a Lease Exists or not for a itemId. | ||
* | ||
|
@@ -116,8 +174,8 @@ export class WorkQueue { | |
async lease( | ||
db: Redis, | ||
leaseSecs: number, | ||
block = true, | ||
timeout = 1 | ||
block: boolean = true, | ||
timeout: number = 1 | ||
): Promise<Item | null> { | ||
let maybeItemId: string | null = null; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what's going on here? 😂