diff --git a/rust/config/src/assignment/assignment_policy.rs b/rust/config/src/assignment/assignment_policy.rs index 9f7970e54f4..ce8fb5c893c 100644 --- a/rust/config/src/assignment/assignment_policy.rs +++ b/rust/config/src/assignment/assignment_policy.rs @@ -2,7 +2,7 @@ use crate::Configurable; use super::{ config::{AssignmentPolicyConfig, HasherType}, - rendezvous_hash::{assign, AssignmentError, Murmur3Hasher}, + rendezvous_hash::{AssignmentError, Hasher, Murmur3Hasher}, }; use async_trait::async_trait; use chroma_error::ChromaError; @@ -66,7 +66,7 @@ impl Configurable for RendezvousHashingAssignmentPolicy impl AssignmentPolicy for RendezvousHashingAssignmentPolicy { fn assign(&self, key: &str) -> Result { let members = self.get_members(); - assign(key, members, &self.hasher) + self.hasher.assign_one(members, key) } fn get_members(&self) -> Vec { diff --git a/rust/config/src/assignment/rendezvous_hash.rs b/rust/config/src/assignment/rendezvous_hash.rs index b8bb942c067..e8b1f8c8d99 100644 --- a/rust/config/src/assignment/rendezvous_hash.rs +++ b/rust/config/src/assignment/rendezvous_hash.rs @@ -12,6 +12,56 @@ use murmur3::murmur3_x64_128; /// A trait for hashing a member and a key to a score. pub trait Hasher { fn hash(&self, member: &str, key: &str) -> Result; + /// Assign a key to a collection of members using the rendezvous hash algorithm + /// # Arguments + /// - key: The key to assign. + /// - members: The members to assign to. + /// # Returns + /// The members that the key were assigned to. + /// # Errors + /// - If the key is empty. + /// - If there are insufficient members to assign to. + /// - If there is an error hashing a member. + /// # Notes + /// This implementation mirrors the rendezvous hash implementation + /// in the go and python services. + fn assign( + &self, + members: impl IntoIterator>, + key: &str, + k: usize, + ) -> Result, AssignmentError> { + let mut member_vec = members + .into_iter() + .map(|m| { + self.hash(m.as_ref(), key) + .map(|s| (s, m.as_ref().to_string())) + }) + .collect::, _>>()?; + if member_vec.len() < k { + return Err(AssignmentError::InsufficientMember(k, member_vec.len())); + } + + member_vec.sort_by_key(|(s, _)| *s); + Ok(member_vec + .into_iter() + .rev() + .take(k) + .map(|(_, m)| m) + .collect()) + } + + fn assign_one( + &self, + members: impl IntoIterator>, + key: &str, + ) -> Result { + self.assign(members, key, 1).map(|mut members| { + members + .pop() + .expect("The key should be assigned to exactly one member") + }) + } } /// Error codes for assignment @@ -19,8 +69,8 @@ pub trait Hasher { pub enum AssignmentError { #[error("Cannot assign empty key")] EmptyKey, - #[error("No members to assign to")] - NoMembers, + #[error("Insufficient members: requested {0}, available {1}")] + InsufficientMember(usize, usize), #[error("Error hashing member")] HashError, } @@ -29,64 +79,12 @@ impl ChromaError for AssignmentError { fn code(&self) -> ErrorCodes { match self { AssignmentError::EmptyKey => ErrorCodes::InvalidArgument, - AssignmentError::NoMembers => ErrorCodes::InvalidArgument, + AssignmentError::InsufficientMember(_, _) => ErrorCodes::InvalidArgument, AssignmentError::HashError => ErrorCodes::Internal, } } } -/// Assign a key to a member using the rendezvous hash algorithm. -/// # Arguments -/// - key: The key to assign. -/// - members: The members to assign to. -/// - hasher: The hasher to use. -/// # Returns -/// The member that the key was assigned to. -/// # Errors -/// - If the key is empty. -/// - If there are no members to assign to. -/// - If there is an error hashing a member. -/// # Notes -/// This implementation mirrors the rendezvous hash implementation -/// in the go and python services. -pub fn assign( - key: &str, - members: impl IntoIterator>, - hasher: &H, -) -> Result { - if key.is_empty() { - return Err(AssignmentError::EmptyKey); - } - - let mut iterated = false; - let mut max_score = u64::MIN; - let mut max_member = None; - - for member in members { - if !iterated { - iterated = true; - } - let score = hasher.hash(member.as_ref(), key); - let score = match score { - Ok(score) => score, - Err(_err) => return Err(AssignmentError::HashError), - }; - if score > max_score { - max_score = score; - max_member = Some(member); - } - } - - if !iterated { - return Err(AssignmentError::NoMembers); - } - - match max_member { - Some(max_member) => Ok(max_member.as_ref().to_string()), - None => Err(AssignmentError::NoMembers), - } -} - fn merge_hashes(x: u64, y: u64) -> u64 { let mut acc = x ^ y; acc ^= acc >> 33; @@ -118,6 +116,8 @@ impl Hasher for Murmur3Hasher { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; struct MockHasher {} @@ -138,35 +138,76 @@ mod tests { let members = vec!["a", "b", "c"]; let hasher = MockHasher {}; let key = "key"; - let member = assign(key, members, &hasher).unwrap(); + let member = hasher.assign_one(&members, key).unwrap(); assert_eq!(member, "c".to_string()); } #[test] fn test_even_distribution() { + let key_count = 1000; let member_count = 10; - let tolerance = 25; - let mut nodes = Vec::with_capacity(member_count); + // Probablity of a key get assigned to a particular member, assuming perfect hashing + let prob = 1_f64 / member_count as f64; + // Expected number of keys assigned to a member + let expected = key_count as f64 * prob; + // Variance of the total number of keys assigned to a member + let var = key_count as f64 * prob * (1_f64 - prob); let hasher = Murmur3Hasher {}; - for i in 0..member_count { - let member = format!("member{}", i); - nodes.push(member); + let nodes = (0..member_count) + .map(|i| format!("{i}")) + .collect::>(); + + let mut counts = vec![0; member_count]; + for i in 0..key_count { + let key = format!("key_{}", i); + let member = hasher.assign_one(&nodes, &key).unwrap(); + counts[member.parse::().unwrap()] += 1; } + for count in counts.iter().take(member_count).copied() { + let diff = count as f64 - expected; + // The distribution should be Binomial(key_count, prob) + // Since key_count is large, this is approximately normal + // We are confident that number of keys assigned to any member should be within 3 standard deviation of the expected value + assert!(diff.abs() < var.sqrt() * 3_f64); + } + } + + #[test] + fn test_multi_assign_even_distribution() { + let k = 3; + let key_count = 1000; + let member_count = 10; + // Probablity of a key get assigned to a particular member, assuming perfect hashing + let prob = k as f64 / member_count as f64; + // Expected number of keys assigned to a member + let expected = key_count as f64 * prob; + // Estimated variance of the total number of keys assigned to a member + let var = (k * key_count) as f64 * prob * (1_f64 - prob); + let hasher = Murmur3Hasher {}; + + let nodes = (0..member_count) + .map(|i| format!("{i}")) + .collect::>(); + let mut counts = vec![0; member_count]; - let num_keys = 1000; - for i in 0..num_keys { + for i in 0..key_count { let key = format!("key_{}", i); - let member = assign(&key, &nodes, &hasher).unwrap(); - let index = nodes.iter().position(|x| *x == member).unwrap(); - counts[index] += 1; + let members = hasher.assign(&nodes, &key, k).unwrap(); + // Assigned members should be unique + let unique_members: HashSet<_> = HashSet::from_iter(members.iter()); + assert_eq!(unique_members.len(), members.len()); + for member in members { + counts[member.parse::().unwrap()] += 1; + } } - let expected = num_keys / member_count; for count in counts.iter().take(member_count).copied() { - let diff = count - expected as i32; - assert!(diff.abs() < tolerance); + let diff = count as f64 - expected; + // The number of keys assigned to a member should be approximately normal + // We are confident that number of keys assigned to any member should be within 3 standard deviation of the expected value + assert!(diff.abs() < var.sqrt() * 3_f64); } } }