Skip to content

Commit

Permalink
Fixed addNewItem & tests for it.
Browse files Browse the repository at this point in the history
Rewrote the way tests are done for `addNewItem`: there are now 3 functions that each spawn 10 `addNewItem` calls for the item we are trying to add, and they are run concurrently.

TODO: Because of the added logic, the speed difference between Go and Node/Python is now much larger, and as a result the difference in the number of shared jobs completed is correspondingly higher.
  • Loading branch information
Oxygen588 committed Sep 28, 2023
1 parent 14d9392 commit 6de4de9
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 378 deletions.
81 changes: 61 additions & 20 deletions go/WorkQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ package workqueue

import (
"context"
"errors"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -128,47 +129,87 @@ func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item
// Returns a boolean indicating if the item was added or not. The item is only not added if it
// already exists or if an error (other than a transaction error, which triggers a retry) occurs.
func (workQueue *WorkQueue) AddNewItem(ctx context.Context, db *redis.Client, item Item) (bool, error) {
added := false

txf := func(tx *redis.Tx) error {
// Try to find the current item in the queues:
pipeNew := db.Pipeline()
processingItemsInQueueCmd := pipeNew.LPos(ctx, workQueue.processingKey, item.ID, redis.LPosArgs{

processingItemsInQueueCmd := tx.LPos(ctx, workQueue.processingKey, item.ID, redis.LPosArgs{
Rank: 0,
MaxLen: 0,
})
workingItemsInQueueCmd := pipeNew.LPos(ctx, workQueue.mainQueueKey, item.ID, redis.LPosArgs{
workingItemsInQueueCmd := tx.LPos(ctx, workQueue.mainQueueKey, item.ID, redis.LPosArgs{
Rank: 0,
MaxLen: 0,
})
_, err := pipeNew.Exec(ctx)
if err != nil {

_, ProcessingQueueCheck := processingItemsInQueueCmd.Result()
_, WorkingQueueCheck := workingItemsInQueueCmd.Result()

if ProcessingQueueCheck == redis.Nil && WorkingQueueCheck == redis.Nil {

_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
workQueue.AddItemToPipeline(ctx, pipe, item)
_, err := pipe.Exec(ctx)
return err
})
return err
} else {
return nil
}
}

for i := 0; i < 100; i++ {
err := db.Watch(ctx, txf, workQueue.processingKey, workQueue.mainQueueKey)
if err == nil {
return true, nil
}
if err == redis.TxFailedErr {
continue
}
return false, err
}

return false, errors.New("increment reached maximum number of retries")
}

func (workQueue *WorkQueue) AddNewItemWithSleep(ctx context.Context, db *redis.Client, item Item) (bool, error) {

This comment has been minimized.

Copy link
@JOT85

JOT85 Sep 28, 2023

Member

Could you please move this to the tests directory? You'll need to pass the keys as arguments, but it shouldn't be in the main library!

txf := func(tx *redis.Tx) error {

processingItemsInQueueCmd := tx.LPos(ctx, workQueue.processingKey, item.ID, redis.LPosArgs{
Rank: 0,
MaxLen: 0,
})
workingItemsInQueueCmd := tx.LPos(ctx, workQueue.mainQueueKey, item.ID, redis.LPosArgs{
Rank: 0,
MaxLen: 0,
})

_, ProcessingQueueCheck := processingItemsInQueueCmd.Result()
_, WorkingQueueCheck := workingItemsInQueueCmd.Result()

if ProcessingQueueCheck == redis.Nil && WorkingQueueCheck == redis.Nil {
// If the item wasn't in either queue, try to add it.
added = true
pipeEx := tx.Pipeline()
workQueue.AddItemToPipeline(ctx, pipeEx, item)
_, err = pipeEx.Exec(ctx)
time.Sleep(100 * time.Microsecond)

This comment has been minimized.

Copy link
@JOT85

JOT85 Sep 28, 2023

Member

For the tests, this definitely isn't long enough to guarantee a collision!

_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
workQueue.AddItemToPipeline(ctx, pipe, item)
_, err := pipe.Exec(ctx)
return err
})
return err
} else {
// Otherwise, it's in a queue, so we don't need to add it!
added = false
return nil
}
}

for {
err := db.Watch(ctx, txf, workQueue.mainQueueKey, workQueue.processingKey)
// Retry on transaction failure, return on anything else.
if err != redis.TxFailedErr {
return added, err
for i := 0; i < 100; i++ {
err := db.Watch(ctx, txf, workQueue.processingKey, workQueue.mainQueueKey)
if err == nil {
return true, nil
}
if err == redis.TxFailedErr {
continue
}
return false, err
}

return false, errors.New("increment reached maximum number of retries")
}

// Counts returns the queue length, and number of items currently being processed, atomically.
Expand Down
107 changes: 80 additions & 27 deletions node/src/WorkQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
* @description A work queue backed by a redis database.
*/

import Redis, {ChainableCommander} from 'ioredis';
import {v4 as uuidv4} from 'uuid';
import {Item} from './Item';
import {KeyPrefix} from './KeyPrefix';
import Redis, { ChainableCommander } from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
import { Item } from './Item';
import { KeyPrefix } from './KeyPrefix';

export {KeyPrefix, Item};
export { KeyPrefix, Item };

/**
* A work queue backed by a redis database.
Expand Down Expand Up @@ -76,31 +76,84 @@ export class WorkQueue {
* @returns {boolean} returns false if already in queue or true if the item is successfully added.
*/
async addNewItem(db: Redis, item: Item): Promise<boolean> {
for (;;) {
try {
await db.watch(this.mainQueueKey, this.processingKey);

const pipeNew = db.pipeline();
pipeNew.lpos(this.processingKey, item.id);
pipeNew.lpos(this.mainQueueKey, item.id);
const pipeResult = await pipeNew.exec();
if (pipeResult && (pipeResult[0][1] !== null || pipeResult[1][1] !== null)) {
// If the item already exists in either mainQueue or processingKey, don't add it.
return false;
}
while (true) {
try {
await db.watch(this.mainQueueKey, this.processingKey);

const pipeEx = db.multi();
this.addItemToPipeline(pipeEx, item);
let results = await pipeEx.exec()
if (results && results[0][1] === "OK") {
return true;
}
} finally {
db.unwatch();
const isItemInProcessingKey = await db.lpos(this.processingKey, item.id);
const isItemInMainQueueKey = await db.lpos(this.mainQueueKey, item.id);
if (isItemInProcessingKey !== null || isItemInMainQueueKey !== null) {
console.log("Item already exists, not added", item.id);
await db.unwatch();
return false;
}
const transaction = db.multi();
this.addItemToPipeline(transaction, item);
const results = await transaction.exec();

if (!results) {
console.log("Transaction failed, item not added", item.id);
await db.unwatch();
}
console.log("Item added successfully", item.id);
return true
} catch (e) {
console.log("Error", e);
} finally {
await db.unwatch();
}
}
return false;
}

/**
* This is a dummy function used for testing!
*
* Adds an item to the work queue only if an item with the same ID doesn't already exist with a 50 ms sleep.
*
* This method uses WATCH to add the item atomically. The db client passed must not be used by
* anything else while this method is running.
*
* Returns a boolean indicating if the item was added or not. The item is only not added if it
* already exists or if an error (other than a transaction error, which triggers a retry) occurs.
*
* @param {Redis} db The Redis Connection, this must not be used by anything else while this method is running.
* @param item The item that will be added, only if an item doesn't already exist with the same ID.
* @returns {boolean} returns false if already in queue or true if the item is successfully added.
*/
async addNewItemWithSeep(db: Redis, item: Item): Promise<boolean> {
while (true) {
try {
await db.watch(this.mainQueueKey, this.processingKey);

const isItemInProcessingKey = await db.lpos(this.processingKey, item.id);
const isItemInMainQueueKey = await db.lpos(this.mainQueueKey, item.id);
if (isItemInProcessingKey !== null || isItemInMainQueueKey !== null) {
console.log("Item already exists, not added", item.id);
await db.unwatch();
return false;
}
await new Promise<void>((resolve) => setTimeout(resolve, 50));;
const transaction = db.multi();
this.addItemToPipeline(transaction, item);
const results = await transaction.exec();

if (!results) {
console.log("Transaction failed, item not added", item.id);
await db.unwatch();
}
console.log("Item added successfully", item.id);
return true
} catch (e) {
console.log("Error", e);
} finally {
await db.unwatch();
}
}
return false;
}


/**
* Return the length of the work queue (not including items being processed, see
* `WorkQueue.processing` or `WorkQueue.counts` to get both).
Expand Down Expand Up @@ -189,7 +242,7 @@ export class WorkQueue {
} else {
maybeItemId = await db.rpoplpush(this.mainQueueKey, this.processingKey);
}

console.log(`Leased ${maybeItemId}`)
if (maybeItemId == null) {
return null;
}
Expand Down Expand Up @@ -265,7 +318,7 @@ export class WorkQueue {
if (removed === 0) {
return false;
}

console.log(`Completed ${item.id}`);
const pipeline = db.pipeline();
pipeline.del(this.itemDataKey.of(item.id));
pipeline.del(this.leaseKey.of(item.id));
Expand Down
34 changes: 18 additions & 16 deletions python/redis_work_queue/workqueue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import uuid
from redis import Redis
import redis
from redis.client import Pipeline

from redis_work_queue import Item
from redis_work_queue import KeyPrefix

Redis = redis.Redis

class WorkQueue(object):
"""A work queue backed by a redis database"""
Expand Down Expand Up @@ -46,29 +45,32 @@ def add_new_item(self, db: Redis, item: Item) -> bool:
Returns a boolean indicating if the item was added or not. The item is only not added if it
already exists or if an error (other than a transaction error, which triggers a retry) occurs."""
pipeline = db.pipeline(transaction=True)

while True:
try:
pipeline = db.pipeline(transaction=True)
pipeline.watch(self._main_queue_key, self._processing_key)

pipeline_LPOS = db.pipeline()

pipeline_LPOS.lpos(self._main_queue_key, item.id())
pipeline_LPOS.lpos(self._processing_key, item.id())

LPos_pipeline = pipeline_LPOS.execute()
if LPos_pipeline[0] is not None or LPos_pipeline[1] is not None:
return False
if (
pipeline.lpos(self._main_queue_key, item.id()) is not None
or pipeline.lpos(self._processing_key, item.id()) is not None
):
pipeline.unwatch()
return False

pipeline.multi()

self.add_item_to_pipeline(pipeline, item)

pipeline.execute()
break
except Redis.WatchError:
return True

except redis.WatchError:
continue
except Exception as e:
print("Error:", e)
raise

pipeline.unwatch()
return True

def queue_len(self, db: Redis) -> int:
"""Return the length of the work queue (not including items being processed, see
Expand Down
Loading

0 comments on commit 6de4de9

Please sign in to comment.