diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 04489e317b..6abf7f2ce9 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -189,17 +189,17 @@ impl Client { .boxed() } - pub fn send_pipeline<'a>( + pub fn send_transaction<'a>( &'a mut self, pipeline: &'a redis::Pipeline, - offset: usize, - count: usize, routing: Option, - ) -> redis::RedisFuture<'a, Vec> { + ) -> redis::RedisFuture<'a, Value> { + let command_count = pipeline.cmd_iter().count(); + let offset = command_count + 1; run_with_timeout(self.request_timeout, async move { match self.internal_client { ClientWrapper::Standalone(ref mut client) => { - client.send_pipeline(pipeline, offset, count).await + client.send_pipeline(pipeline, offset, 1).await } ClientWrapper::Cluster { ref mut client } => { @@ -208,17 +208,50 @@ impl Client { _ => SingleNodeRoutingInfo::Random, }; - client.route_pipeline(pipeline, offset, count, route).await + client.route_pipeline(pipeline, offset, 1, route).await } } - .and_then(|values| { - values + .and_then(|mut values| { + assert_eq!(values.len(), 1); + let values = values.pop(); + let values = match values { + Some(Value::Array(values)) => values, + Some(Value::Nil) => { + return Ok(Value::Nil); + } + Some(value) => { + if offset == 2 { + vec![value] + } else { + return Err(( + ErrorKind::ResponseError, + "Received non-array response for transaction", + format!("Response: `{value:?}`"), + ) + .into()); + } + } + _ => { + return Err(( + ErrorKind::ResponseError, + "Received empty response for transaction", + ) + .into()); + } + }; + + let results = values .into_iter() .zip(pipeline.cmd_iter().map(expected_type_for_cmd)) .map(|(value, expected_type)| convert_to_expected_type(value, expected_type)) - .collect::>() - .into_iter() - .collect() + .try_fold( + Vec::with_capacity(command_count), + |mut acc, result| -> RedisResult<_> { + acc.push(result?); + Ok(acc) + }, + )?; + Ok(Value::Array(results)) }) }) .boxed() diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index eebb351abf..35155b3337 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -381,16 +381,14 @@ async fn send_transaction( routing: Option, ) -> ClientUsageResult { let mut pipeline = redis::Pipeline::with_capacity(request.commands.capacity()); - let offset = request.commands.len() + 1; pipeline.atomic(); for command in request.commands { pipeline.add_command(get_redis_command(&command)?); } client - .send_pipeline(&pipeline, offset, 1, routing) + .send_transaction(&pipeline, routing) .await - .map(|mut values| values.pop().unwrap_or(Value::Nil)) .map_err(|err| err.into()) } diff --git a/glide-core/tests/test_client.rs b/glide-core/tests/test_client.rs index 3043d1edcd..7adfe8d358 100644 --- a/glide-core/tests/test_client.rs +++ b/glide-core/tests/test_client.rs @@ -253,7 +253,7 @@ mod shared_client_tests { #[rstest] #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] - fn test_request_pipeline_timeout(#[values(false, true)] use_cluster: bool) { + fn test_request_transaction_timeout(#[values(false, true)] use_cluster: bool) { block_on_all(async { let mut test_basics = setup_test_basics( use_cluster, @@ -267,13 +267,54 @@ mod shared_client_tests { let mut pipeline = redis::pipe(); pipeline.blpop("foo", 0.0); // 0 timeout blocks indefinitely - let result = test_basics - .client - .send_pipeline(&pipeline, 0, 1, None) - .await; + let result = test_basics.client.send_transaction(&pipeline, None).await; assert!(result.is_err()); let err = result.unwrap_err(); assert!(err.is_timeout(), "{err}"); }); } + + #[rstest] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_request_transaction_and_convert_all_values(#[values(false, true)] use_cluster: bool) { + block_on_all(async { + let mut test_basics = setup_test_basics( + use_cluster, + TestConfiguration { + shared_server: true, + protocol: glide_core::connection_request::ProtocolVersion::RESP2, + ..Default::default() + }, + ) + .await; + + let key = generate_random_string(10); + let mut pipeline = redis::pipe(); + pipeline.atomic(); + pipeline.hset(&key, "bar", "vaz"); + pipeline.hgetall(&key); + pipeline.hexists(&key, "bar"); + pipeline.del(&key); + pipeline.set(&key, "0"); + pipeline.cmd("INCRBYFLOAT").arg(&key).arg("0.5"); + pipeline.del(&key); + + let result = test_basics.client.send_transaction(&pipeline, None).await; + assert_eq!( + result, + Ok(Value::Array(vec![ + Value::Int(1), + Value::Map(vec![( + Value::BulkString(b"bar".to_vec()), + Value::BulkString(b"vaz".to_vec()), + )]), + Value::Boolean(true), + Value::Int(1), + Value::Okay, + Value::Double(0.5.into()), + Value::Int(1), + ]),) + ); + }); + } } diff --git a/glide-core/tests/utilities/mod.rs b/glide-core/tests/utilities/mod.rs index 0abbe63e22..aec6ed02da 100644 --- a/glide-core/tests/utilities/mod.rs +++ b/glide-core/tests/utilities/mod.rs @@ -2,7 +2,7 @@ use futures::Future; use glide_core::{ client::{Client, StandaloneClient}, - connection_request::{self, AuthenticationInfo, NodeAddress}, + connection_request::{self, AuthenticationInfo, NodeAddress, ProtocolVersion}, }; use once_cell::sync::Lazy; use rand::{distributions::Alphanumeric, Rng}; @@ -656,6 +656,7 @@ pub struct TestConfiguration { pub shared_server: bool, pub read_from: Option, pub database_id: u32, + pub protocol: ProtocolVersion, } pub(crate) async fn setup_test_basics_internal(configuration: &TestConfiguration) -> TestBasics { @@ -680,6 +681,7 @@ pub(crate) async fn setup_test_basics_internal(configuration: &TestConfiguration } let mut connection_request = create_connection_request(&[connection_addr], configuration); connection_request.cluster_mode_enabled = false; + connection_request.protocol = configuration.protocol.into(); let client = StandaloneClient::create_client(connection_request) .await .unwrap(); diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index bdd4f8fc0e..d647aaf8f2 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -96,7 +96,7 @@ export function transactionTest( { [field]: value }, 1, [null], - 0, + false, 3, field + "3", 2, diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index d54fb0e1ed..d64aefb29c 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -94,7 +94,7 @@ def transaction_test( 3, 2, 0, - "0.5", + 0.5, 1, "PONG", OK, @@ -102,10 +102,10 @@ def transaction_test( 2, value2, 5, - "10.5", + 10.5, 1, [value, None, value2], - [key, value, key2, value2, key3, "10.5"], + {key: value, key2: value2, key3: "10.5"}, 2, None, 4,