Skip to content

Commit

Permalink
[CLN] Make sysdb use enum not dyn (chroma-core#2350)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- Makes sysdb an enum not dyn. We perfer an enum to dyn where the number
of impls is fairly bounded - in this case 2 - the real one and a mock.
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
Existing tests - this is a non-functional refactor.
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
HammadB authored and Anush008 committed Jun 27, 2024
1 parent e4d3b22 commit fce40bf
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 109 deletions.
42 changes: 25 additions & 17 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::execution::orchestration::CompactionResponse;
use crate::index::hnsw_provider::HnswIndexProvider;
use crate::log::log::Log;
use crate::memberlist::Memberlist;
use crate::segment::record_segment::RecordSegmentReader;
use crate::storage::Storage;
use crate::sysdb;
use crate::sysdb::sysdb::SysDb;
Expand All @@ -38,7 +37,7 @@ pub(crate) struct CompactionManager {
scheduler: Scheduler,
// Dependencies
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
storage: Storage,
blockfile_provider: BlockfileProvider,
hnsw_index_provider: HnswIndexProvider,
Expand Down Expand Up @@ -67,7 +66,7 @@ impl CompactionManager {
pub(crate) fn new(
scheduler: Scheduler,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
storage: Storage,
blockfile_provider: BlockfileProvider,
hnsw_index_provider: HnswIndexProvider,
Expand Down Expand Up @@ -358,7 +357,7 @@ mod tests {
}),
);

let mut sysdb = Box::new(TestSysDb::new());
let mut sysdb = Box::new(SysDb::Test(TestSysDb::new()));

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
Expand All @@ -383,8 +382,13 @@ mod tests {
log_position: -1,
version: 0,
};
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);
match *sysdb {
SysDb::Test(ref mut sysdb) => {
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);
}
_ => panic!("Invalid sysdb type"),
}

let collection_1_record_segment = Segment {
id: Uuid::new_v4(),
Expand Down Expand Up @@ -440,17 +444,21 @@ mod tests {
file_path: HashMap::new(),
};

sysdb.add_segment(collection_1_record_segment);
sysdb.add_segment(collection_2_record_segment);
sysdb.add_segment(collection_1_hnsw_segment);
sysdb.add_segment(collection_2_hnsw_segment);
sysdb.add_segment(collection_1_metadata_segment);
sysdb.add_segment(collection_2_metadata_segment);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
let last_compaction_time_2 = 1;
sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2);
match *sysdb {
SysDb::Test(ref mut sysdb) => {
sysdb.add_segment(collection_1_record_segment);
sysdb.add_segment(collection_2_record_segment);
sysdb.add_segment(collection_1_hnsw_segment);
sysdb.add_segment(collection_2_hnsw_segment);
sysdb.add_segment(collection_1_metadata_segment);
sysdb.add_segment(collection_2_metadata_segment);
let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
let last_compaction_time_2 = 1;
sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2);
}
_ => panic!("Invalid sysdb type"),
}

let my_member_id = "1".to_string();
let compaction_manager_queue_size = 1000;
Expand Down
45 changes: 29 additions & 16 deletions rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use uuid::Uuid;
pub(crate) struct Scheduler {
my_ip: String,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
policy: Box<dyn SchedulerPolicy>,
job_queue: Vec<CompactionJob>,
max_concurrent_jobs: usize,
Expand All @@ -23,7 +23,7 @@ impl Scheduler {
pub(crate) fn new(
my_ip: String,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
policy: Box<dyn SchedulerPolicy>,
max_concurrent_jobs: usize,
assignment_policy: Box<dyn AssignmentPolicy>,
Expand Down Expand Up @@ -246,7 +246,7 @@ mod tests {
}),
);

let mut sysdb = Box::new(TestSysDb::new());
let mut sysdb = Box::new(SysDb::Test(TestSysDb::new()));

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
Expand All @@ -271,11 +271,15 @@ mod tests {
log_position: 0,
version: 0,
};
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
match *sysdb {
SysDb::Test(ref mut sysdb) => {
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);
let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
}
_ => panic!("Invalid sysdb type"),
}

