diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index 2ddd2dc75..e4fe80643 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -35,7 +35,6 @@ use crate::{ group_intent::{IntentKind, IntentState, NewGroupIntent, StoredGroupIntent, ID}, group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage}, refresh_state::EntityKind, - StorageError, }, utils::{hash::sha256, id::calculate_message_id}, xmtp_openmls_provider::XmtpOpenMlsProvider, @@ -160,28 +159,23 @@ impl MlsGroup { last_err = Some(err); } - // This will return early if the fetch fails - let intent: Result, StorageError> = conn.fetch(&intent_id); - match intent { + match Fetch::::fetch(&conn, &intent_id) { Ok(None) => { // This is expected. The intent gets deleted on success return Ok(()); } - Ok(Some(intent)) => { - if intent.state == IntentState::Error { - log::warn!( - "not retrying intent ID {}. since it is in state Error", - intent.id, - ); - return Err(last_err.unwrap_or(GroupError::Generic( - "Group intent could not be committed".to_string(), - ))); - } - log::warn!( - "retrying intent ID {}. intent currently in state {:?}", - intent.id, - intent.state - ); + Ok(Some(StoredGroupIntent { + id, + state: IntentState::Error, + .. + })) => { + log::warn!("not retrying intent ID {id}. since it is in state Error",); + return Err(last_err.unwrap_or(GroupError::Generic( + "Group intent could not be committed".to_string(), + ))); + } + Ok(Some(StoredGroupIntent { id, state, .. })) => { + log::warn!("retrying intent ID {id}. intent currently in state {state:?}"); } Err(err) => { log::error!("database error fetching intent {:?}", err); @@ -287,36 +281,9 @@ impl MlsGroup { } } IntentKind::SendMessage => { - let intent_data = SendMessageIntentData::from_bytes(intent.data.as_slice())?; - let group_id = openmls_group.group_id().as_slice(); - let decrypted_message_data = intent_data.message.as_slice(); - - let envelope = PlaintextEnvelope::decode(decrypted_message_data) - .map_err(MessageProcessingError::DecodeError)?; - - match envelope.content { - Some(Content::V1(V1 { - idempotency_key, - content, - })) => { - let message_id = calculate_message_id(group_id, &content, &idempotency_key); - - conn.set_delivery_status_to_published(&message_id, envelope_timestamp_ns)?; - } - Some(Content::V2(V2 { - idempotency_key: _, - message_type, - })) => { - debug!( - "Send Message History Request with message_type {:#?}", - message_type - ); - - // return Empty Ok because it is okay to not process this self message - return Ok(()); - } - None => return Err(MessageProcessingError::InvalidPayload), - }; + if let Some(id) = intent.message_id()? { + conn.set_delivery_status_to_published(&id, envelope_timestamp_ns)?; + } } }; @@ -765,7 +732,9 @@ impl MlsGroup { if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS { log::error!("intent {} has reached max publish attempts", intent.id); // TODO: Eventually clean up errored attempts - provider.conn().set_group_intent_error(intent.id)?; + provider + .conn() + .set_group_intent_error_and_fail_msg(&intent)?; } else { provider .conn() diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 79e7eabf6..2fdbba9e3 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -7,14 +7,24 @@ use diesel::{ sql_types::Integer, sqlite::Sqlite, }; +use prost::Message; use super::{ db_connection::DbConnection, group, schema::{group_intents, group_intents::dsl}, }; -use crate::{impl_fetch, impl_store, storage::StorageError, Delete}; - +use crate::{ + groups::{intents::SendMessageIntentData, IntentError}, + impl_fetch, impl_store, + storage::StorageError, + utils::id::calculate_message_id, + Delete, +}; +use xmtp_proto::xmtp::mls::message_contents::{ + plaintext_envelope::{Content, V1}, + PlaintextEnvelope, +}; pub type ID = i32; #[repr(i32)] @@ -67,6 +77,41 @@ pub struct StoredGroupIntent { pub publish_attempts: i32, } +impl StoredGroupIntent { + /// Calculate the message id for this intent. + /// + /// # Note + /// This functions deserializes and decodes a [`PlaintextEnvelope`] from encoded bytes. + /// It would be costly to call this method while pulling extra data from a + /// [`PlaintextEnvelope`] elsewhere. The caller should consider combining implementations. + /// + /// # Returns + /// Returns [`Option::None`] if [`StoredGroupIntent`] is not [`IntentKind::SendMessage`] or if + /// an error occurs during decoding of intent data for [`IntentKind::SendMessage`]. + pub fn message_id(&self) -> Result>, IntentError> { + if self.kind != IntentKind::SendMessage { + return Ok(None); + } + + let data = SendMessageIntentData::from_bytes(&self.data)?; + let envelope: PlaintextEnvelope = PlaintextEnvelope::decode(data.message.as_slice())?; + + // optimistic message should always have a plaintext envelope + let PlaintextEnvelope { + content: + Some(Content::V1(V1 { + content: message, + idempotency_key: key, + })), + } = envelope + else { + return Ok(None); + }; + + Ok(Some(calculate_message_id(&self.group_id, &message, &key))) + } +} + impl_fetch!(StoredGroupIntent, group_intents, ID); impl Delete for DbConnection { @@ -264,6 +309,17 @@ impl DbConnection { Ok(()) } + + pub fn set_group_intent_error_and_fail_msg( + &self, + intent: &StoredGroupIntent, + ) -> Result<(), StorageError> { + self.set_group_intent_error(intent.id)?; + if let Some(id) = intent.message_id()? { + self.set_delivery_status_to_failed(&id)?; + } + Ok(()) + } } impl ToSql for IntentKind diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index c3164a6f3..da994db55 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -188,6 +188,18 @@ impl DbConnection { .execute(conn) })?) } + + pub fn set_delivery_status_to_failed>( + &self, + msg_id: &MessageId, + ) -> Result { + Ok(self.raw_query(|conn| { + diesel::update(dsl::group_messages) + .filter(dsl::id.eq(msg_id.as_ref())) + .set((dsl::delivery_status.eq(DeliveryStatus::Failed),)) + .execute(conn) + })?) + } } #[cfg(test)] diff --git a/xmtp_mls/src/storage/errors.rs b/xmtp_mls/src/storage/errors.rs index 68c407eaf..423e0fd8f 100644 --- a/xmtp_mls/src/storage/errors.rs +++ b/xmtp_mls/src/storage/errors.rs @@ -3,7 +3,7 @@ use std::sync::PoisonError; use diesel::result::DatabaseErrorKind; use thiserror::Error; -use crate::{retry::RetryableError, retryable}; +use crate::{groups::intents::IntentError, retry::RetryableError, retryable}; use super::sql_key_store; @@ -33,6 +33,8 @@ pub enum StorageError { PoolNeedsConnection, #[error("Conflict")] Conflict(String), + #[error(transparent)] + Intent(#[from] IntentError), } impl From> for StorageError {