Skip to content

Commit

Permalink
Even more locking fixes (#872)
Browse files Browse the repository at this point in the history
* Initial commit

* Simplify retry strategy

* Remove clone
  • Loading branch information
neekolas authored Jun 26, 2024
1 parent c3a03b3 commit feb4864
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 32 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ futures = "0.3.30"
futures-core = "0.3.30"
hex = "0.4.3"
log = { version = "0.4", features = ["release_max_level_debug"] }
openmls = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" }
openmls = { git = "https://github.com/xmtp/openmls", rev = "9cb3207", default_features = false }
openmls_basic_credential = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" }
openmls_rust_crypto = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" }
openmls_traits = { git = "https://github.com/xmtp/openmls", rev = "9cb3207" }
Expand Down
1 change: 0 additions & 1 deletion bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,7 @@ mod tests {
.stream(Box::new(group_callbacks.clone()))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

let stream_messages = bo
.conversations()
.stream_all_messages(Box::new(message_callbacks.clone()))
Expand All @@ -1878,8 +1878,6 @@ mod tests {
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

alix_group.send("hello1".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

Expand Down
1 change: 0 additions & 1 deletion bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ pub enum MessageProcessingError {
#[from] openmls::prelude::ProcessMessageError<sql_key_store::SqlKeyStoreError>,
),
#[error("merge pending commit: {0}")]
MergePendingCommit(#[from] openmls::group::MergePendingCommitError<StorageError>),
MergePendingCommit(
#[from] openmls::group::MergePendingCommitError<sql_key_store::SqlKeyStoreError>,
),
#[error("merge staged commit: {0}")]
MergeStagedCommit(#[from] openmls::group::MergeCommitError<sql_key_store::SqlKeyStoreError>),
#[error(
Expand Down Expand Up @@ -175,7 +177,10 @@ impl crate::retry::RetryableError for MessageProcessingError {
fn is_retryable(&self) -> bool {
match self {
Self::Group(group_error) => retryable!(group_error),
// Self::Identity(identity_error) => false,
Self::Identity(identity_error) => retryable!(identity_error),
Self::OpenMlsProcessMessage(err) => retryable!(err),
Self::MergePendingCommit(err) => retryable!(err),
Self::MergeStagedCommit(err) => retryable!(err),
Self::Diesel(diesel_error) => retryable!(diesel_error),
Self::Storage(s) => retryable!(s),
Self::Generic(err) => err.contains("database is locked"),
Expand Down Expand Up @@ -525,7 +530,6 @@ where
return None;
}
};

retry_async!(
Retry::default(),
(async {
Expand Down
4 changes: 4 additions & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,14 @@ impl RetryableError for GroupError {
Self::Diesel(diesel) => diesel.is_retryable(),
Self::Storage(storage) => storage.is_retryable(),
Self::ReceiveError(msg) => msg.is_retryable(),
Self::Hpke(hpke) => hpke.is_retryable(),
Self::Identity(identity) => identity.is_retryable(),
Self::UpdateGroupMembership(update) => update.is_retryable(),
Self::GroupCreate(group) => group.is_retryable(),
Self::SelfUpdate(update) => update.is_retryable(),
Self::WelcomeError(welcome) => welcome.is_retryable(),
Self::InstallationDiff(diff) => diff.is_retryable(),
Self::CreateGroupContextExtProposalError(create) => create.is_retryable(),
_ => false,
}
}
Expand Down
1 change: 0 additions & 1 deletion xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ impl MlsGroup {
let created_ns = msgv1.created_ns;

let client_pointer = client.clone();

let process_result = retry_async!(
Retry::default(),
(async {
Expand Down
29 changes: 17 additions & 12 deletions xmtp_mls/src/hpke.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
configuration::{CIPHERSUITE, WELCOME_HPKE_LABEL},
storage::sql_key_store::KEY_PACKAGE_REFERENCES,
retry::RetryableError,
retryable,
storage::sql_key_store::{SqlKeyStoreError, KEY_PACKAGE_REFERENCES},
xmtp_openmls_provider::XmtpOpenMlsProvider,
};
use openmls::{
Expand All @@ -22,10 +24,21 @@ pub enum HpkeError {
Hpke(#[from] OpenmlsHpkeError),
#[error("TLS Codec error: {0}")]
TlsError(#[from] TlsCodecError),
#[error("Storage error: {0}")]
StorageError(#[from] SqlKeyStoreError),
#[error("Key not found")]
KeyNotFound,
}

impl RetryableError for HpkeError {
fn is_retryable(&self) -> bool {
match self {
Self::StorageError(storage) => retryable!(storage),
_ => false,
}
}
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn encrypt_welcome(welcome_payload: &[u8], hpke_key: &[u8]) -> Result<Vec<u8>, HpkeError> {
let crypto = RustCrypto::default();
Expand All @@ -52,21 +65,13 @@ pub fn decrypt_welcome(

let serialized_hpke_public_key = hpke_public_key.tls_serialize_detached()?;

let hash_ref: Option<KeyPackageRef> = match provider
let hash_ref: Option<KeyPackageRef> = provider
.storage()
.read(KEY_PACKAGE_REFERENCES, &serialized_hpke_public_key)
{
Ok(hash_ref) => hash_ref,
Err(_) => return Err(HpkeError::KeyNotFound),
};
.read(KEY_PACKAGE_REFERENCES, &serialized_hpke_public_key)?;

if let Some(hash_ref) = hash_ref {
// With the hash reference we can read the key package.
let key_package: Option<KeyPackageBundle> = match provider.storage().key_package(&hash_ref)
{
Ok(key_package) => key_package,
Err(_) => return Err(HpkeError::KeyNotFound),
};
let key_package: Option<KeyPackageBundle> = provider.storage().key_package(&hash_ref)?;

if let Some(kp) = key_package {
return Ok(decrypt_with_label(
Expand Down
19 changes: 16 additions & 3 deletions xmtp_mls/src/identity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::array::TryFromSliceError;

use crate::configuration::GROUP_PERMISSIONS_EXTENSION_ID;
use crate::retry::RetryableError;
use crate::storage::db_connection::DbConnection;
use crate::storage::identity::StoredIdentity;
use crate::storage::sql_key_store::{SqlKeyStoreError, KEY_PACKAGE_REFERENCES};
Expand All @@ -12,7 +13,7 @@ use crate::{
xmtp_openmls_provider::XmtpOpenMlsProvider,
XmtpApi,
};
use crate::{Fetch, Store};
use crate::{retryable, Fetch, Store};
use ed25519_dalek::SigningKey;
use ethers::signers::WalletError;
use log::debug;
Expand Down Expand Up @@ -117,10 +118,10 @@ pub enum IdentityError {
Decode(#[from] prost::DecodeError),
#[error(transparent)]
WrappedApi(#[from] WrappedApiError),
#[error("installation not found: {0}")]
InstallationIdNotFound(String),
#[error(transparent)]
Api(#[from] xmtp_proto::api_client::Error),
#[error("installation not found: {0}")]
InstallationIdNotFound(String),
#[error(transparent)]
SignatureRequestBuilder(#[from] SignatureRequestError),
#[error(transparent)]
Expand Down Expand Up @@ -163,6 +164,18 @@ pub enum IdentityError {
NewIdentity(String),
}

impl RetryableError for IdentityError {
fn is_retryable(&self) -> bool {
match self {
Self::Api(_) => true,
Self::WrappedApi(err) => retryable!(err),
Self::StorageError(err) => retryable!(err),
Self::OpenMlsStorageError(err) => retryable!(err),
_ => false,
}
}
}

#[derive(Debug, Clone)]
pub struct Identity {
pub(crate) inbox_id: InboxId,
Expand Down
35 changes: 29 additions & 6 deletions xmtp_mls/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::time::Duration;

use rand::Rng;
use smart_default::SmartDefault;

/// Specifies which errors are retryable.
Expand All @@ -31,8 +32,13 @@ pub trait RetryableError: std::error::Error {
pub struct Retry {
#[default = 5]
retries: usize,
#[default(_code = "std::time::Duration::from_millis(200)")]
#[default(_code = "std::time::Duration::from_millis(50)")]
duration: std::time::Duration,
#[default = 3]
// The amount to multiply the duration on each subsequent attempt
multiplier: u32,
#[default = 25]
max_jitter_ms: usize,
}

impl Retry {
Expand All @@ -42,8 +48,16 @@ impl Retry {
}

/// Get the duration to wait between retries.
pub fn duration(&self) -> Duration {
self.duration
/// Multiples the duration by the multiplier for each subsequent attempt
/// and adds a random jitter to avoid repeated collisions
pub fn duration(&self, attempts: usize) -> Duration {
let mut duration = self.duration;
for _ in 0..attempts - 1 {
duration *= self.multiplier;
}

let jitter = rand::thread_rng().gen_range(0..=self.max_jitter_ms);
duration + Duration::from_millis(jitter as u64)
}
}

Expand Down Expand Up @@ -155,12 +169,12 @@ macro_rules! retry_sync {
Ok(v) => break Ok(v),
Err(e) => {
if (&e).is_retryable() && attempts < $retry.retries() {
log::debug!(
log::info!(
"retrying function that failed with error=`{}`",
e.to_string()
);
attempts += 1;
std::thread::sleep($retry.duration());
std::thread::sleep($retry.duration(attempts));
} else {
break Err(e);
}
Expand Down Expand Up @@ -231,7 +245,7 @@ macro_rules! retry_async {
if (&e).is_retryable() && attempts < $retry.retries() {
log::warn!("retrying function that failed with error={}", e.to_string());
attempts += 1;
tokio::time::sleep($retry.duration()).await;
tokio::time::sleep($retry.duration(attempts)).await;
} else {
log::info!("error is not retryable. {:?}", e);
break Err(e);
Expand Down Expand Up @@ -388,4 +402,13 @@ mod tests {
)
.unwrap();
}

#[test]
fn backoff_retry() {
let backoff_retry = Retry::default();

assert!(backoff_retry.duration(1).as_millis() - 50 <= 25);
assert!(backoff_retry.duration(2).as_millis() - 150 <= 25);
assert!(backoff_retry.duration(3).as_millis() - 450 <= 25);
}
}
27 changes: 27 additions & 0 deletions xmtp_mls/src/storage/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,30 @@ impl RetryableError for openmls::prelude::WelcomeError<sql_key_store::SqlKeyStor
}
}
}

impl RetryableError for openmls::group::MergeCommitError<sql_key_store::SqlKeyStoreError> {
fn is_retryable(&self) -> bool {
match self {
Self::StorageError(storage) => retryable!(storage),
_ => false,
}
}
}

impl RetryableError for openmls::group::MergePendingCommitError<sql_key_store::SqlKeyStoreError> {
fn is_retryable(&self) -> bool {
match self {
Self::MlsGroupStateError(err) => retryable!(err),
Self::MergeCommitError(err) => retryable!(err),
}
}
}

impl RetryableError for openmls::prelude::ProcessMessageError<sql_key_store::SqlKeyStoreError> {
fn is_retryable(&self) -> bool {
match self {
Self::GroupStateError(err) => retryable!(err),
_ => false,
}
}
}

0 comments on commit feb4864

Please sign in to comment.