Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add counts and add_new_item methods, without tests #4

Merged
merged 47 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
1260925
Implemented no duplicate when adding items and tests, atomic length.
Oxygen588 Jun 30, 2023
f4bd9c2
Remove `tests/go/go` binary
JOT85 Jul 3, 2023
28cd1c9
Merge branch 'JavaScript-Implementation' into Atomic-length-method
JOT85 Jul 3, 2023
b101df6
Update WorkQueue.ts
Oxygen588 Jul 4, 2023
19989b1
Atomic functions fix.
Oxygen588 Jul 4, 2023
50069e9
Update index.ts
Oxygen588 Jul 4, 2023
8abf3ed
Update run-test.sh
Oxygen588 Jul 4, 2023
1aa44b3
Atomic lengths Python Fix.
Oxygen588 Jul 4, 2023
61ac4a2
Rust-lang Atomicity functions fix
Oxygen588 Jul 4, 2023
67b5c70
Update main.rs
Oxygen588 Jul 4, 2023
0255224
Update WorkQueue.go
Oxygen588 Jul 4, 2023
1d8fe42
Update WorkQueue.go
Oxygen588 Jul 4, 2023
dc99a48
Update WorkQueue.go
Oxygen588 Jul 4, 2023
08c47ca
Atomicity fully fixed + tests fixed.
Oxygen588 Jul 4, 2023
5bf622e
Update WorkQueue.go
Oxygen588 Jul 4, 2023
d139f55
Update WorkQueue.ts
Oxygen588 Jul 4, 2023
7c5fc9b
AddAtomicItem Rename + Better Doc Comment
Oxygen588 Jul 5, 2023
e6db6b1
Update WorkQueue.go
Oxygen588 Jul 5, 2023
0dfd47a
AddItemAtomically, Lengths small changes
Oxygen588 Jul 5, 2023
292f428
Update WorkQueue.ts
Oxygen588 Jul 5, 2023
2529278
Fixed add_item_atomically.
Oxygen588 Jul 5, 2023
f07d603
Unexecuted pipe fix Ts, necessary fixes done within addAtomicItem Ts,…
Oxygen588 Jul 5, 2023
d804ee0
Update workqueue.py
Oxygen588 Jul 5, 2023
89e156c
Update WorkQueue.go
Oxygen588 Jul 5, 2023
1cd515e
Update WorkQueue.ts
Oxygen588 Jul 5, 2023
b34629a
Necessary Fixes + Documentation.
Oxygen588 Jul 5, 2023
c3c40f5
Docs for Length function
Oxygen588 Jul 5, 2023
29b4235
Renamed functions from AddAtomicItem to AddNewItem as well as removed…
Oxygen588 Sep 25, 2023
8523175
Delete redis-work-queue.sln
Oxygen588 Sep 25, 2023
61a5052
Necessary fixes.
Oxygen588 Sep 26, 2023
612b776
Update docs, rename `Lengths` -> `Counts` and small tweaks to impleme…
JOT85 Sep 26, 2023
0212ee8
Merge branch 'Atomic-length-method' of github.com:Oxygen588/redis-wor…
JOT85 Sep 26, 2023
14d9392
Fixes.
Oxygen588 Sep 26, 2023
6de4de9
Fixed addNewItem & tests for it.
Oxygen588 Sep 28, 2023
522c5a6
Fixes.
Oxygen588 Sep 29, 2023
ccb6f6e
rust: format and remove unused dependencies and imports
JOT85 Oct 5, 2023
04dbdbd
go: AddNewItem: correct TxPipeline usage, correct error message, form…
JOT85 Oct 5, 2023
a953fce
go: AddNewItem: remove retry limit
JOT85 Oct 5, 2023
9b4dfc5
go: Counts: format
JOT85 Oct 5, 2023
07e11a3
python: format
JOT85 Oct 5, 2023
37478c6
node: AddNewItem: correct retry logic and remove logs
JOT85 Oct 5, 2023
5cf7f41
node: format
JOT85 Oct 5, 2023
6d41f55
rust: rename `get_queue_lengths` -> `counts` and add docs
JOT85 Oct 5, 2023
4f5349d
Small fixes
Oxygen588 Oct 11, 2023
2b81138
Restore tests to upstream
JOT85 Oct 27, 2023
7a7294b
Readd tests/node/package-lock.json
JOT85 Oct 27, 2023
994b09f
go: correctly return boolean indicating if an item was added in AddNe…
JOT85 Oct 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion go/WorkQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,77 @@ func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item
return err
}

