Skip to content

Commit

Permalink
revert to semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
mchenani committed Dec 16, 2024
1 parent ba0b09c commit a9bbb5d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
2 changes: 1 addition & 1 deletion xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
let group_id = self.group_id.clone();

// Acquire the lock asynchronously
let _lock = MLS_COMMIT_LOCK.get_lock_sync(group_id.clone()).await;
let _lock = MLS_COMMIT_LOCK.get_lock_async(group_id.clone()).await;

// Load the MLS group
let mls_group =
Expand Down
44 changes: 37 additions & 7 deletions xmtp_mls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ pub mod verified_key_package_v2;
mod xmtp_openmls_provider;

pub use client::{Client, Network};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::sync::{Arc, LazyLock, Mutex};
use storage::{DuplicateItem, StorageError};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
pub use xmtp_openmls_provider::XmtpOpenMlsProvider;

pub use xmtp_id::InboxOwner;
Expand All @@ -33,7 +33,7 @@ pub use xmtp_proto::api_client::trait_impls::*;
#[derive(Debug)]
pub struct GroupCommitLock {
// Storage for group-specific semaphores
locks: Mutex<HashMap<Vec<u8>, Arc<Mutex<()>>>>,
locks: Mutex<HashMap<Vec<u8>, Arc<Semaphore>>>,
}

impl Default for GroupCommitLock {
Expand All @@ -50,17 +50,47 @@ impl GroupCommitLock {
}

/// Get or create a semaphore for a specific group and acquire it, returning a guard
pub async fn get_lock_sync(&self, group_id: Vec<u8>) -> Result<Arc<Mutex<()>>, GroupError> {
let mutex = {
pub async fn get_lock_async(&self, group_id: Vec<u8>) -> Result<SemaphoreGuard, GroupError> {
let semaphore = {
let mut locks = self.locks.lock();
locks
.unwrap()
.entry(group_id)
.or_insert_with(|| Arc::new(Mutex::new(())))
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.clone()
};

Ok(mutex)
let permit = semaphore.clone().acquire_owned().await?;
Ok(SemaphoreGuard {
_permit: permit,
_semaphore: semaphore,
})
}

/// Get or create a semaphore for a specific group and acquire it synchronously
pub fn get_lock_sync(&self, group_id: Vec<u8>) -> Result<SemaphoreGuard, GroupError> {
let semaphore = {
let locks = self.locks.lock();
locks
.unwrap()
.entry(group_id)
.or_insert_with(|| Arc::new(Semaphore::new(1)))
.clone()
};

// Synchronously acquire the permit
let permit = semaphore.clone().try_acquire_owned()?;
Ok(SemaphoreGuard {
_permit: permit,
_semaphore: semaphore, // semaphore is now valid because we cloned it earlier
})
}
}

/// A guard that releases the semaphore when dropped
pub struct SemaphoreGuard {
_permit: OwnedSemaphorePermit,
_semaphore: Arc<Semaphore>,
}

// Static instance of `GroupCommitLock`
Expand Down

0 comments on commit a9bbb5d

Please sign in to comment.