Skip to content

Commit

Permalink
Fix creating consumer group against nonexisting stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Oct 5, 2023
1 parent e4af1ce commit 75c85ce
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Integration {
.arg(&key)
.arg(&self.consumer_group)
.arg(0)
.arg("MKSTREAM")
.query_async(&mut redis_conn)
.await
{
Expand Down Expand Up @@ -445,6 +446,7 @@ mod test {
async fn test_integration() {
let redis_url = env::var("TEST_REDIS_URL").unwrap_or("redis://127.0.0.1/1".to_string());

setup_log(&Configuration::default()).unwrap();
register(Box::new(MockIntegration {})).await;

let conf = Configuration {
Expand All @@ -463,6 +465,8 @@ mod test {
let redis_client = redis::Client::open(redis_url).unwrap();
let mut redis_conn = redis_client.get_async_connection().await.unwrap();

println!("Uplink");

// uplink
let pl = integration_pb::UplinkEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -489,6 +493,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("Join");

// join
let pl = integration_pb::JoinEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -515,6 +521,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("Ack");

// ack
let pl = integration_pb::AckEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -541,6 +549,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("TxAck");

// txack
let pl = integration_pb::TxAckEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -567,6 +577,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("Log");

// log
let pl = integration_pb::LogEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -593,6 +605,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("Status");

// status
let pl = integration_pb::StatusEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -619,6 +633,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("Location");

// location
let pl = integration_pb::LocationEvent::default();
let _: String = redis::cmd("XADD")
Expand All @@ -645,6 +661,8 @@ mod test {

assert_eq!(pl, pl_recv);

println!("Integration");

// integration
let pl = integration_pb::IntegrationEvent::default();
let _: String = redis::cmd("XADD")
Expand Down

0 comments on commit 75c85ce

Please sign in to comment.