// AddNewItem adds an item to the work queue only if an item with the same ID doesn't already exist.
//
// This method uses WATCH to add the item atomically. The db client passed must not be used by
// anything else while this method is running.
//
// Returns a boolean indicating if the item was added or not. The item is only not added if it
// already exists or if an error (other than a transaction error, which triggers a retry) occurs.
func (workQueue *WorkQueue) AddNewItem(ctx context.Context, db *redis.Client, item Item) (bool, error) {
added := false

txf := func(tx *redis.Tx) error {
// Try to find the current item in the queues:
pipeNew := db.Pipeline()
processingItemsInQueueCmd := pipeNew.LPos(ctx, workQueue.processingKey, item.ID, redis.LPosArgs{
Rank: 0,
MaxLen: 0,
})
workingItemsInQueueCmd := pipeNew.LPos(ctx, workQueue.mainQueueKey, item.ID, redis.LPosArgs{
Rank: 0,
MaxLen: 0,
})
_, err := pipeNew.Exec(ctx)
if err != nil {
return err
}
_, ProcessingQueueCheck := processingItemsInQueueCmd.Result()
_, WorkingQueueCheck := workingItemsInQueueCmd.Result()

if ProcessingQueueCheck == redis.Nil && WorkingQueueCheck == redis.Nil {
// If the item wasn't in either queue, try to add it.
added = true
pipeEx := tx.Pipeline()
workQueue.AddItemToPipeline(ctx, pipeEx, item)
_, err = pipeEx.Exec(ctx)
return err
} else {
// Otherwise, it's in a queue, so we don't need to add it!
added = false
return nil
}
}

for {
err := db.Watch(ctx, txf, workQueue.mainQueueKey, workQueue.processingKey)
// Retry on transaction failure, return on anything else.
if err != redis.TxFailedErr {
return added, err
}
}
}

// Counts returns the queue length, and number of items currently being processed, atomically.
func (workQueue *WorkQueue) Counts(ctx context.Context, db *redis.Client) (queueLen, processingLen int64, err error) {
tx := db.TxPipeline()

queueLenPipe := tx.LLen(ctx, workQueue.mainQueueKey)
processingLenPipe := tx.LLen(ctx, workQueue.processingKey)

_, err = tx.Exec(ctx)
JOT85 marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
queueLen, err = queueLenPipe.Result()
}
if err == nil {
processingLen, err = processingLenPipe.Result()
}

return
}

