Skip to content

Commit

Permalink
fix(sync): skip already processed messages (#1267)
Browse files Browse the repository at this point in the history
* fix(sync): skip already processed messages
  • Loading branch information
mchenani authored Nov 14, 2024
1 parent 87c9c3f commit 54d5368
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 16 deletions.
4 changes: 2 additions & 2 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,12 +754,12 @@ where
) -> Result<Vec<GroupMessage>, ClientError> {
let id_cursor = conn.get_last_cursor_for_id(group_id, EntityKind::Group)?;

let welcomes = self
let messages = self
.api_client
.query_group_messages(group_id.to_vec(), Some(id_cursor as u64))
.await?;

Ok(welcomes)
Ok(messages)
}

/// Query for welcome messages that have a `sequence_id` > than the highest cursor
Expand Down
54 changes: 40 additions & 14 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::{
};
use crate::{groups::device_sync::DeviceSyncContent, subscriptions::SyncMessage};
use futures::future::try_join_all;
use openmls::framing::WireFormat;
use openmls::{
credentials::BasicCredential,
extensions::Extensions,
Expand Down Expand Up @@ -690,26 +691,48 @@ where
&self,
envelope: &GroupMessage,
openmls_group: &mut OpenMlsGroup,
conn: &DbConnection,
) -> Result<(), MessageProcessingError> {
let msgv1 = match &envelope.version {
Some(GroupMessageVersion::V1(value)) => value,
_ => return Err(MessageProcessingError::InvalidPayload),
};

self.client
.intents()
.process_for_id(
&msgv1.group_id,
EntityKind::Group,
let mls_message_in = MlsMessageIn::tls_deserialize_exact(&msgv1.data)?;
let message_entity_kind = match mls_message_in.wire_format() {
WireFormat::Welcome => EntityKind::Welcome,
_ => EntityKind::Group,
};

let last_cursor = conn.get_last_cursor_for_id(&self.group_id, message_entity_kind)?;
tracing::info!("### last cursor --> [{:?}]", last_cursor);
let should_skip_message = last_cursor > msgv1.id as i64;
if should_skip_message {
tracing::info!(
inbox_id = "self.inbox_id()",
group_id = hex::encode(&self.group_id),
"Message already processed: skipped msgId:[{}] entity kind:[{:?}] last cursor in db: [{}]",
msgv1.id,
|provider| async move {
self.process_message(openmls_group, &provider, msgv1, true)
.await?;
Ok(())
},
)
.await?;
Ok(())
message_entity_kind,
last_cursor
);
Err(MessageProcessingError::AlreadyProcessed(msgv1.id))
} else {
self.client
.intents()
.process_for_id(
&msgv1.group_id,
EntityKind::Group,
msgv1.id,
|provider| async move {
self.process_message(openmls_group, &provider, msgv1, true)
.await?;
Ok(())
},
)
.await?;
Ok(())
}
}

#[tracing::instrument(level = "trace", skip_all)]
Expand All @@ -724,7 +747,10 @@ where
for message in messages.into_iter() {
let result = retry_async!(
Retry::default(),
(async { self.consume_message(&message, &mut openmls_group).await })
(async {
self.consume_message(&message, &mut openmls_group, provider.conn_ref())
.await
})
);
if let Err(e) = result {
let is_retryable = e.is_retryable();
Expand Down
54 changes: 54 additions & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,7 @@ pub(crate) mod tests {
use prost::Message;
use std::sync::Arc;
use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_proto::xmtp::mls::api::v1::group_message::Version;
use xmtp_proto::xmtp::mls::message_contents::EncodedContent;

use crate::{
Expand Down Expand Up @@ -3339,6 +3340,59 @@ pub(crate) mod tests {
}
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))]
async fn skip_already_processed_messages() {
let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await;

let bo_wallet = generate_local_wallet();
let bo_client = ClientBuilder::new_test_client(&bo_wallet).await;

let alix_group = alix
.create_group(None, GroupMetadataOptions::default())
.unwrap();

alix_group
.add_members_by_inbox_id(&[bo_client.inbox_id()])
.await
.unwrap();

let alix_message = vec![1];
alix_group.send_message(&alix_message).await.unwrap();
bo_client
.sync_welcomes(&bo_client.store().conn().unwrap())
.await
.unwrap();
let bo_groups = bo_client.find_groups(GroupQueryArgs::default()).unwrap();
let bo_group = bo_groups.first().unwrap();

let mut bo_messages_from_api = bo_client
.query_group_messages(&bo_group.group_id, &bo_client.store().conn().unwrap())
.await
.unwrap();

// override the messages to contain already processed messaged
for msg in &mut bo_messages_from_api {
if let Some(Version::V1(ref mut v1)) = msg.version {
v1.id = 0;
}
}

let process_result = bo_group
.process_messages(bo_messages_from_api, &bo_client.mls_provider().unwrap())
.await;
if let Some(GroupError::ReceiveErrors(errors)) = process_result.err() {
assert_eq!(errors.len(), 2);
assert!(errors
.first()
.unwrap()
.to_string()
.contains("already processed"));
} else {
panic!("Expected error")
}
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(
not(target_arch = "wasm32"),
Expand Down
15 changes: 15 additions & 0 deletions xmtp_mls/src/intents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,20 @@ impl Intents {
process_envelope(provider).await
})
.await
.inspect(|_| {
tracing::info!(
"Transaction completed successfully: process for entity [{:?}] envelope cursor[{}]",
entity_id,
cursor
);
})
.inspect_err(|err| {
tracing::info!(
"Transaction failed: process for entity [{:?}] envelope cursor[{}] error:[{}]",
entity_id,
cursor,
err
);
})
}
}

0 comments on commit 54d5368

Please sign in to comment.