Skip to content

Commit

Permalink
Update welcomes to use concurrency logic - test not passing yet
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Nov 14, 2023
1 parent 36e7c53 commit 1911291
Showing 1 changed file with 34 additions and 29 deletions.
63 changes: 34 additions & 29 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ pub enum ClientError {
Serialization(#[from] TlsSerializationError),
#[error("key package verification: {0}")]
KeyPackageVerification(#[from] KeyPackageVerificationError),
#[error("message processing: {0}")]
MessageProcessing(#[from] MessageProcessingError),
#[error("syncing errors: {0:?}")]
SyncingError(Vec<MessageProcessingError>),
#[error("generic:{0}")]
Generic(String),
}
Expand Down Expand Up @@ -216,17 +216,17 @@ where
Ok(envelopes)
}

pub(crate) fn process_for_topic<F>(
pub(crate) fn process_for_topic<ProcessingFn, ReturnValue>(
&self,
topic: &str,
envelope_timestamp_ns: u64,
process_envelope: F,
) -> Result<(), MessageProcessingError>
process_envelope: ProcessingFn,
) -> Result<ReturnValue, MessageProcessingError>
where
F: FnOnce(&mut DbConnection) -> Result<(), MessageProcessingError>,
ProcessingFn: FnOnce(&mut DbConnection) -> Result<ReturnValue, MessageProcessingError>,
{
self.store.conn()?.transaction(
|transaction_manager| -> Result<(), MessageProcessingError> {
|transaction_manager| -> Result<ReturnValue, MessageProcessingError> {
let is_updated = self.store.update_last_synced_timestamp_for_topic(
transaction_manager,
topic,
Expand All @@ -239,8 +239,7 @@ where
}
process_envelope(transaction_manager)
},
)?;
Ok(())
)
}

// Get a flat list of one key package per installation for all the wallet addresses provided.
Expand Down Expand Up @@ -282,30 +281,36 @@ where
let welcome_topic = get_welcome_topic(&self.installation_public_key());
let envelopes = self.pull_from_topic(&welcome_topic).await?;

let mut conn = self.store.conn()?;
let provider = self.mls_provider();
let groups: Vec<MlsGroup<ApiClient>> = envelopes
.into_iter()
.filter_map(|envelope| {
// TODO: Wrap in a transaction
let welcome = match extract_welcome(&envelope.message) {
Ok(welcome) => welcome,
Err(err) => {
log::error!("failed to extract welcome: {}", err);
return None;
}
};

// TODO: Update last_message_timestamp_ns on success or non-retryable error
// TODO: Abort if error is retryable
match MlsGroup::create_from_welcome(self, &mut conn, &provider, welcome) {
Ok(mls_group) => Some(mls_group),
Err(err) => {
log::error!("failed to create group from welcome: {}", err);
None
}
}
.map(|envelope: Envelope| -> Result<Option<MlsGroup<ApiClient>>, MessageProcessingError> {
self.process_for_topic(
&welcome_topic,
envelope.timestamp_ns,
|transaction_manager| {
let welcome = match extract_welcome(&envelope.message) {
Ok(welcome) => welcome,
Err(err) => {
log::error!("failed to extract welcome: {}", err);
return Ok(None);
}
};

// TODO: Abort if error is retryable
match MlsGroup::create_from_welcome(self, transaction_manager, &provider, welcome)
{
Ok(mls_group) => Ok(Some(mls_group)),
Err(err) => {
log::error!("failed to create group from welcome: {}", err);
Ok(None)
}
}
},
)
})
.filter_map(|result| result.ok())
.filter_map(|option| option)
.collect();

Ok(groups)
Expand Down

0 comments on commit 1911291

Please sign in to comment.