diff --git a/go/WorkQueue.go b/go/WorkQueue.go index 365bfb6..4cf8019 100644 --- a/go/WorkQueue.go +++ b/go/WorkQueue.go @@ -120,8 +120,80 @@ func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item return err } +// AddNewItem adds an item to the work queue only if an item with the same ID doesn't already exist. +// +// This method uses WATCH to add the item atomically. +// +// Returns a boolean indicating if the item was added or not. The item is only not added if it +// already exists or if an error (other than a transaction error, which triggers a retry) occurs. +func (workQueue *WorkQueue) AddNewItem( + ctx context.Context, + db *redis.Client, + item Item, +) (bool, error) { + added := false + + txf := func(tx *redis.Tx) error { + processingItemsInQueueCmd := tx.LPos(ctx, workQueue.processingKey, item.ID, redis.LPosArgs{ + Rank: 0, + MaxLen: 0, + }) + workingItemsInQueueCmd := tx.LPos(ctx, workQueue.mainQueueKey, item.ID, redis.LPosArgs{ + Rank: 0, + MaxLen: 0, + }) + + _, ProcessingQueueCheck := processingItemsInQueueCmd.Result() + _, WorkingQueueCheck := workingItemsInQueueCmd.Result() + + if ProcessingQueueCheck == redis.Nil && WorkingQueueCheck == redis.Nil { + _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + workQueue.AddItemToPipeline(ctx, pipe, item) + return nil + }) + added = true + return err + } else { + added = false + return nil + } + } + + for { + err := db.Watch(ctx, txf, workQueue.processingKey, workQueue.mainQueueKey) + if err == nil { + return added, nil + } + if err == redis.TxFailedErr { + continue + } + return false, err + } +} + +// Counts returns the queue length, and number of items currently being processed, atomically. 
+func (workQueue *WorkQueue) Counts( + ctx context.Context, + db *redis.Client, +) (queueLen, processingLen int64, err error) { + tx := db.TxPipeline() + + queueLenPipe := tx.LLen(ctx, workQueue.mainQueueKey) + processingLenPipe := tx.LLen(ctx, workQueue.processingKey) + + _, err = tx.Exec(ctx) + if err == nil { + queueLen, err = queueLenPipe.Result() + } + if err == nil { + processingLen, err = processingLenPipe.Result() + } + + return +} + // Return the length of the work queue (not including items being processed, see -// [WorkQueue.Processing]). +// [WorkQueue.Processing] or [WorkQueue.Counts] to get both). func (workQueue *WorkQueue) QueueLen(ctx context.Context, db *redis.Client) (int64, error) { return db.LLen(ctx, workQueue.mainQueueKey).Result() } diff --git a/node/src/WorkQueue.ts b/node/src/WorkQueue.ts index 5fb59ab..23da9ed 100644 --- a/node/src/WorkQueue.ts +++ b/node/src/WorkQueue.ts @@ -3,12 +3,12 @@ * @description A work queue backed by a redis database. */ -import Redis, {ChainableCommander} from 'ioredis'; -import {v4 as uuidv4} from 'uuid'; -import {Item} from './Item'; -import {KeyPrefix} from './KeyPrefix'; +import Redis, { ChainableCommander } from 'ioredis'; +import { v4 as uuidv4 } from 'uuid'; +import { Item } from './Item'; +import { KeyPrefix } from './KeyPrefix'; -export {KeyPrefix, Item}; +export { KeyPrefix, Item }; /** * A work queue backed by a redis database. @@ -35,10 +35,11 @@ export class WorkQueue { /** * Add an item to the work queue. This adds the redis commands onto the pipeline passed. + * * Use `WorkQueue.addItem` if you don't want to pass a pipeline directly. - * Add the item data. - * @param {Pipeline} pipeline The pipeline that the data will be executed. - * @param {Item} item The Item which will be set in the Redis with the key of this.itemDataKey.of(item.id). + * + * @param {Pipeline} pipeline The pipeline that the commands to add the item will be pushed to. + * @param {Item} item The Item to be added. 
 */ addItemToPipeline(pipeline: ChainableCommander, item: Item) { // NOTE: it's important that the data is added first, otherwise someone before the data is ready. @@ -48,11 +49,12 @@ export class WorkQueue { } /** - * Add an item to the work queue. + * Add an item to the work queue. See `addNewItem` to avoid adding duplicate items. + * + * This creates a pipeline and executes it on the database. * * @param {Redis} db The Redis Connection. - * @param item The item that will be executed using the method addItemToPipeline. + * @param item The item to be added. */ async addItem(db: Redis, item: Item): Promise<void> { const pipeline = db.pipeline(); @@ -61,7 +63,47 @@ } /** - * This is used to get the length of the Main Queue. + * Adds an item to the work queue only if an item with the same ID doesn't already exist. + * + * This method uses WATCH to add the item atomically. The db client passed must not be used by + * anything else while this method is running. + * + * Returns a boolean indicating if the item was added or not. The item is only not added if it + * already exists or if an error (other than a transaction error, which triggers a retry) occurs. + * + * @param {Redis} db The Redis Connection, this must not be used by anything else while this method is running. + * @param item The item that will be added, only if an item doesn't already exist with the same ID. + * @returns {boolean} returns true for success (item added to the queue), false if the item is in one of the queues. 
+ */ + async addNewItem(db: Redis, item: Item): Promise<boolean> { + for (;;) { + try { + await db.watch(this.mainQueueKey, this.processingKey); + + const isItemInProcessingKey = await db.lpos(this.processingKey, item.id); + const isItemInMainQueueKey = await db.lpos(this.mainQueueKey, item.id); + if (isItemInProcessingKey !== null || isItemInMainQueueKey !== null) { + await db.unwatch(); + return false; + } + + const transaction = db.multi(); + this.addItemToPipeline(transaction, item); + const results = await transaction.exec(); + if (!results) { + continue; + } + return true; + } catch (err) { + await db.unwatch(); + throw err; + } + } + } + + /** + * Return the length of the work queue (not including items being processed, see + * `WorkQueue.processing` or `WorkQueue.counts` to get both). * * @param {Redis} db The Redis Connection. * @returns {Promise<number>} Return the length of the work queue (not including items being processed, see `WorkQueue.processing()`). */ @@ -71,7 +113,7 @@ } /** - * This is used to get the lenght of the Processing Queue. + * This is used to get the number of items currently being processed. * * @param {Redis} db The Redis Connection. * @returns {Promise<number>} The number of items being processed. */ @@ -80,6 +122,22 @@ return db.llen(this.processingKey); } + /** + * Returns the queue length, and number of items currently being processed, atomically. + * + * @param {Redis} db The Redis Connection. + * @returns {Promise<[number, number]>} Return the length of main queue and processing queue, respectively. 
+ */ + async counts(db: Redis): Promise<[number, number]> { + const multi = db.multi(); + multi.llen(this.mainQueueKey); + multi.llen(this.processingKey); + const result = (await multi.exec()) as Array<[Error | null, number]>; + const queueLength = result[0][1]; + const processingLength = result[1][1]; + return [queueLength, processingLength]; + } + /** * This method can be used to check if a Lease Exists or not for a itemId. * @@ -116,8 +174,8 @@ async lease( db: Redis, leaseSecs: number, - block = true, - timeout = 1 + block: boolean = true, + timeout: number = 1 ): Promise<Item | null> { let maybeItemId: string | null = null; diff --git a/python/redis_work_queue/workqueue.py b/python/redis_work_queue/workqueue.py index 79b605e..21dce89 100644 --- a/python/redis_work_queue/workqueue.py +++ b/python/redis_work_queue/workqueue.py @@ -1,9 +1,8 @@ import uuid -from redis import Redis from redis.client import Pipeline - from redis_work_queue import Item from redis_work_queue import KeyPrefix +from redis import Redis, WatchError class WorkQueue(object): @@ -38,6 +37,33 @@ def add_item(self, db: Redis, item: Item) -> None: self.add_item_to_pipeline(pipeline, item) pipeline.execute() + def add_new_item(self, db: Redis, item: Item) -> bool: + """Adds an item to the work queue only if an item with the same ID doesn't already exist. + + This method uses WATCH to add the item atomically. The db client passed must not be used by + anything else while this method is running. + + Returns a boolean indicating if the item was added or not. 
The item is only not added if it + already exists or if an error (other than a transaction error, which triggers a retry) occurs.""" + while True: + try: + pipeline = db.pipeline(transaction=True) + pipeline.watch(self._main_queue_key, self._processing_key) + + if ( + pipeline.lpos(self._main_queue_key, item.id()) is not None + or pipeline.lpos(self._processing_key, item.id()) is not None + ): + pipeline.unwatch() + return False + + pipeline.multi() + self.add_item_to_pipeline(pipeline, item) + pipeline.execute() + return True + except WatchError: + continue + def queue_len(self, db: Redis) -> int: """Return the length of the work queue (not including items being processed, see `WorkQueue.processing`).""" @@ -47,6 +73,16 @@ def processing(self, db: Redis) -> int: """Return the number of items being processed.""" return db.llen(self._processing_key) + def counts(self, db: Redis) -> tuple[int, int]: + """Returns the queue length, and number of items currently being processed, atomically. + + The return value is `(qlen, processing)`.""" + pipeline = db.pipeline(transaction=True) + pipeline.llen(self._main_queue_key) + pipeline.llen(self._processing_key) + results = pipeline.execute() + return results[0], results[1] + def light_clean(self, db: Redis) -> None: processing: list[bytes | str] = db.lrange( self._processing_key, 0, -1) diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 08d7723..c39e0f1 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -303,6 +303,23 @@ impl WorkQueue { db.llen(&self.processing_key) } + /// Returns the queue length, and number of items currently being processed, atomically. + /// + /// The output is `(queue_len, processing)`. + pub fn counts<'a, C: AsyncCommands>( + &'a self, + db: &'a mut C, + ) -> impl Future<Output = RedisResult<(usize, usize)>> + 'a { + async { + redis::pipe() + .atomic() + .llen(&self.main_queue_key) + .llen(&self.processing_key) + .query_async(db) + .await + } + } + /// Request a work lease the work queue. 
This should be called by a worker to get work to /// complete. When completed, the `complete` method should be called. /// diff --git a/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj b/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj index 35f38e4..99fa23c 100644 --- a/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj +++ b/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj @@ -8,7 +8,7 @@ - + diff --git a/tests/job-spawner-and-cleaner.py b/tests/job-spawner-and-cleaner.py index b0c568b..a072393 100644 --- a/tests/job-spawner-and-cleaner.py +++ b/tests/job-spawner-and-cleaner.py @@ -35,8 +35,10 @@ elif counter % 2 == 0: # Every other tick just log how much work is left for queue in queue_list: + print(queue._main_queue_key) print((queue.queue_len(db), queue.processing(db))) + print(shared_queue._main_queue_key) print((shared_queue.queue_len(db), shared_queue.processing(db))) sleep(0.5) elif counter == 501: diff --git a/tests/rust/Cargo.toml b/tests/rust/Cargo.toml index 07c2925..2b3a604 100644 --- a/tests/rust/Cargo.toml +++ b/tests/rust/Cargo.toml @@ -8,6 +8,6 @@ edition = "2021" [dependencies] redis-work-queue = { path = "../../rust" } futures-lite = "1" -redis = "0.22" +redis = "0.23" serde = "1" serde_json = "1" diff --git a/tests/rust/src/main.rs b/tests/rust/src/main.rs index 1472cc1..4950e5b 100644 --- a/tests/rust/src/main.rs +++ b/tests/rust/src/main.rs @@ -49,17 +49,18 @@ async fn async_main() -> RedisResult<()> { shared_job_counter += 1; // First, try to get a job from the shared job queue - let timeout = if shared_job_counter%5 == 0 { + let timeout = if shared_job_counter % 5 == 0 { Some(Duration::from_secs(1)) } else { Some(Duration::ZERO) }; println!("Leasing from shared with timeout: {:?}", timeout); - let Some(job) = shared_queue.lease( - db, - timeout, - Duration::from_secs(2), - ).await? else { continue }; + let Some(job) = shared_queue + .lease(db, timeout, Duration::from_secs(2)) + .await? 
+ else { + continue; + }; // Also, if we get 'unlucky', crash while completing the job. if shared_job_counter % 7 == 0 { println!("Dropping job"); @@ -97,17 +98,18 @@ async fn async_main() -> RedisResult<()> { rust_job_counter += 1; // First, try to get a job from the rust job queue - let timeout = if shared_job_counter%6 == 0 { + let timeout = if shared_job_counter % 6 == 0 { Some(Duration::from_secs(2)) } else { Some(Duration::ZERO) }; println!("Leasing from rust with timeout: {:?}", timeout); - let Some(job) = rust_queue.lease( - db, - timeout, - Duration::from_secs(1), - ).await? else { continue }; + let Some(job) = rust_queue + .lease(db, timeout, Duration::from_secs(1)) + .await? + else { + continue; + }; // Also, if we get 'unlucky', crash while completing the job. if rust_job_counter % 7 == 0 { println!("Dropping job");