From a9bbb5d28c941e1460f4eb7a788f009dfbf56315 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Mon, 16 Dec 2024 19:19:45 +0100 Subject: [PATCH] revert to semaphore --- xmtp_mls/src/groups/mod.rs | 2 +- xmtp_mls/src/lib.rs | 44 ++++++++++++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index bd6661514..b5e238790 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -380,7 +380,7 @@ impl MlsGroup { let group_id = self.group_id.clone(); // Acquire the lock asynchronously - let _lock = MLS_COMMIT_LOCK.get_lock_sync(group_id.clone()).await; + let _lock = MLS_COMMIT_LOCK.get_lock_async(group_id.clone()).await; // Load the MLS group let mls_group = diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 8a769a57b..0a401a4f1 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -20,10 +20,10 @@ pub mod verified_key_package_v2; mod xmtp_openmls_provider; pub use client::{Client, Network}; -use parking_lot::Mutex; use std::collections::HashMap; -use std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; use storage::{DuplicateItem, StorageError}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; pub use xmtp_openmls_provider::XmtpOpenMlsProvider; pub use xmtp_id::InboxOwner; @@ -33,7 +33,7 @@ pub use xmtp_proto::api_client::trait_impls::*; #[derive(Debug)] pub struct GroupCommitLock { // Storage for group-specific semaphores - locks: Mutex, Arc>>>, + locks: Mutex, Arc>>, } impl Default for GroupCommitLock { @@ -50,17 +50,47 @@ impl GroupCommitLock { } /// Get or create a semaphore for a specific group and acquire it, returning a guard - pub async fn get_lock_sync(&self, group_id: Vec) -> Result>, GroupError> { - let mutex = { + pub async fn get_lock_async(&self, group_id: Vec) -> Result { + let semaphore = { let mut locks = self.locks.lock(); locks + .unwrap() .entry(group_id) - .or_insert_with(|| Arc::new(Mutex::new(()))) + .or_insert_with(|| Arc::new(Semaphore::new(1))) .clone() }; - Ok(mutex) + let permit = semaphore.clone().acquire_owned().await?; + Ok(SemaphoreGuard { + _permit: permit, + _semaphore: semaphore, + }) } + + /// Get or create a semaphore for a specific group and acquire it synchronously + pub fn get_lock_sync(&self, group_id: Vec) -> Result { + let semaphore = { + let locks = self.locks.lock(); + locks + .unwrap() + .entry(group_id) + .or_insert_with(|| Arc::new(Semaphore::new(1))) + .clone() + }; + + // Synchronously acquire the permit + let permit = semaphore.clone().try_acquire_owned()?; + Ok(SemaphoreGuard { + _permit: permit, + _semaphore: semaphore, // semaphore is now valid because we cloned it earlier + }) + } +} + +/// A guard that releases the semaphore when dropped +pub struct SemaphoreGuard { + _permit: OwnedSemaphorePermit, + _semaphore: Arc, } // Static instance of `GroupCommitLock`