Skip to content

Commit

Permalink
Core: Add support to route by address. (valkey-io#971)
Browse files Browse the repository at this point in the history
* Core: Add support to route by address.

* Update test.
  • Loading branch information
nihohit authored Feb 21, 2024
1 parent 9b11e19 commit c3f55e4
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
6 changes: 6 additions & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ message SlotKeyRoute {
string slot_key = 2;
}

message ByAddressRoute {
string host = 1;
int32 port = 2;
}

message Routes {
oneof value {
SimpleRoutes simple_routes = 1;
SlotKeyRoute slot_key_route = 2;
SlotIdRoute slot_id_route = 3;
ByAddressRoute by_address_route = 4;
}
}

Expand Down
12 changes: 12 additions & 0 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,18 @@ fn get_route(
get_slot_addr(&slot_id_route.slot_type)?,
)),
))),
Value::ByAddressRoute(by_address_route) => match u16::try_from(by_address_route.port) {
Ok(port) => Ok(Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::ByAddress {
host: by_address_route.host.to_string(),
port,
},
))),
Err(err) => {
log_warn("get route", format!("Failed to parse port: {err:?}"));
Ok(None)
}
},
}
}

Expand Down
71 changes: 70 additions & 1 deletion glide-core/tests/test_socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod socket_listener {
use glide_core::response::{response, ConstantResponse, Response};
use glide_core::scripts_container::add_script;
use protobuf::{EnumOrUnknown, Message};
use redis::{Cmd, ConnectionAddr, Value};
use redis::{Cmd, ConnectionAddr, FromRedisValue, Value};
use redis_request::{RedisRequest, RequestType};
use rstest::rstest;
use std::mem::size_of;
Expand Down Expand Up @@ -619,6 +619,75 @@ mod socket_listener {
}
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_socket_pass_manual_route_by_address() {
// We send a request to a random node, get that node's hostname & port, and then
// route the same request to the hostname & port, and verify that we've received the same value.
let mut test_basics = setup_cluster_test_basics(false, true);

const CALLBACK_INDEX: u32 = 100;
let approx_message_length = 4 + APPROX_RESP_HEADER_LEN;
let mut buffer = Vec::with_capacity(approx_message_length);
let mut request = get_command_request(
CALLBACK_INDEX,
vec!["CLUSTER".to_string(), "NODES".to_string()],
RequestType::CustomCommand.into(),
false,
);
let mut routes = redis_request::Routes::default();
routes.set_simple_routes(redis_request::SimpleRoutes::Random);
request.route = Some(routes).into();
write_message(&mut buffer, request.clone());
test_basics.socket.write_all(&buffer).unwrap();

let _size = read_from_socket(&mut buffer, &mut test_basics.socket);
let (message_length, header_bytes) = parse_header(&buffer);
let response = decode_response(&buffer, header_bytes, message_length as usize);

assert_eq!(response.callback_idx, CALLBACK_INDEX);
let Some(response::Value::RespPointer(pointer)) = response.value else {
panic!("Unexpected response {:?}", response.value);
};
let pointer = pointer as *mut Value;
let received_value = unsafe { Box::from_raw(pointer) };
let first_value = String::from_redis_value(&received_value).unwrap();
let (host, port) = first_value
.split('\n')
.find(|line| line.contains("myself"))
.and_then(|line| line.split_once(' '))
.and_then(|(_, second)| second.split_once('@'))
.and_then(|(first, _)| first.split_once(':'))
.and_then(|(host, port)| port.parse::<i32>().map(|port| (host, port)).ok())
.unwrap();

buffer.clear();
let mut routes = redis_request::Routes::default();
let by_address_route = glide_core::redis_request::ByAddressRoute {
host: host.into(),
port,
..Default::default()
};
routes.set_by_address_route(by_address_route);
request.route = Some(routes).into();
write_message(&mut buffer, request);
test_basics.socket.write_all(&buffer).unwrap();

let _size = read_from_socket(&mut buffer, &mut test_basics.socket);
let (message_length, header_bytes) = parse_header(&buffer);
let response = decode_response(&buffer, header_bytes, message_length as usize);

assert_eq!(response.callback_idx, CALLBACK_INDEX);
let Some(response::Value::RespPointer(pointer)) = response.value else {
panic!("Unexpected response {:?}", response.value);
};
let pointer = pointer as *mut Value;
let received_value = unsafe { Box::from_raw(pointer) };
let second_value = String::from_redis_value(&received_value).unwrap();

assert_eq!(first_value, second_value);
}

#[rstest]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_socket_get_returns_null(#[values(false, true)] use_arg_pointer: bool) {
Expand Down

0 comments on commit c3f55e4

Please sign in to comment.