From 23d0ed0dae22f392017897b7fd392d3b21fa789c Mon Sep 17 00:00:00 2001 From: Hamza Jadid Date: Mon, 1 Apr 2024 11:30:11 +0300 Subject: [PATCH] refactor: moved napmap to its own repo --- Cargo.toml | 2 +- jarust/Cargo.toml | 2 +- napmap/Cargo.toml | 20 ------ napmap/README.md | 3 - napmap/src/bounded.rs | 156 ---------------------------------------- napmap/src/lib.rs | 24 ------- napmap/src/unbounded.rs | 149 -------------------------------------- 7 files changed, 2 insertions(+), 354 deletions(-) delete mode 100644 napmap/Cargo.toml delete mode 100644 napmap/README.md delete mode 100644 napmap/src/bounded.rs delete mode 100644 napmap/src/lib.rs delete mode 100644 napmap/src/unbounded.rs diff --git a/Cargo.toml b/Cargo.toml index 2cbd6dc..d1cc1c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["jarust", "jarust_make_plugin", "jarust_plugins", "napmap"] +members = ["jarust", "jarust_make_plugin", "jarust_plugins"] [workspace.package] version = "0.2.4" diff --git a/jarust/Cargo.toml b/jarust/Cargo.toml index 0c1f65c..2cde913 100644 --- a/jarust/Cargo.toml +++ b/jarust/Cargo.toml @@ -16,7 +16,7 @@ doctest = false [dependencies] async-trait.workspace = true futures-util = "0.3.29" -napmap = { version = "0.1.0", path = "../napmap" } +napmap = "0.1.1" rand = "0.8.5" serde_json.workspace = true serde.workspace = true diff --git a/napmap/Cargo.toml b/napmap/Cargo.toml deleted file mode 100644 index 60552fb..0000000 --- a/napmap/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "napmap" -version = "0.1.0" -authors.workspace = true -description.workspace = true -readme = "./README.md" -license.workspace = true -keywords.workspace = true -categories.workspace = true -edition.workspace = true -repository.workspace = true - -[dependencies] -tokio = { version = "1.35.1", features = ["sync", "time", "rt"] } -tracing.workspace = true -indexmap = "2.2.6" - -[dev-dependencies] -tokio = { version = "1.35.1", features = ["time", "macros", "rt-multi-thread"] } -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/napmap/README.md b/napmap/README.md deleted file mode 100644 index 3a65bf9..0000000 --- a/napmap/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# NapMap - -`NapMap` is a map that sends the task to sleep if the requested data do not exist and notify them once available. diff --git a/napmap/src/bounded.rs b/napmap/src/bounded.rs deleted file mode 100644 index f827527..0000000 --- a/napmap/src/bounded.rs +++ /dev/null @@ -1,156 +0,0 @@ -use indexmap::IndexMap; -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; -use tokio::sync::Mutex as AsyncMutex; -use tokio::sync::Notify; -use tokio::sync::RwLock as AsyncRwLock; - -pub struct NapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - map: Arc>>, - notifiers: Arc>>>, - bound: usize, -} - -impl NapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - pub fn new(buffer: usize) -> Self { - assert!(buffer > 0, "bounded napmap requires buffer > 0"); - Self { - map: Arc::new(AsyncRwLock::new(IndexMap::with_capacity(buffer))), - notifiers: Arc::new(AsyncMutex::new(HashMap::new())), - bound: buffer, - } - } - - /// Inserts a key and a value into the map. - /// And notifies waiting tasks if any. - #[tracing::instrument(level = tracing::Level::TRACE, skip(self, v))] - pub async fn insert(&self, k: K, v: V) { - tracing::trace!("Insert"); - - let mut map = self.map.write().await; - if map.len() >= self.bound { - map.pop(); - } - map.insert(k.clone(), v); - drop(map); - - if let Some(notify) = self.notifiers.lock().await.remove(&k) { - notify.notify_waiters(); - tracing::trace!("Notified all waiting tasks"); - } - } - - /// Get an immutable reference to an entry in the map. - /// If the data is already presented return it, else wait until the data is inserted. - #[tracing::instrument(level = tracing::Level::TRACE, skip(self))] - pub async fn get(&self, k: K) -> Option { - tracing::trace!("Get"); - if self.map.read().await.contains_key(&k) { - tracing::debug!("Contains key"); - return self.map.read().await.get(&k).cloned(); - } - - let mut notifiers = self.notifiers.lock().await; - let notify = notifiers - .entry(k.clone()) - .or_insert(Arc::new(Notify::new())) - .clone(); - drop(notifiers); - - tracing::trace!("Waiting..."); - notify.notified().await; - tracing::trace!("Notified, data is available"); - self.map.read().await.get(&k).cloned() - } - - pub async fn len(&self) -> usize { - self.map.read().await.len() - } - - pub async fn is_empty(&self) -> bool { - self.map.read().await.is_empty() - } -} - -#[cfg(test)] -mod tests { - use super::NapMap; - use std::sync::Arc; - use std::time::Duration; - use tracing_subscriber::EnvFilter; - - // Add this to a test to see the logs - fn _tracing_sub() { - let env_filter = - EnvFilter::from_default_env().add_directive("napmap=trace".parse().unwrap()); - tracing_subscriber::fmt().with_env_filter(env_filter).init(); - } - - #[tokio::test] - async fn it_should_wait_until_data_is_inserted() { - let napmap = Arc::new(NapMap::new(10)); - - tokio::spawn({ - let map = napmap.clone(); - async move { - tokio::time::sleep(Duration::from_secs(1)).await; - map.insert("key", 7).await; - } - }); - - let res = napmap.get("key").await.unwrap(); - assert_eq!(res, 7); - } - - #[tokio::test] - async fn it_should_notify_all_waiters() { - let napmap = Arc::new(NapMap::new(10)); - - tokio::spawn({ - let map = napmap.clone(); - async move { - tokio::time::sleep(Duration::from_secs(1)).await; - map.insert("key", 7).await; - } - }); - - let first_handle = tokio::spawn({ - let map = napmap.clone(); - async move { - let res = map.get("key").await.unwrap(); - assert_eq!(res, 7); - } - }); - - let second_handle = tokio::spawn({ - let map = napmap.clone(); - async move { - let res = map.get("key").await.unwrap(); - assert_eq!(res, 7); - } - }); - - first_handle.await.unwrap(); - second_handle.await.unwrap(); - } - - #[tokio::test] - async fn it_should_not_exceed_the_provided_buffer_size() { - let napmap = Arc::new(NapMap::new(3)); - napmap.insert(1, 1).await; - napmap.insert(2, 2).await; - napmap.insert(3, 3).await; - napmap.insert(4, 4).await; - assert_eq!(napmap.len().await, 3); - } -} diff --git a/napmap/src/lib.rs b/napmap/src/lib.rs deleted file mode 100644 index defbd89..0000000 --- a/napmap/src/lib.rs +++ /dev/null @@ -1,24 +0,0 @@ -pub mod bounded; -pub mod unbounded; - -pub use bounded::NapMap; -pub use unbounded::UnboundedNapMap; - -use std::fmt::Debug; -use std::hash::Hash; - -pub fn unbounded() -> UnboundedNapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - UnboundedNapMap::new() -} - -pub fn napmap(buffer: usize) -> NapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - NapMap::new(buffer) -} diff --git a/napmap/src/unbounded.rs b/napmap/src/unbounded.rs deleted file mode 100644 index 7c07c9f..0000000 --- a/napmap/src/unbounded.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::collections::HashMap; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; -use tokio::sync::Mutex as AsyncMutex; -use tokio::sync::Notify; -use tokio::sync::RwLock as AsyncRwLock; - -pub struct UnboundedNapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - map: Arc>>, - notifiers: Arc>>>, -} - -impl UnboundedNapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - pub fn new() -> Self { - Self { - map: Arc::new(AsyncRwLock::new(HashMap::new())), - notifiers: Arc::new(AsyncMutex::new(HashMap::new())), - } - } - - /// Inserts a key and a value into the map. - /// And notifies waiting tasks if any. - #[tracing::instrument(level = tracing::Level::TRACE, skip(self, v))] - pub async fn insert(&self, k: K, v: V) { - tracing::trace!("Insert"); - self.map.write().await.insert(k.clone(), v); - if let Some(notify) = self.notifiers.lock().await.remove(&k) { - notify.notify_waiters(); - tracing::trace!("Notified all waiting tasks"); - } - } - - /// Get an immutable reference to an entry in the map. - /// If the data is already presented return it, else wait until the data is inserted. - #[tracing::instrument(level = tracing::Level::TRACE, skip(self))] - pub async fn get(&self, k: K) -> Option { - tracing::trace!("Get"); - if self.map.read().await.contains_key(&k) { - tracing::debug!("Contains key"); - return self.map.read().await.get(&k).cloned(); - } - - let mut notifiers = self.notifiers.lock().await; - let notify = notifiers - .entry(k.clone()) - .or_insert(Arc::new(Notify::new())) - .clone(); - drop(notifiers); - - tracing::trace!("Waiting..."); - notify.notified().await; - tracing::trace!("Notified, data is available"); - self.map.read().await.get(&k).cloned() - } - - pub async fn remove(&self, k: K) -> Option { - self.map.write().await.remove(&k) - } - - pub async fn len(&self) -> usize { - self.map.read().await.len() - } - - pub async fn is_empty(&self) -> bool { - self.map.read().await.is_empty() - } -} - -impl Default for UnboundedNapMap -where - K: Eq + Hash + Clone + Debug, - V: Clone + Debug, -{ - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::UnboundedNapMap; - use std::sync::Arc; - use std::time::Duration; - use tracing_subscriber::EnvFilter; - - // Add this to a test to see the logs - fn _tracing_sub() { - let env_filter = - EnvFilter::from_default_env().add_directive("napmap=trace".parse().unwrap()); - tracing_subscriber::fmt().with_env_filter(env_filter).init(); - } - - #[tokio::test] - async fn it_should_wait_until_data_is_inserted() { - let napmap = Arc::new(UnboundedNapMap::new()); - - tokio::spawn({ - let map = napmap.clone(); - async move { - tokio::time::sleep(Duration::from_secs(1)).await; - map.insert("key", 7).await; - } - }); - - let res = napmap.get("key").await.unwrap(); - assert_eq!(res, 7); - } - - #[tokio::test] - async fn it_should_notify_all_waiters() { - let napmap = Arc::new(UnboundedNapMap::new()); - - tokio::spawn({ - let map = napmap.clone(); - async move { - tokio::time::sleep(Duration::from_secs(1)).await; - map.insert("key", 7).await; - } - }); - - let first_handle = tokio::spawn({ - let map = napmap.clone(); - async move { - let res = map.get("key").await.unwrap(); - assert_eq!(res, 7); - } - }); - - let second_handle = tokio::spawn({ - let map = napmap.clone(); - async move { - let res = map.get("key").await.unwrap(); - assert_eq!(res, 7); - } - }); - - first_handle.await.unwrap(); - second_handle.await.unwrap(); - } -}