diff --git a/Cargo.lock b/Cargo.lock index 0f851dcd2..95701a280 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -855,6 +855,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "cuprate-p2p-bucket" +version = "0.1.0" +dependencies = [ + "arrayvec", + "rand", +] + [[package]] name = "cuprate-p2p-core" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d5aca71e2..614788d3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "net/wire", "p2p/p2p", "p2p/p2p-core", + "p2p/bucket", "p2p/dandelion-tower", "p2p/async-buffer", "p2p/address-book", @@ -64,6 +65,7 @@ cuprate-levin = { path = "net/levin" ,default-feature cuprate-wire = { path = "net/wire" ,default-features = false} cuprate-p2p = { path = "p2p/p2p" ,default-features = false} cuprate-p2p-core = { path = "p2p/p2p-core" ,default-features = false} +cuprate-p2p-bucket = { path = "p2p/p2p-bucket" ,default-features = false} cuprate-dandelion-tower = { path = "p2p/dandelion-tower" ,default-features = false} cuprate-async-buffer = { path = "p2p/async-buffer" ,default-features = false} cuprate-address-book = { path = "p2p/address-book" ,default-features = false} @@ -80,6 +82,7 @@ cuprate-rpc-interface = { path = "rpc/interface" ,default-feature # External dependencies anyhow = { version = "1.0.89", default-features = false } +arrayvec = { version = "0.7", default-features = false } async-trait = { version = "0.1.82", default-features = false } bitflags = { version = "2.6.0", default-features = false } blake3 = { version = "1", default-features = false } diff --git a/books/architecture/src/appendix/crates.md b/books/architecture/src/appendix/crates.md index fe8f1f05b..ac2780e1f 100644 --- a/books/architecture/src/appendix/crates.md +++ b/books/architecture/src/appendix/crates.md @@ -35,6 +35,7 @@ cargo doc --open --package cuprate-blockchain | [`cuprate-async-buffer`](https://doc.cuprate.org/cuprate_async_buffer) | [`p2p/async-buffer/`](https://github.com/Cuprate/cuprate/tree/main/p2p/async-buffer) | A bounded SPSC, FIFO, asynchronous buffer that supports arbitrary weights for values | [`cuprate-dandelion-tower`](https://doc.cuprate.org/cuprate_dandelion_tower) | [`p2p/dandelion-tower/`](https://github.com/Cuprate/cuprate/tree/main/p2p/dandelion-tower) | TODO | [`cuprate-p2p`](https://doc.cuprate.org/cuprate_p2p) | [`p2p/p2p/`](https://github.com/Cuprate/cuprate/tree/main/p2p/p2p) | TODO +| [`cuprate-p2p-bucket`](https://doc.cuprate.org/cuprate_p2p_bucket) | [`p2p/bucket/`](https://github.com/Cuprate/cuprate/tree/main/p2p/bucket) | A collection data structure discriminating its items into "buckets" of limited size. | [`cuprate-p2p-core`](https://doc.cuprate.org/cuprate_p2p_core) | [`p2p/p2p-core/`](https://github.com/Cuprate/cuprate/tree/main/p2p/p2p-core) | TODO ## Storage diff --git a/p2p/bucket/Cargo.toml b/p2p/bucket/Cargo.toml new file mode 100644 index 000000000..1a53e85a1 --- /dev/null +++ b/p2p/bucket/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "cuprate-p2p-bucket" +version = "0.1.0" +edition = "2021" +license = "MIT" +authors = ["SyntheticBird"] + +[dependencies] +arrayvec = { workspace = true } +rand = { workspace = true, features = ["std", "std_rng"]} + +[lints] +workspace = true diff --git a/p2p/bucket/src/lib.rs b/p2p/bucket/src/lib.rs new file mode 100644 index 000000000..0f73eea29 --- /dev/null +++ b/p2p/bucket/src/lib.rs @@ -0,0 +1,172 @@ +//! Bucket data structure +//! +//! A collection data structure that discriminates its unique items and place them into "buckets". +//! +//! The item must implement the [`Bucketable`] trait that defines how to create the discriminant +//! from the item type. The data structure will internally contain any item into "buckets" or vectors +//! of sized capacity `N` that regroup all the stored items with this specific discriminant. +//! +//! A practical example of this data structure is for storing `N` amount of IP discriminated by their subnets. +//! You can store in each "buckets" corresponding to a `/16` subnet up to `N` IPs of that subnet. +//! +//! # Example +//! +//! ``` +//! use cuprate_p2p_bucket::Bucket; +//! use std::net::Ipv4Addr; +//! +//! // Create a new bucket that can store at most 2 IPs in a particular `/16` subnet. +//! let mut bucket = Bucket::<2,Ipv4Addr>::new(); +//! +//! // Fulfill the `96.96.0.0/16` bucket. +//! bucket.push("96.96.0.1".parse().unwrap()); +//! bucket.push("96.96.0.2".parse().unwrap()); +//! assert_eq!(2, bucket.len()); +//! assert_eq!(2, bucket.len_bucket(&[96_u8,96_u8]).unwrap()); +//! +//! // Push a new IP from another subnet +//! bucket.push("127.0.0.1".parse().unwrap()); +//! assert_eq!(3, bucket.len()); +//! assert_eq!(2, bucket.len_bucket(&[96_u8,96_u8]).unwrap()); +//! assert_eq!(1, bucket.len_bucket(&[127_u8,0_u8]).unwrap()); +//! +//! // Attempting to push a new IP within `96.96.0.0/16` bucket will return the IP back +//! // as this subnet is already full. +//! let pushed = bucket.push("96.96.0.3".parse().unwrap()); +//! assert!(pushed.is_some()); +//! assert_eq!(2, bucket.len_bucket(&[96_u8,96_u8]).unwrap()); +//! +//! ``` + +use arrayvec::{ArrayVec, CapacityError}; +use rand::random; + +use std::{collections::BTreeMap, net::Ipv4Addr}; + +/// A discriminant that can be computed from the type. +pub trait Bucketable: Sized + Eq + Clone { + /// The type of the discriminant being used in the Binary tree. + type Discriminant: Ord + AsRef<[u8]>; + + /// Method that can compute the discriminant from the item. + fn discriminant(&self) -> Self::Discriminant; +} + +/// A collection data structure discriminating its unique items +/// with a specified method. Limiting the amount of items stored +/// with that discriminant to the const `N`. +pub struct Bucket { + /// The storage of the bucket + storage: BTreeMap>, +} + +impl Bucket { + /// Create a new Bucket + pub const fn new() -> Self { + Self { + storage: BTreeMap::new(), + } + } + + /// Push a new element into the Bucket + /// + /// Will internally create a new vector for each new discriminant being + /// generated from an item. + /// + /// This function WILL NOT push the element if it already exists. + /// + /// Return `None` if the item has been pushed or ignored. `Some(I)` if + /// the vector is full. + /// + /// # Example + /// + /// ``` + /// use cuprate_p2p_bucket::Bucket; + /// use std::net::Ipv4Addr; + /// + /// let mut bucket = Bucket::<8,Ipv4Addr>::new(); + /// + /// // Push a first IP address. + /// bucket.push("127.0.0.1".parse().unwrap()); + /// assert_eq!(1, bucket.len()); + /// + /// // Push the same IP address a second time. + /// bucket.push("127.0.0.1".parse().unwrap()); + /// assert_eq!(1, bucket.len()); + /// ``` + pub fn push(&mut self, item: I) -> Option { + let discriminant = item.discriminant(); + + if let Some(vec) = self.storage.get_mut(&discriminant) { + // Push the item if it doesn't exist. + if !vec.contains(&item) { + return vec.try_push(item).err().map(CapacityError::element); + } + } else { + // Initialize the vector if not found. + let mut vec = ArrayVec::::new(); + vec.push(item); + self.storage.insert(discriminant, vec); + } + + None + } + + /// Will attempt to remove an item from the bucket. + pub fn remove(&mut self, item: &I) -> Option { + self.storage.get_mut(&item.discriminant()).and_then(|vec| { + vec.iter() + .enumerate() + .find_map(|(i, v)| (item == v).then_some(i)) + .map(|index| vec.swap_remove(index)) + }) + } + + /// Return the number of item stored within the storage + pub fn len(&self) -> usize { + self.storage.values().map(ArrayVec::len).sum() + } + + /// Return the number of item stored with a specific discriminant. + /// + /// This method returns None if the bucket with this discriminant + /// doesn't exist. + pub fn len_bucket(&self, discriminant: &I::Discriminant) -> Option { + self.storage.get(discriminant).map(ArrayVec::len) + } + + /// Return `true` if the storage contains no items + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return a reference to an item chosen at random. + /// + /// Repeated use of this function will provide a normal distribution of + /// items based on their discriminants. + pub fn get_random(&mut self) -> Option<&I> { + // Get the total amount of discriminants to explore. + let len = self.storage.len(); + + // Get a random bucket. + let (_, vec) = self.storage.iter().nth(random::() / len).unwrap(); + + // Return a reference chose at random. + vec.get(random::() / vec.len()) + } +} + +impl Default for Bucket { + fn default() -> Self { + Self::new() + } +} + +impl Bucketable for Ipv4Addr { + /// We are discriminating by `/16` subnets. + type Discriminant = [u8; 2]; + + fn discriminant(&self) -> Self::Discriminant { + [self.octets()[0], self.octets()[1]] + } +}