Skip to content

Commit

Permalink
Message Failed State (#902)
Browse files Browse the repository at this point in the history
* update message with failed state if intent failed

* message_id fn in publish_intents
  • Loading branch information
insipx authored Jul 16, 2024
1 parent 53ad89f commit 8fe95b8
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 53 deletions.
69 changes: 19 additions & 50 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::{
group_intent::{IntentKind, IntentState, NewGroupIntent, StoredGroupIntent, ID},
group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage},
refresh_state::EntityKind,
StorageError,
},
utils::{hash::sha256, id::calculate_message_id},
xmtp_openmls_provider::XmtpOpenMlsProvider,
Expand Down Expand Up @@ -160,28 +159,23 @@ impl MlsGroup {
last_err = Some(err);
}

// This will return early if the fetch fails
let intent: Result<Option<StoredGroupIntent>, StorageError> = conn.fetch(&intent_id);
match intent {
match Fetch::<StoredGroupIntent>::fetch(&conn, &intent_id) {
Ok(None) => {
// This is expected. The intent gets deleted on success
return Ok(());
}
Ok(Some(intent)) => {
if intent.state == IntentState::Error {
log::warn!(
"not retrying intent ID {}. since it is in state Error",
intent.id,
);
return Err(last_err.unwrap_or(GroupError::Generic(
"Group intent could not be committed".to_string(),
)));
}
log::warn!(
"retrying intent ID {}. intent currently in state {:?}",
intent.id,
intent.state
);
Ok(Some(StoredGroupIntent {
id,
state: IntentState::Error,
..
})) => {
log::warn!("not retrying intent ID {id}. since it is in state Error",);
return Err(last_err.unwrap_or(GroupError::Generic(
"Group intent could not be committed".to_string(),
)));
}
Ok(Some(StoredGroupIntent { id, state, .. })) => {
log::warn!("retrying intent ID {id}. intent currently in state {state:?}");
}
Err(err) => {
log::error!("database error fetching intent {:?}", err);
Expand Down Expand Up @@ -287,36 +281,9 @@ impl MlsGroup {
}
}
IntentKind::SendMessage => {
let intent_data = SendMessageIntentData::from_bytes(intent.data.as_slice())?;
let group_id = openmls_group.group_id().as_slice();
let decrypted_message_data = intent_data.message.as_slice();

let envelope = PlaintextEnvelope::decode(decrypted_message_data)
.map_err(MessageProcessingError::DecodeError)?;

match envelope.content {
Some(Content::V1(V1 {
idempotency_key,
content,
})) => {
let message_id = calculate_message_id(group_id, &content, &idempotency_key);

conn.set_delivery_status_to_published(&message_id, envelope_timestamp_ns)?;
}
Some(Content::V2(V2 {
idempotency_key: _,
message_type,
})) => {
debug!(
"Send Message History Request with message_type {:#?}",
message_type
);

// return Empty Ok because it is okay to not process this self message
return Ok(());
}
None => return Err(MessageProcessingError::InvalidPayload),
};
if let Some(id) = intent.message_id()? {
conn.set_delivery_status_to_published(&id, envelope_timestamp_ns)?;
}
}
};

Expand Down Expand Up @@ -765,7 +732,9 @@ impl MlsGroup {
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
log::error!("intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider.conn().set_group_intent_error(intent.id)?;
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn()
Expand Down
60 changes: 58 additions & 2 deletions xmtp_mls/src/storage/encrypted_store/group_intent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,24 @@ use diesel::{
sql_types::Integer,
sqlite::Sqlite,
};
use prost::Message;

use super::{
db_connection::DbConnection,
group,
schema::{group_intents, group_intents::dsl},
};
use crate::{impl_fetch, impl_store, storage::StorageError, Delete};

use crate::{
groups::{intents::SendMessageIntentData, IntentError},
impl_fetch, impl_store,
storage::StorageError,
utils::id::calculate_message_id,
Delete,
};
use xmtp_proto::xmtp::mls::message_contents::{
plaintext_envelope::{Content, V1},
PlaintextEnvelope,
};
pub type ID = i32;

#[repr(i32)]
Expand Down Expand Up @@ -67,6 +77,41 @@ pub struct StoredGroupIntent {
pub publish_attempts: i32,
}

impl StoredGroupIntent {
/// Calculate the message id for this intent.
///
/// # Note
/// This functions deserializes and decodes a [`PlaintextEnvelope`] from encoded bytes.
/// It would be costly to call this method while pulling extra data from a
/// [`PlaintextEnvelope`] elsewhere. The caller should consider combining implementations.
///
/// # Returns
/// Returns [`Option::None`] if [`StoredGroupIntent`] is not [`IntentKind::SendMessage`] or if
/// an error occurs during decoding of intent data for [`IntentKind::SendMessage`].
pub fn message_id(&self) -> Result<Option<Vec<u8>>, IntentError> {
if self.kind != IntentKind::SendMessage {
return Ok(None);
}

let data = SendMessageIntentData::from_bytes(&self.data)?;
let envelope: PlaintextEnvelope = PlaintextEnvelope::decode(data.message.as_slice())?;

// optimistic message should always have a plaintext envelope
let PlaintextEnvelope {
content:
Some(Content::V1(V1 {
content: message,
idempotency_key: key,
})),
} = envelope
else {
return Ok(None);
};

Ok(Some(calculate_message_id(&self.group_id, &message, &key)))
}
}

impl_fetch!(StoredGroupIntent, group_intents, ID);

impl Delete<StoredGroupIntent> for DbConnection {
Expand Down Expand Up @@ -264,6 +309,17 @@ impl DbConnection {

Ok(())
}

pub fn set_group_intent_error_and_fail_msg(
&self,
intent: &StoredGroupIntent,
) -> Result<(), StorageError> {
self.set_group_intent_error(intent.id)?;
if let Some(id) = intent.message_id()? {
self.set_delivery_status_to_failed(&id)?;
}
Ok(())
}
}

impl ToSql<Integer, Sqlite> for IntentKind
Expand Down
12 changes: 12 additions & 0 deletions xmtp_mls/src/storage/encrypted_store/group_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ impl DbConnection {
.execute(conn)
})?)
}

pub fn set_delivery_status_to_failed<MessageId: AsRef<[u8]>>(
&self,
msg_id: &MessageId,
) -> Result<usize, StorageError> {
Ok(self.raw_query(|conn| {
diesel::update(dsl::group_messages)
.filter(dsl::id.eq(msg_id.as_ref()))
.set((dsl::delivery_status.eq(DeliveryStatus::Failed),))
.execute(conn)
})?)
}
}

#[cfg(test)]
Expand Down
4 changes: 3 additions & 1 deletion xmtp_mls/src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::PoisonError;
use diesel::result::DatabaseErrorKind;
use thiserror::Error;

use crate::{retry::RetryableError, retryable};
use crate::{groups::intents::IntentError, retry::RetryableError, retryable};

use super::sql_key_store;

Expand Down Expand Up @@ -33,6 +33,8 @@ pub enum StorageError {
PoolNeedsConnection,
#[error("Conflict")]
Conflict(String),
#[error(transparent)]
Intent(#[from] IntentError),
}

impl<T> From<PoisonError<T>> for StorageError {
Expand Down

0 comments on commit 8fe95b8

Please sign in to comment.