From 1a09519d8031670ad4d68ccd99677623419fe379 Mon Sep 17 00:00:00 2001
From: Jonas Platte
Date: Wed, 14 Aug 2024 12:09:32 +0200
Subject: [PATCH] Upgrade redis dependency

---
 omniqueue/Cargo.toml                    |  4 ++--
 omniqueue/src/backends/redis/mod.rs     |  2 +-
 omniqueue/src/backends/redis/streams.rs | 28 ++++++++++++-------------
 3 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml
index 404825b..8662944 100644
--- a/omniqueue/Cargo.toml
+++ b/omniqueue/Cargo.toml
@@ -16,13 +16,13 @@ aws-sdk-sqs = { version = "1.13.0", optional = true }
 azure_storage = { version = "0.20.0", optional = true }
 azure_storage_queues = { version = "0.20.0", optional = true }
 bb8 = { version = "0.8", optional = true }
-bb8-redis = { version = "0.15.0", optional = true }
+bb8-redis = { version = "0.16.0", optional = true }
 bytesize = "1.3.0"
 futures-util = { version = "0.3.28", default-features = false, features = ["async-await", "std"], optional = true }
 google-cloud-googleapis = { version = "0.12.0", optional = true }
 google-cloud-pubsub = { version = "0.24.0", optional = true }
 lapin = { version = "2", optional = true }
-redis = { version = "0.25.3", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
+redis = { version = "0.26.1", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
 serde = "1.0.196"
 serde_json = "1"
 svix-ksuid = { version = "0.8.0", optional = true }
diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index b1c7174..72d866d 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -104,7 +104,7 @@ async fn check_eviction_policy<R: RedisConnection>(
     let results: Vec<String> = redis::cmd("CONFIG")
         .arg("GET")
         .arg("maxmemory-policy")
-        .query_async::<_, Vec<String>>(&mut *conn)
+        .query_async(&mut *conn)
         .await
         .map_err(|_| EvictionCheckError::CheckEvictionPolicyFailed)?;

diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs
index 451808f..c926630 100644
--- a/omniqueue/src/backends/redis/streams.rs
+++ b/omniqueue/src/backends/redis/streams.rs
@@ -4,7 +4,9 @@ use std::time::Duration;

 use bb8::ManageConnection;
 use redis::{
-    streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply},
+    streams::{
+        StreamAutoClaimOptions, StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply,
+    },
     AsyncCommands as _, FromRedisValue, RedisResult,
 };
 use tracing::{error, trace};
@@ -20,7 +22,7 @@ const LISTEN_STREAM_ID: &str = ">";
 /// The maximum number of pending messages to reinsert into the queue after
 /// becoming stale per loop
 // FIXME(onelson): expose in config?
-const PENDING_BATCH_SIZE: i16 = 1000;
+const PENDING_BATCH_SIZE: usize = 1000;

 pub(super) async fn send_raw<R: RedisConnection>(
     producer: &RedisProducer<R>,
@@ -250,17 +252,15 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(

     // Every iteration checks whether the processing queue has items that should
     // be picked back up, claiming them in the process
-    let mut cmd = redis::cmd("XAUTOCLAIM");
-    cmd.arg(main_queue_name)
-        .arg(consumer_group)
-        .arg(consumer_name)
-        .arg(ack_deadline_ms)
-        .arg("-")
-        .arg("COUNT")
-        .arg(PENDING_BATCH_SIZE);
-
-    let StreamAutoclaimReply { ids } = cmd
-        .query_async(&mut *conn)
+    let StreamAutoclaimReply { ids } = conn
+        .xautoclaim_options(
+            main_queue_name,
+            consumer_group,
+            consumer_name,
+            ack_deadline_ms,
+            "-",
+            StreamAutoClaimOptions::default().count(PENDING_BATCH_SIZE),
+        )
         .await
         .map_err(QueueError::generic)?;

@@ -276,7 +276,7 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
                 GENERATE_STREAM_ID,
                 &map.iter()
                     .filter_map(|(k, v)| {
-                        if let redis::Value::Data(data) = v {
+                        if let redis::Value::BulkString(data) = v {
                             Some((k.as_str(), data.as_slice()))
                         } else {
                             None