let my_member_id = "1".to_string();
let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
Expand Down Expand Up @@ -314,8 +318,14 @@ mod tests {
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].collection_id, collection_uuid_1,);

let last_compaction_time_2 = 1;
sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2);
// Add last compaction time for tenant_2
match *sysdb {
SysDb::Test(ref mut sysdb) => {
let last_compaction_time_2 = 1;
sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2);
}
_ => panic!("Invalid sysdb type"),
}
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
Expand Down Expand Up @@ -420,7 +430,7 @@ mod tests {
);
let _ = log.update_collection_log_offset(collection_uuid_1, 2).await;

let mut sysdb = Box::new(TestSysDb::new());
let mut sysdb = Box::new(SysDb::Test(TestSysDb::new()));

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
Expand All @@ -434,11 +444,14 @@ mod tests {
version: 0,
};

sysdb.add_collection(collection_1);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);

match *sysdb {
SysDb::Test(ref mut sysdb) => {
sysdb.add_collection(collection_1);
let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
}
_ => panic!("Invalid sysdb type"),
}
let my_ip = "0.0.0.1".to_string();
let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
let max_concurrent_jobs = 1000;
Expand Down
25 changes: 18 additions & 7 deletions rust/worker/src/execution/operators/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct RegisterInput {
log_position: i64,
collection_version: i32,
segment_flush_info: Arc<[SegmentFlushInfo]>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
log: Box<dyn Log>,
}

