Skip to content

Commit

Permalink
[ENH] Implement multiple member assignment (#3553)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
   - N/A
 - New functionality
   - Implements the multi assignment logic, which allows to deterministically assign multiple members for a given key and hasher. This mirrors the Python implementation.

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
Sicheng-Pan authored Jan 24, 2025
1 parent b473056 commit 00ce32f
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 71 deletions.
4 changes: 2 additions & 2 deletions rust/config/src/assignment/assignment_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::Configurable;

use super::{
config::{AssignmentPolicyConfig, HasherType},
rendezvous_hash::{assign, AssignmentError, Murmur3Hasher},
rendezvous_hash::{AssignmentError, Hasher, Murmur3Hasher},
};
use async_trait::async_trait;
use chroma_error::ChromaError;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl Configurable<AssignmentPolicyConfig> for RendezvousHashingAssignmentPolicy
impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
fn assign(&self, key: &str) -> Result<String, AssignmentError> {
let members = self.get_members();
assign(key, members, &self.hasher)
self.hasher.assign_one(members, key)
}

fn get_members(&self) -> Vec<String> {
Expand Down
179 changes: 110 additions & 69 deletions rust/config/src/assignment/rendezvous_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,65 @@ use murmur3::murmur3_x64_128;
/// A trait for hashing a member and a key to a score.
pub trait Hasher {
fn hash(&self, member: &str, key: &str) -> Result<u64, AssignmentError>;
/// Assign a key to a collection of members using the rendezvous hash algorithm
/// # Arguments
/// - key: The key to assign.
/// - members: The members to assign to.
/// # Returns
/// The members that the key were assigned to.
/// # Errors
/// - If the key is empty.
/// - If there are insufficient members to assign to.
/// - If there is an error hashing a member.
/// # Notes
/// This implementation mirrors the rendezvous hash implementation
/// in the go and python services.
fn assign(
&self,
members: impl IntoIterator<Item = impl AsRef<str>>,
key: &str,
k: usize,
) -> Result<Vec<String>, AssignmentError> {
let mut member_vec = members
.into_iter()
.map(|m| {
self.hash(m.as_ref(), key)
.map(|s| (s, m.as_ref().to_string()))
})
.collect::<Result<Vec<_>, _>>()?;
if member_vec.len() < k {
return Err(AssignmentError::InsufficientMember(k, member_vec.len()));
}

member_vec.sort_by_key(|(s, _)| *s);
Ok(member_vec
.into_iter()
.rev()
.take(k)
.map(|(_, m)| m)
.collect())
}

fn assign_one(
&self,
members: impl IntoIterator<Item = impl AsRef<str>>,
key: &str,
) -> Result<String, AssignmentError> {
self.assign(members, key, 1).map(|mut members| {
members
.pop()
.expect("The key should be assigned to exactly one member")
})
}
}

/// Error codes for assignment
#[derive(Error, Debug)]
pub enum AssignmentError {
#[error("Cannot assign empty key")]
EmptyKey,
#[error("No members to assign to")]
NoMembers,
#[error("Insufficient members: requested {0}, available {1}")]
InsufficientMember(usize, usize),
#[error("Error hashing member")]
HashError,
}
Expand All @@ -29,64 +79,12 @@ impl ChromaError for AssignmentError {
fn code(&self) -> ErrorCodes {
match self {
AssignmentError::EmptyKey => ErrorCodes::InvalidArgument,
AssignmentError::NoMembers => ErrorCodes::InvalidArgument,
AssignmentError::InsufficientMember(_, _) => ErrorCodes::InvalidArgument,
AssignmentError::HashError => ErrorCodes::Internal,
}
}
}

/// Assign a key to a member using the rendezvous hash algorithm.
/// # Arguments
/// - key: The key to assign.
/// - members: The members to assign to.
/// - hasher: The hasher to use.
/// # Returns
/// The member that the key was assigned to.
/// # Errors
/// - If the key is empty.
/// - If there are no members to assign to.
/// - If there is an error hashing a member.
/// # Notes
/// This implementation mirrors the rendezvous hash implementation
/// in the go and python services.
pub fn assign<H: Hasher>(
key: &str,
members: impl IntoIterator<Item = impl AsRef<str>>,
hasher: &H,
) -> Result<String, AssignmentError> {
if key.is_empty() {
return Err(AssignmentError::EmptyKey);
}

let mut iterated = false;
let mut max_score = u64::MIN;
let mut max_member = None;

for member in members {
if !iterated {
iterated = true;
}
let score = hasher.hash(member.as_ref(), key);
let score = match score {
Ok(score) => score,
Err(_err) => return Err(AssignmentError::HashError),
};
if score > max_score {
max_score = score;
max_member = Some(member);
}
}

if !iterated {
return Err(AssignmentError::NoMembers);
}

match max_member {
Some(max_member) => Ok(max_member.as_ref().to_string()),
None => Err(AssignmentError::NoMembers),
}
}

fn merge_hashes(x: u64, y: u64) -> u64 {
let mut acc = x ^ y;
acc ^= acc >> 33;
Expand Down Expand Up @@ -118,6 +116,8 @@ impl Hasher for Murmur3Hasher {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use super::*;

struct MockHasher {}
Expand All @@ -138,35 +138,76 @@ mod tests {
let members = vec!["a", "b", "c"];
let hasher = MockHasher {};
let key = "key";
let member = assign(key, members, &hasher).unwrap();
let member = hasher.assign_one(&members, key).unwrap();
assert_eq!(member, "c".to_string());
}

#[test]
fn test_even_distribution() {
let key_count = 1000;
let member_count = 10;
let tolerance = 25;
let mut nodes = Vec::with_capacity(member_count);
// Probablity of a key get assigned to a particular member, assuming perfect hashing
let prob = 1_f64 / member_count as f64;
// Expected number of keys assigned to a member
let expected = key_count as f64 * prob;
// Variance of the total number of keys assigned to a member
let var = key_count as f64 * prob * (1_f64 - prob);
let hasher = Murmur3Hasher {};

for i in 0..member_count {
let member = format!("member{}", i);
nodes.push(member);
let nodes = (0..member_count)
.map(|i| format!("{i}"))
.collect::<Vec<_>>();

let mut counts = vec![0; member_count];
for i in 0..key_count {
let key = format!("key_{}", i);
let member = hasher.assign_one(&nodes, &key).unwrap();
counts[member.parse::<usize>().unwrap()] += 1;
}

for count in counts.iter().take(member_count).copied() {
let diff = count as f64 - expected;
// The distribution should be Binomial(key_count, prob)
// Since key_count is large, this is approximately normal
// We are confident that number of keys assigned to any member should be within 3 standard deviation of the expected value
assert!(diff.abs() < var.sqrt() * 3_f64);
}
}

#[test]
fn test_multi_assign_even_distribution() {
let k = 3;
let key_count = 1000;
let member_count = 10;
// Probablity of a key get assigned to a particular member, assuming perfect hashing
let prob = k as f64 / member_count as f64;
// Expected number of keys assigned to a member
let expected = key_count as f64 * prob;
// Estimated variance of the total number of keys assigned to a member
let var = (k * key_count) as f64 * prob * (1_f64 - prob);
let hasher = Murmur3Hasher {};

let nodes = (0..member_count)
.map(|i| format!("{i}"))
.collect::<Vec<_>>();

let mut counts = vec![0; member_count];
let num_keys = 1000;
for i in 0..num_keys {
for i in 0..key_count {
let key = format!("key_{}", i);
let member = assign(&key, &nodes, &hasher).unwrap();
let index = nodes.iter().position(|x| *x == member).unwrap();
counts[index] += 1;
let members = hasher.assign(&nodes, &key, k).unwrap();
// Assigned members should be unique
let unique_members: HashSet<_> = HashSet::from_iter(members.iter());
assert_eq!(unique_members.len(), members.len());
for member in members {
counts[member.parse::<usize>().unwrap()] += 1;
}
}

let expected = num_keys / member_count;
for count in counts.iter().take(member_count).copied() {
let diff = count - expected as i32;
assert!(diff.abs() < tolerance);
let diff = count as f64 - expected;
// The number of keys assigned to a member should be approximately normal
// We are confident that number of keys assigned to any member should be within 3 standard deviation of the expected value
assert!(diff.abs() < var.sqrt() * 3_f64);
}
}
}

0 comments on commit 00ce32f

Please sign in to comment.