// Return the length of the work queue (not including items being processed, see
// [WorkQueue.Processing]).
// [WorkQueue.Processing] or [WorkQueue.Counts] to get both).
func (workQueue *WorkQueue) QueueLen(ctx context.Context, db *redis.Client) (int64, error) {
return db.LLen(ctx, workQueue.mainQueueKey).Result()
}
Expand Down
76 changes: 67 additions & 9 deletions node/src/WorkQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ export class WorkQueue {

/**
* Add an item to the work queue. This adds the redis commands onto the pipeline passed.
*
* Use `WorkQueue.addItem` if you don't want to pass a pipeline directly.
* Add the item data.
* @param {Pipeline} pipeline The pipeline that the data will be executed.
* @param {Item} item The Item which will be set in the Redis with the key of this.itemDataKey.of(item.id).
*
* @param {Pipeline} pipeline The pipeline that the commands to add the item will be pushed to.
* @param {Item} item The Item to be added.
*/
addItemToPipeline(pipeline: ChainableCommander, item: Item) {
// NOTE: it's important that the data is added first, otherwise someone before the data is ready.
Expand All @@ -48,11 +49,12 @@ export class WorkQueue {
}

/**
* Add an item to the work queue.
* Add an item to the work queue. See `addNewItem` to avoid adding duplicate items.
*
* This creates a pipeline and executes it on the database.
*
* @param {Redis} db The Redis Connection.
* @param item The item that will be executed using the method addItemToPipeline.
* @param item The item to be added.
*/
async addItem(db: Redis, item: Item): Promise<void> {
const pipeline = db.pipeline();
Expand All @@ -61,7 +63,47 @@ export class WorkQueue {
}

/**
* This is used to get the length of the Main Queue.
* Adds an item to the work queue only if an item with the same ID doesn't already exist.
*
* This method uses WATCH to add the item atomically. The db client passed must not be used by
* anything else while this method is running.
*
* Returns a boolean indicating if the item was added or not. The item is only not added if it
* already exists or if an error (other than a transaction error, which triggers a retry) occurs.
*
* @param {Redis} db The Redis Connection, this must not be used by anything else while this method is running.
* @param item The item that will be added, only if an item doesn't already exist with the same ID.
* @returns {boolean} returns false if already in queue or true if the item is successfully added.
*/
async addNewItem(db: Redis, item: Item): Promise<boolean> {
for (;;) {
try {
await db.watch(this.mainQueueKey, this.processingKey);

const pipeNew = db.pipeline();
pipeNew.lpos(this.processingKey, item.id);
pipeNew.lpos(this.mainQueueKey, item.id);
const pipeResult = await pipeNew.exec();
if (pipeResult && (pipeResult[0][1] !== null || pipeResult[1][1] !== null)) {
// If the item already exists in either mainQueue or processingKey, don't add it.
return false;
}

const pipeEx = db.multi();
this.addItemToPipeline(pipeEx, item);
let results = await pipeEx.exec()
if (results && results[0][1] === "OK") {
return true;
}
} finally {
db.unwatch();
}
}
}

/**
* Return the length of the work queue (not including items being processed, see
* `WorkQueue.processing` or `WorkQueue.counts` to get both).
*
* @param {Redis} db The Redis Connection.
* @returns {Promise<number>} Return the length of the work queue (not including items being processed, see `WorkQueue.processing()`).
Expand All @@ -71,7 +113,7 @@ export class WorkQueue {
}

/**
* This is used to get the lenght of the Processing Queue.
* This is used to get the number of items currently being processed.
*
* @param {Redis} db The Redis Connection.
* @returns {Promise<number>} The number of items being processed.
Expand All @@ -80,6 +122,22 @@ export class WorkQueue {
return db.llen(this.processingKey);
}

/**
* Returns the queue length, and number of items currently being processed, atomically.
*
* @param {Redis} db The Redis Connection.
* @returns {Promise<[number, number]>} Return the length of main queue and processing queue, respectively.
*/
async counts(db: Redis): Promise<[number, number]> {
const multi = db.multi();
multi.llen(this.mainQueueKey);
multi.llen(this.processingKey);
const result = (await multi.exec()) as Array<[Error | null, number]>;
const queueLength = result[0][1];
const processingLength = result[1][1];
return [queueLength, processingLength];
}

/**
* This method can be used to check if a Lease Exists or not for a itemId.
*
Expand Down Expand Up @@ -116,8 +174,8 @@ export class WorkQueue {
async lease(
db: Redis,
leaseSecs: number,
block = true,
timeout = 1
block: boolean = true,
timeout: number = 1
): Promise<Item | null> {
let maybeItemId: string | null = null;

Expand Down
44 changes: 44 additions & 0 deletions python/redis_work_queue/workqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,38 @@ def add_item(self, db: Redis, item: Item) -> None:
pipeline = db.pipeline()
self.add_item_to_pipeline(pipeline, item)
pipeline.execute()

def add_new_item(self, db: Redis, item: Item) -> bool:
"""Adds an item to the work queue only if an item with the same ID doesn't already exist.

This method uses WATCH to add the item atomically. The db client passed must not be used by
anything else while this method is running.

Returns a boolean indicating if the item was added or not. The item is only not added if it
already exists or if an error (other than a transaction error, which triggers a retry) occurs."""
pipeline = db.pipeline(transaction=True)
while True:
try:
pipeline.watch(self._main_queue_key, self._processing_key)
JOT85 marked this conversation as resolved.
Show resolved Hide resolved

pipeline_LPOS = db.pipeline()

pipeline_LPOS.lpos(self._main_queue_key, item.id())
pipeline_LPOS.lpos(self._processing_key, item.id())

LPos_pipeline = pipeline_LPOS.execute()
if LPos_pipeline[0] is not None or LPos_pipeline[1] is not None:
return False

pipeline.multi()
self.add_item_to_pipeline(pipeline, item)
pipeline.execute()
break
except Redis.WatchError:
continue

pipeline.unwatch()
JOT85 marked this conversation as resolved.
Show resolved Hide resolved
return True

def queue_len(self, db: Redis) -> int:
"""Return the length of the work queue (not including items being processed, see
Expand All @@ -47,6 +79,18 @@ def processing(self, db: Redis) -> int:
"""Return the number of items being processed."""
return db.llen(self._processing_key)


def counts(self, db: Redis) -> tuple[int, int]:
"""Returns the queue length, and number of items currently being processed, atomically.

The return value is `(qlen, processing)`."""
pipeline = db.pipeline(transaction=True)
pipeline.llen(self._main_queue_key)
pipeline.llen(self._processing_key)
results = pipeline.execute()
return results[0], results[1]


def light_clean(self, db: Redis) -> None:
processing: list[bytes | str] = db.lrange(
self._processing_key, 0, -1)
Expand Down
18 changes: 17 additions & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
use std::future::Future;
use std::time::Duration;

use redis::{AsyncCommands, RedisResult};
use redis::{AsyncCommands, RedisError, RedisResult};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -303,6 +303,22 @@ impl WorkQueue {
db.llen(&self.processing_key)
}

pub fn get_queue_lengths<'a, C: AsyncCommands>(
&'a self,
db: &'a mut C,
) -> Box<dyn std::future::Future<Output = Result<(u32, u32), RedisError>> + 'a> {
Box::new(async move {
let result = redis::pipe()
.atomic()
.llen(&self.main_queue_key)
.llen(&self.processing_key)
.query_async(db)
.await?;

Ok(result)
JOT85 marked this conversation as resolved.
Show resolved Hide resolved
})
}

/// Request a work lease the work queue. This should be called by a worker to get work to
/// complete. When completed, the `complete` method should be called.
///
Expand Down
3 changes: 2 additions & 1 deletion tests/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ go 1.20

require (
github.com/mevitae/redis-work-queue/go v0.1.0
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.3
)

require (
github.com/bsm/redislock v0.9.3 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/uuid v1.3.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions tests/go/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/bsm/redislock v0.9.3 h1:osmvugkXGiLDEhzUPdM0EUtKpTEgLLuli4Ky2Z4vx38=
github.com/bsm/redislock v0.9.3/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -10,5 +14,7 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu4k=
github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Loading