Expand All @@ -57,7 +57,7 @@ impl RegisterInput {
log_position: i64,
collection_version: i32,
segment_flush_info: Arc<[SegmentFlushInfo]>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
log: Box<dyn Log>,
) -> Self {
RegisterInput {
Expand Down Expand Up @@ -150,7 +150,7 @@ mod tests {

#[tokio::test]
async fn test_register_operator() {
let mut sysdb = Box::new(TestSysDb::new());
let mut sysdb = Box::new(SysDb::Test(TestSysDb::new()));
let mut log = Box::new(InMemoryLog::new());
let collection_version = 0;
let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
Expand Down Expand Up @@ -178,8 +178,14 @@ mod tests {
log_position: 0,
version: collection_version,
};
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);

match *sysdb {
SysDb::Test(ref mut sysdb) => {
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);
}
_ => panic!("Invalid sysdb type"),
}

let mut file_path_1 = HashMap::new();
file_path_1.insert("hnsw".to_string(), vec!["path_1".to_string()]);
Expand All @@ -205,8 +211,13 @@ mod tests {
metadata: None,
file_path: file_path_2.clone(),
};
sysdb.add_segment(segment_1);
sysdb.add_segment(segment_2);
match *sysdb {
SysDb::Test(ref mut sysdb) => {
sysdb.add_segment(segment_1);
sysdb.add_segment(segment_2);
}
_ => panic!("Invalid sysdb type"),
}

let mut file_path_3 = HashMap::new();
file_path_3.insert("hnsw".to_string(), vec!["path_3".to_string()]);
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub struct CompactOrchestrator {
collection_id: Uuid,
// Dependencies
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
blockfile_provider: BlockfileProvider,
hnsw_index_provider: HnswIndexProvider,
// State we hold across the execution
Expand Down Expand Up @@ -146,7 +146,7 @@ impl CompactOrchestrator {
system: System,
collection_id: Uuid,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
blockfile_provider: BlockfileProvider,
hnsw_index_provider: HnswIndexProvider,
dispatcher: Box<dyn Receiver<TaskMessage>>,
Expand Down
10 changes: 5 additions & 5 deletions rust/worker/src/execution/orchestration/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub(crate) struct HnswQueryOrchestrator {
finish_dependency_count: u32,
// Services
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
hnsw_index_provider: HnswIndexProvider,
blockfile_provider: BlockfileProvider,
Expand All @@ -149,7 +149,7 @@ impl HnswQueryOrchestrator {
include_embeddings: bool,
segment_id: Uuid,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
hnsw_index_provider: HnswIndexProvider,
blockfile_provider: BlockfileProvider,
dispatcher: Box<dyn Receiver<TaskMessage>>,
Expand Down Expand Up @@ -419,7 +419,7 @@ impl HnswQueryOrchestrator {

async fn get_hnsw_segment_from_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
hnsw_segment_id: &Uuid,
) -> Result<Segment, Box<dyn ChromaError>> {
let segments = sysdb
Expand Down Expand Up @@ -449,7 +449,7 @@ impl HnswQueryOrchestrator {

async fn get_collection(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
collection_id: &Uuid,
) -> Result<Collection, Box<dyn ChromaError>> {
let child_span: tracing::Span =
Expand All @@ -475,7 +475,7 @@ impl HnswQueryOrchestrator {

async fn get_record_segment_for_collection(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
collection_id: &Uuid,
) -> Result<Segment, Box<dyn ChromaError>> {
let segments = sysdb
Expand Down
20 changes: 10 additions & 10 deletions rust/worker/src/execution/orchestration/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) struct MetadataQueryOrchestrator {
merge_dependency_count: u32,
// Services
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
// Query params
Expand All @@ -84,7 +84,7 @@ pub(crate) struct CountQueryOrchestrator {
collection: Option<Collection>,
// Services
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
// Result channel
Expand Down Expand Up @@ -130,7 +130,7 @@ impl CountQueryOrchestrator {
system: System,
metadata_segment_id: &Uuid,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
) -> Self {
Expand Down Expand Up @@ -247,7 +247,7 @@ impl CountQueryOrchestrator {

async fn get_metadata_segment_from_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
metadata_segment_id: &Uuid,
) -> Result<Segment, Box<dyn ChromaError>> {
let segments = sysdb
Expand Down Expand Up @@ -279,7 +279,7 @@ impl CountQueryOrchestrator {

async fn get_record_segment_from_collection_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
collection_id: &Uuid,
) -> Result<Segment, Box<dyn ChromaError>> {
let segments = sysdb
Expand Down Expand Up @@ -310,7 +310,7 @@ impl CountQueryOrchestrator {

async fn get_collection_from_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
collection_id: &Uuid,
ctx: &ComponentContext<Self>,
) -> Result<Collection, Box<dyn ChromaError>> {
Expand Down Expand Up @@ -451,7 +451,7 @@ impl MetadataQueryOrchestrator {
metadata_segment_id: &Uuid,
query_ids: Option<Vec<String>>,
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
where_clause: Option<Where>,
Expand Down Expand Up @@ -605,7 +605,7 @@ impl MetadataQueryOrchestrator {

async fn get_metadata_segment_from_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
metadata_segment_id: &Uuid,
) -> Result<Segment, Box<dyn ChromaError>> {
let segments = sysdb
Expand Down Expand Up @@ -637,7 +637,7 @@ impl MetadataQueryOrchestrator {

async fn get_record_segment_from_collection_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
collection_id: &Uuid,
) -> Result<Segment, Box<dyn ChromaError>> {
let segments = sysdb
Expand Down Expand Up @@ -668,7 +668,7 @@ impl MetadataQueryOrchestrator {

async fn get_collection_from_id(
&self,
mut sysdb: Box<dyn SysDb>,
mut sysdb: Box<SysDb>,
collection_id: &Uuid,
ctx: &ComponentContext<Self>,
) -> Result<Collection, Box<dyn ChromaError>> {
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct WorkerServer {
dispatcher: Option<Box<dyn Receiver<TaskMessage>>>,
// Service dependencies
log: Box<dyn Log>,
sysdb: Box<dyn SysDb>,
sysdb: Box<SysDb>,
hnsw_index_provider: HnswIndexProvider,
blockfile_provider: BlockfileProvider,
port: u16,
Expand Down
Loading

0 comments on commit fce40bf

Please sign in to comment.