From feb48641c4ee9cad4af6353298078bf78abee0cf Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:31:21 -0700 Subject: [PATCH] Even more locking fixes (#872) * Initial commit * Simplify retry strategy * Remove clone --- Cargo.lock | 1 - Cargo.toml | 2 +- bindings_ffi/Cargo.lock | 1 - bindings_ffi/src/mls.rs | 4 +--- bindings_node/Cargo.lock | 1 - xmtp_mls/src/client.rs | 10 +++++--- xmtp_mls/src/groups/mod.rs | 4 ++++ xmtp_mls/src/groups/subscriptions.rs | 1 - xmtp_mls/src/hpke.rs | 29 +++++++++++++---------- xmtp_mls/src/identity.rs | 19 ++++++++++++--- xmtp_mls/src/retry.rs | 35 +++++++++++++++++++++++----- xmtp_mls/src/storage/errors.rs | 27 +++++++++++++++++++++ 12 files changed, 102 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf551d954..d9d8a7187 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3246,7 +3246,6 @@ name = "openmls" version = "0.5.0" source = "git+https://github.com/xmtp/openmls?rev=9cb3207#9cb3207b077fcf6bc327408dfcf3df6237aec49c" dependencies = [ - "backtrace", "itertools 0.10.5", "log", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index bf40c3554..82ab524f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ futures = "0.3.30" futures-core = "0.3.30" hex = "0.4.3" log = { version = "0.4", features = ["release_max_level_debug"] } -openmls = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" } +openmls = { git = "https://github.com/xmtp/openmls", rev = "9cb3207", default_features = false } openmls_basic_credential = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" } openmls_rust_crypto = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" } openmls_traits = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" } diff --git a/bindings_ffi/Cargo.lock b/bindings_ffi/Cargo.lock index 2b85e8b43..9c6dbe3ad 100644 --- a/bindings_ffi/Cargo.lock +++ b/bindings_ffi/Cargo.lock @@ -2819,7 +2819,6 @@ name = "openmls" version = "0.5.0" source = "git+https://github.com/xmtp/openmls?rev=9cb3207#9cb3207b077fcf6bc327408dfcf3df6237aec49c" dependencies = [ - "backtrace", "itertools 0.10.5", "log", "once_cell", diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 454169301..09a211afa 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1859,7 +1859,7 @@ mod tests { .stream(Box::new(group_callbacks.clone())) .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + let stream_messages = bo .conversations() .stream_all_messages(Box::new(message_callbacks.clone())) @@ -1878,8 +1878,6 @@ mod tests { .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; diff --git a/bindings_node/Cargo.lock b/bindings_node/Cargo.lock index 8d712d2ab..a0e9a9c94 100644 --- a/bindings_node/Cargo.lock +++ b/bindings_node/Cargo.lock @@ -2683,7 +2683,6 @@ name = "openmls" version = "0.5.0" source = "git+https://github.com/xmtp/openmls?rev=9cb3207#9cb3207b077fcf6bc327408dfcf3df6237aec49c" dependencies = [ - "backtrace", "itertools 0.10.5", "log", "once_cell", diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 03a641e11..ff89d4f7d 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -131,7 +131,9 @@ pub enum MessageProcessingError { #[from] openmls::prelude::ProcessMessageError, ), #[error("merge pending commit: {0}")] - MergePendingCommit(#[from] openmls::group::MergePendingCommitError), + MergePendingCommit( + #[from] openmls::group::MergePendingCommitError, + ), #[error("merge staged commit: {0}")] MergeStagedCommit(#[from] openmls::group::MergeCommitError), #[error( @@ -175,7 +177,10 @@ impl crate::retry::RetryableError for MessageProcessingError { fn is_retryable(&self) -> bool { match self { Self::Group(group_error) => retryable!(group_error), - // Self::Identity(identity_error) => false, + Self::Identity(identity_error) => retryable!(identity_error), + Self::OpenMlsProcessMessage(err) => retryable!(err), + Self::MergePendingCommit(err) => retryable!(err), + Self::MergeStagedCommit(err) => retryable!(err), Self::Diesel(diesel_error) => retryable!(diesel_error), Self::Storage(s) => retryable!(s), Self::Generic(err) => err.contains("database is locked"), @@ -525,7 +530,6 @@ where return None; } }; - retry_async!( Retry::default(), (async { diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 29bebd4c4..49523de32 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -179,10 +179,14 @@ impl RetryableError for GroupError { Self::Diesel(diesel) => diesel.is_retryable(), Self::Storage(storage) => storage.is_retryable(), Self::ReceiveError(msg) => msg.is_retryable(), + Self::Hpke(hpke) => hpke.is_retryable(), + Self::Identity(identity) => identity.is_retryable(), Self::UpdateGroupMembership(update) => update.is_retryable(), Self::GroupCreate(group) => group.is_retryable(), Self::SelfUpdate(update) => update.is_retryable(), Self::WelcomeError(welcome) => welcome.is_retryable(), + Self::InstallationDiff(diff) => diff.is_retryable(), + Self::CreateGroupContextExtProposalError(create) => create.is_retryable(), _ => false, } } diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index 077d4efc1..1ee1bbd5a 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -33,7 +33,6 @@ impl MlsGroup { let created_ns = msgv1.created_ns; let client_pointer = client.clone(); - let process_result = retry_async!( Retry::default(), (async { diff --git a/xmtp_mls/src/hpke.rs b/xmtp_mls/src/hpke.rs index 9948c4bcf..99964fb1d 100644 --- a/xmtp_mls/src/hpke.rs +++ b/xmtp_mls/src/hpke.rs @@ -1,6 +1,8 @@ use crate::{ configuration::{CIPHERSUITE, WELCOME_HPKE_LABEL}, - storage::sql_key_store::KEY_PACKAGE_REFERENCES, + retry::RetryableError, + retryable, + storage::sql_key_store::{SqlKeyStoreError, KEY_PACKAGE_REFERENCES}, xmtp_openmls_provider::XmtpOpenMlsProvider, }; use openmls::{ @@ -22,10 +24,21 @@ pub enum HpkeError { Hpke(#[from] OpenmlsHpkeError), #[error("TLS Codec error: {0}")] TlsError(#[from] TlsCodecError), + #[error("Storage error: {0}")] + StorageError(#[from] SqlKeyStoreError), #[error("Key not found")] KeyNotFound, } +impl RetryableError for HpkeError { + fn is_retryable(&self) -> bool { + match self { + Self::StorageError(storage) => retryable!(storage), + _ => false, + } + } +} + #[tracing::instrument(level = "trace", skip_all)] pub fn encrypt_welcome(welcome_payload: &[u8], hpke_key: &[u8]) -> Result, HpkeError> { let crypto = RustCrypto::default(); @@ -52,21 +65,13 @@ pub fn decrypt_welcome( let serialized_hpke_public_key = hpke_public_key.tls_serialize_detached()?; - let hash_ref: Option = match provider + let hash_ref: Option = provider .storage() - .read(KEY_PACKAGE_REFERENCES, &serialized_hpke_public_key) - { - Ok(hash_ref) => hash_ref, - Err(_) => return Err(HpkeError::KeyNotFound), - }; + .read(KEY_PACKAGE_REFERENCES, &serialized_hpke_public_key)?; if let Some(hash_ref) = hash_ref { // With the hash reference we can read the key package. - let key_package: Option = match provider.storage().key_package(&hash_ref) - { - Ok(key_package) => key_package, - Err(_) => return Err(HpkeError::KeyNotFound), - }; + let key_package: Option = provider.storage().key_package(&hash_ref)?; if let Some(kp) = key_package { return Ok(decrypt_with_label( diff --git a/xmtp_mls/src/identity.rs b/xmtp_mls/src/identity.rs index 9af363f07..22e63a80c 100644 --- a/xmtp_mls/src/identity.rs +++ b/xmtp_mls/src/identity.rs @@ -1,6 +1,7 @@ use std::array::TryFromSliceError; use crate::configuration::GROUP_PERMISSIONS_EXTENSION_ID; +use crate::retry::RetryableError; use crate::storage::db_connection::DbConnection; use crate::storage::identity::StoredIdentity; use crate::storage::sql_key_store::{SqlKeyStoreError, KEY_PACKAGE_REFERENCES}; @@ -12,7 +13,7 @@ use crate::{ xmtp_openmls_provider::XmtpOpenMlsProvider, XmtpApi, }; -use crate::{Fetch, Store}; +use crate::{retryable, Fetch, Store}; use ed25519_dalek::SigningKey; use ethers::signers::WalletError; use log::debug; @@ -117,10 +118,10 @@ pub enum IdentityError { Decode(#[from] prost::DecodeError), #[error(transparent)] WrappedApi(#[from] WrappedApiError), - #[error("installation not found: {0}")] - InstallationIdNotFound(String), #[error(transparent)] Api(#[from] xmtp_proto::api_client::Error), + #[error("installation not found: {0}")] + InstallationIdNotFound(String), #[error(transparent)] SignatureRequestBuilder(#[from] SignatureRequestError), #[error(transparent)] @@ -163,6 +164,18 @@ pub enum IdentityError { NewIdentity(String), } +impl RetryableError for IdentityError { + fn is_retryable(&self) -> bool { + match self { + Self::Api(_) => true, + Self::WrappedApi(err) => retryable!(err), + Self::StorageError(err) => retryable!(err), + Self::OpenMlsStorageError(err) => retryable!(err), + _ => false, + } + } +} + #[derive(Debug, Clone)] pub struct Identity { pub(crate) inbox_id: InboxId, diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index 3f9261c35..fed9020a1 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -18,6 +18,7 @@ use std::time::Duration; +use rand::Rng; use smart_default::SmartDefault; /// Specifies which errors are retryable. @@ -31,8 +32,13 @@ pub trait RetryableError: std::error::Error { pub struct Retry { #[default = 5] retries: usize, - #[default(_code = "std::time::Duration::from_millis(200)")] + #[default(_code = "std::time::Duration::from_millis(50)")] duration: std::time::Duration, + #[default = 3] + // The amount to multiply the duration on each subsequent attempt + multiplier: u32, + #[default = 25] + max_jitter_ms: usize, } impl Retry { @@ -42,8 +48,16 @@ impl Retry { } /// Get the duration to wait between retries. - pub fn duration(&self) -> Duration { - self.duration + /// Multiples the duration by the multiplier for each subsequent attempt + /// and adds a random jitter to avoid repeated collisions + pub fn duration(&self, attempts: usize) -> Duration { + let mut duration = self.duration; + for _ in 0..attempts - 1 { + duration *= self.multiplier; + } + + let jitter = rand::thread_rng().gen_range(0..=self.max_jitter_ms); + duration + Duration::from_millis(jitter as u64) } } @@ -155,12 +169,12 @@ macro_rules! retry_sync { Ok(v) => break Ok(v), Err(e) => { if (&e).is_retryable() && attempts < $retry.retries() { - log::debug!( + log::info!( "retrying function that failed with error=`{}`", e.to_string() ); attempts += 1; - std::thread::sleep($retry.duration()); + std::thread::sleep($retry.duration(attempts)); } else { break Err(e); } @@ -231,7 +245,7 @@ macro_rules! retry_async { if (&e).is_retryable() && attempts < $retry.retries() { log::warn!("retrying function that failed with error={}", e.to_string()); attempts += 1; - tokio::time::sleep($retry.duration()).await; + tokio::time::sleep($retry.duration(attempts)).await; } else { log::info!("error is not retryable. {:?}", e); break Err(e); @@ -388,4 +402,13 @@ mod tests { ) .unwrap(); } + + #[test] + fn backoff_retry() { + let backoff_retry = Retry::default(); + + assert!(backoff_retry.duration(1).as_millis() - 50 <= 25); + assert!(backoff_retry.duration(2).as_millis() - 150 <= 25); + assert!(backoff_retry.duration(3).as_millis() - 450 <= 25); + } } diff --git a/xmtp_mls/src/storage/errors.rs b/xmtp_mls/src/storage/errors.rs index 0b6e09afb..68c407eaf 100644 --- a/xmtp_mls/src/storage/errors.rs +++ b/xmtp_mls/src/storage/errors.rs @@ -183,3 +183,30 @@ impl RetryableError for openmls::prelude::WelcomeError { + fn is_retryable(&self) -> bool { + match self { + Self::StorageError(storage) => retryable!(storage), + _ => false, + } + } +} + +impl RetryableError for openmls::group::MergePendingCommitError { + fn is_retryable(&self) -> bool { + match self { + Self::MlsGroupStateError(err) => retryable!(err), + Self::MergeCommitError(err) => retryable!(err), + } + } +} + +impl RetryableError for openmls::prelude::ProcessMessageError { + fn is_retryable(&self) -> bool { + match self { + Self::GroupStateError(err) => retryable!(err), + _ => false, + } + } +}