Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return existing group on duplicate welcome message #855

Merged
Merged
60 changes: 54 additions & 6 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,11 +1415,6 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
#[ignore]
async fn test_can_stream_group_messages_for_updates() {
let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Info)
.try_init();

let alix = new_test_client().await;
let bo = new_test_client().await;

Expand Down Expand Up @@ -1550,8 +1545,10 @@ mod tests {
.unwrap();
assert_eq!(bo_messages2.len(), second_msg_check);

// TODO: message_callbacks should eventually come through here, why does this
// not work?
// tokio::time::sleep(tokio::time::Duration::from_millis(10000)).await;
// assert_eq!(message_callbacks.message_count(), 5);
// assert_eq!(message_callbacks.message_count(), second_msg_check as u32);

stream_messages.end();
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
Expand Down Expand Up @@ -1794,4 +1791,55 @@ mod tests {
"The Inviter and added_by_address do not match!"
);
}

// TODO: Test current fails 50% of the time with db locking messages
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
#[ignore]
async fn test_stream_groups_gets_callback_when_streaming_messages() {
let alix = new_test_client().await;
let bo = new_test_client().await;

// Stream all group messages
let message_callbacks = RustStreamCallback::new();
let group_callbacks = RustStreamCallback::new();
let stream_groups = bo
.conversations()
.stream(Box::new(group_callbacks.clone()))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
let stream_messages = bo
.conversations()
.stream_all_messages(Box::new(message_callbacks.clone()))
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

// Create group and send first message
let alix_group = alix
.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

alix_group.send("hello1".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

assert_eq!(group_callbacks.message_count(), 1);
assert_eq!(message_callbacks.message_count(), 1);

stream_messages.end();
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
assert!(stream_messages.is_closed());

stream_groups.end();
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
assert!(stream_groups.is_closed());
}
}
10 changes: 10 additions & 0 deletions bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions xmtp_mls/src/storage/encrypted_store/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ impl DbConnection {
Ok(groups.into_iter().next())
}

/// Return a single group that matches the given welcome ID
pub fn find_group_by_welcome_id(
&self,
welcome_id: i64,
) -> Result<Option<StoredGroup>, StorageError> {
let mut query = dsl::groups.order(dsl::created_at_ns.asc()).into_boxed();
query = query.limit(1).filter(dsl::welcome_id.eq(welcome_id));
cameronvoell marked this conversation as resolved.
Show resolved Hide resolved
let groups: Vec<StoredGroup> = self.raw_query(|conn| query.load(conn))?;
// Manually extract the first element
Ok(groups.into_iter().next())
}

/// Updates group membership state
pub fn update_group_membership<GroupId: AsRef<[u8]>>(
&self,
Expand Down
15 changes: 14 additions & 1 deletion xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,20 @@ where
.await;

if let Some(err) = creation_result.as_ref().err() {
return Err(ClientError::Generic(err.to_string()));
let conn = self.context.store.conn()?;
let result = conn.find_group_by_welcome_id(welcome_v1.id as i64);
match result {
Ok(Some(group)) => {
println!("Found existing group by welcome id");
cameronvoell marked this conversation as resolved.
Show resolved Hide resolved
return Ok(MlsGroup::new(
self.context.clone(),
group.id,
group.created_at_ns,
));
}
Ok(None) => return Err(ClientError::Generic(err.to_string())),
Err(e) => return Err(ClientError::Generic(e.to_string())),
}
}

Ok(creation_result.unwrap())
Expand Down
Loading