From e05cae87231b5eb620a53cc41b6568ffb6f6f176 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 9 Jan 2025 16:11:41 -0800 Subject: [PATCH] [ENH] Move system and task into its own crate (#3464) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - GC service needs the system and the dispatcher/worker framework ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- Cargo.lock | 20 +++++++ Cargo.toml | 3 +- rust/garbage_collector/src/config.rs | 8 +-- rust/system/Cargo.toml | 23 ++++++++ .../src/execution/config.rs | 0 .../src/execution/dispatcher.rs | 7 +-- rust/system/src/execution/mod.rs | 10 ++++ .../src/execution/operator.rs | 12 ++--- .../src/execution}/orchestrator.rs | 7 +-- .../src/execution/worker_thread.rs | 2 +- .../src/system => system/src}/executor.rs | 6 ++- rust/system/src/lib.rs | 17 ++++++ .../src/system => system/src}/receiver.rs | 6 +-- .../src/system => system/src}/scheduler.rs | 4 +- .../src/system => system/src}/system.rs | 6 +-- .../src/system => system/src}/types.rs | 20 +++---- rust/system/src/utils/mod.rs | 3 ++ rust/{worker => system}/src/utils/panic.rs | 0 .../system => system/src}/wrapped_message.rs | 0 rust/worker/Cargo.toml | 1 + rust/worker/benches/filter.rs | 2 +- rust/worker/benches/get.rs | 9 +--- rust/worker/benches/limit.rs | 2 +- rust/worker/benches/query.rs | 4 +- rust/worker/benches/spann.rs | 10 ++-- .../src/compactor/compaction_manager.rs | 10 ++-- rust/worker/src/config.rs | 4 +- rust/worker/src/execution/mod.rs | 6 --- .../operators/apply_log_to_segment_writer.rs | 2 +- .../operators/commit_segment_writer.rs | 2 +- .../src/execution/operators/count_records.rs | 12 ++--- .../src/execution/operators/fetch_log.rs | 10 ++-- rust/worker/src/execution/operators/filter.rs | 20 ++++--- .../operators/flush_segment_writer.rs | 6 +-- .../src/execution/operators/knn_hnsw.rs | 5 +- .../worker/src/execution/operators/knn_log.rs | 18 +++---- .../src/execution/operators/knn_merge.rs | 8 ++- .../src/execution/operators/knn_projection.rs | 14 +++-- rust/worker/src/execution/operators/limit.rs | 18 +++---- .../execution/operators/materialize_logs.rs | 2 +- .../src/execution/operators/partition.rs | 2 +- .../execution/operators/prefetch_record.rs | 15 +++--- .../src/execution/operators/projection.rs | 18 +++---- .../src/execution/operators/register.rs | 2 +- .../src/execution/operators/spann_bf_pl.rs | 8 ++- .../operators/spann_centers_search.rs | 7 +-- .../src/execution/operators/spann_fetch_pl.rs | 7 +-- .../execution/operators/spann_knn_merge.rs | 4 +- .../src/execution/orchestration/compact.rs | 54 +++++++++---------- .../src/execution/orchestration/count.rs | 19 ++----- .../worker/src/execution/orchestration/get.rs | 23 +++----- .../worker/src/execution/orchestration/knn.rs | 34 +++++------- .../src/execution/orchestration/knn_filter.rs | 27 ++++------ .../worker/src/execution/orchestration/mod.rs | 1 - .../src/execution/orchestration/spann_knn.rs | 50 ++++++++--------- rust/worker/src/lib.rs | 35 ++++++------ .../src/memberlist/memberlist_provider.rs | 5 +- rust/worker/src/server.rs | 9 ++-- rust/worker/src/system/mod.rs | 13 ----- rust/worker/src/utils/mod.rs | 3 -- 60 files changed, 305 insertions(+), 350 deletions(-) create mode 100644 rust/system/Cargo.toml rename rust/{worker => system}/src/execution/config.rs (100%) rename rust/{worker => system}/src/execution/dispatcher.rs (98%) create mode 100644 rust/system/src/execution/mod.rs rename rust/{worker => system}/src/execution/operator.rs (96%) rename rust/{worker/src/execution/orchestration => system/src/execution}/orchestrator.rs (95%) rename rust/{worker => system}/src/execution/worker_thread.rs (95%) rename rust/{worker/src/system => system/src}/executor.rs (95%) create mode 100644 rust/system/src/lib.rs rename rust/{worker/src/system => system/src}/receiver.rs (92%) rename rust/{worker/src/system => system/src}/scheduler.rs (99%) rename rust/{worker/src/system => system/src}/system.rs (98%) rename rust/{worker/src/system => system/src}/types.rs (95%) create mode 100644 rust/system/src/utils/mod.rs rename rust/{worker => system}/src/utils/panic.rs (100%) rename rust/{worker/src/system => system/src}/wrapped_message.rs (100%) delete mode 100644 rust/worker/src/system/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c9cdaf21284..57ca85b1335 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,6 +1348,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "chroma-system" +version = "0.1.0" +dependencies = [ + "async-trait", + "chroma-config", + "chroma-error", + "futures", + "parking_lot", + "rand", + "serde", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + [[package]] name = "chroma-types" version = "0.1.0" @@ -6813,6 +6832,7 @@ dependencies = [ "chroma-error", "chroma-index", "chroma-storage", + "chroma-system", "chroma-types", "criterion", "fastrace", diff --git a/Cargo.toml b/Cargo.toml index bf2b4ea21a0..38b45a7899e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/types", "rust/worker"] +members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/types", "rust/worker"] [workspace.dependencies] arrow = "52.2.0" @@ -43,6 +43,7 @@ chroma-cache = { path = "rust/cache" } chroma-types = { path = "rust/types" } chroma-index = { path = "rust/index" } chroma-distance = { path = "rust/distance" } +chroma-system = { path = "rust/system" } worker = { path = "rust/worker" } # Dev dependencies diff --git a/rust/garbage_collector/src/config.rs b/rust/garbage_collector/src/config.rs index 6e1b8e53d47..1e4b3c822b8 100644 --- a/rust/garbage_collector/src/config.rs +++ b/rust/garbage_collector/src/config.rs @@ -2,9 +2,9 @@ use figment::providers::{Env, Format, Yaml}; const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml"; -#[derive(Debug, serde::Deserialize)] -// TODO(rohitcpbot): Remove this dead code annotation. #[allow(dead_code)] +#[derive(Debug, serde::Deserialize)] +// TODO(Sanket): Remove this dead code annotation. pub(super) struct GarbageCollectorConfig { pub(super) service_name: String, pub(super) otel_endpoint: String, @@ -15,9 +15,9 @@ pub(super) struct GarbageCollectorConfig { sysdb_connection: SysdbConnectionConfig, } -#[derive(Debug, serde::Deserialize)] -// TODO(rohitcpbot): Remove this dead code annotation. #[allow(dead_code)] +#[derive(Debug, serde::Deserialize)] +// TODO(Sanket): Remove this dead code annotation. pub(super) struct SysdbConnectionConfig { host: String, port: u32, diff --git a/rust/system/Cargo.toml b/rust/system/Cargo.toml new file mode 100644 index 00000000000..68bed6a5514 --- /dev/null +++ b/rust/system/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "chroma-system" +version = "0.1.0" +edition = "2021" + +[lib] +path = "src/lib.rs" + +[dependencies] +thiserror = { workspace = true } +async-trait = { workspace = true } +futures = { workspace = true } +parking_lot = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +serde = { workspace = true } +uuid = { workspace = true } +rand = { workspace = true } +tempfile = { workspace = true } + +chroma-error = { workspace = true } +chroma-config = { workspace = true } diff --git a/rust/worker/src/execution/config.rs b/rust/system/src/execution/config.rs similarity index 100% rename from rust/worker/src/execution/config.rs rename to rust/system/src/execution/config.rs diff --git a/rust/worker/src/execution/dispatcher.rs b/rust/system/src/execution/dispatcher.rs similarity index 98% rename from rust/worker/src/execution/dispatcher.rs rename to rust/system/src/execution/dispatcher.rs index a7230f121f6..63c90fa5a53 100644 --- a/rust/worker/src/execution/dispatcher.rs +++ b/rust/system/src/execution/dispatcher.rs @@ -1,7 +1,7 @@ use super::operator::OperatorType; use super::{operator::TaskMessage, worker_thread::WorkerThread}; use crate::execution::config::DispatcherConfig; -use crate::system::{Component, ComponentContext, Handler, ReceiverForMessage, System}; +use crate::{Component, ComponentContext, Handler, ReceiverForMessage, System}; use async_trait::async_trait; use chroma_config::Configurable; use chroma_error::ChromaError; @@ -223,10 +223,7 @@ mod tests { use uuid::Uuid; use super::*; - use crate::{ - execution::operator::{wrap, Operator, TaskResult}, - system::{ComponentHandle, System}, - }; + use crate::{operator::*, ComponentHandle}; use std::{ collections::HashSet, sync::{ diff --git a/rust/system/src/execution/mod.rs b/rust/system/src/execution/mod.rs new file mode 100644 index 00000000000..404c9ed8e16 --- /dev/null +++ b/rust/system/src/execution/mod.rs @@ -0,0 +1,10 @@ +pub mod config; +pub mod dispatcher; +pub mod operator; +pub mod orchestrator; +pub mod worker_thread; + +pub use config::*; +pub use dispatcher::*; +pub use operator::*; +pub use orchestrator::*; diff --git a/rust/worker/src/execution/operator.rs b/rust/system/src/execution/operator.rs similarity index 96% rename from rust/worker/src/execution/operator.rs rename to rust/system/src/execution/operator.rs index 86a15fe0867..5a2460eae5c 100644 --- a/rust/worker/src/execution/operator.rs +++ b/rust/system/src/execution/operator.rs @@ -1,4 +1,4 @@ -use crate::{system::ReceiverForMessage, utils::PanicError}; +use crate::{utils::PanicError, ReceiverForMessage}; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; use futures::FutureExt; @@ -32,7 +32,7 @@ where } #[derive(Debug, Error)] -pub(super) enum TaskError { +pub enum TaskError { #[error("Panic occurred while handling task: {0:?}")] Panic(PanicError), #[error("Task failed with error: {0:?}")] @@ -54,13 +54,13 @@ where /// A task result is a wrapper around the result of a task. /// It contains the task id for tracking purposes. #[derive(Debug)] -pub(super) struct TaskResult { +pub struct TaskResult { result: Result>, task_id: Uuid, } impl TaskResult { - pub(super) fn into_inner(self) -> Result> { + pub fn into_inner(self) -> Result> { self.result } @@ -183,7 +183,7 @@ where } /// Wrap an operator and its input into a task message. -pub(super) fn wrap( +pub fn wrap( operator: Box>, input: Input, reply_channel: Box>>, @@ -211,7 +211,7 @@ mod tests { use crate::{ execution::dispatcher::Dispatcher, - system::{Component, ComponentContext, ComponentHandle, Handler, System}, + {Component, ComponentContext, ComponentHandle, Handler, System}, }; use super::*; diff --git a/rust/worker/src/execution/orchestration/orchestrator.rs b/rust/system/src/execution/orchestrator.rs similarity index 95% rename from rust/worker/src/execution/orchestration/orchestrator.rs rename to rust/system/src/execution/orchestrator.rs index 9c629325d1a..73247826c12 100644 --- a/rust/worker/src/execution/orchestration/orchestrator.rs +++ b/rust/system/src/execution/orchestrator.rs @@ -1,3 +1,4 @@ +use crate::{ChannelError, Component, ComponentContext, ComponentHandle, PanicError, System}; use async_trait::async_trait; use chroma_error::ChromaError; use core::fmt::Debug; @@ -5,11 +6,7 @@ use std::any::type_name; use tokio::sync::oneshot::{self, error::RecvError, Sender}; use tracing::Span; -use crate::{ - execution::{dispatcher::Dispatcher, operator::TaskMessage}, - system::{ChannelError, Component, ComponentContext, ComponentHandle, System}, - utils::PanicError, -}; +use crate::{Dispatcher, TaskMessage}; #[async_trait] pub trait Orchestrator: Debug + Send + Sized + 'static { diff --git a/rust/worker/src/execution/worker_thread.rs b/rust/system/src/execution/worker_thread.rs similarity index 95% rename from rust/worker/src/execution/worker_thread.rs rename to rust/system/src/execution/worker_thread.rs index da5c54d59e8..070830bb790 100644 --- a/rust/worker/src/execution/worker_thread.rs +++ b/rust/system/src/execution/worker_thread.rs @@ -1,5 +1,5 @@ use super::{dispatcher::TaskRequestMessage, operator::TaskMessage}; -use crate::system::{Component, ComponentContext, ComponentRuntime, Handler, ReceiverForMessage}; +use crate::{Component, ComponentContext, ComponentRuntime, Handler, ReceiverForMessage}; use async_trait::async_trait; use std::fmt::{Debug, Formatter, Result}; use tracing::{trace_span, Instrument, Span}; diff --git a/rust/worker/src/system/executor.rs b/rust/system/src/executor.rs similarity index 95% rename from rust/worker/src/system/executor.rs rename to rust/system/src/executor.rs index 09779e45340..9255c3b30b6 100644 --- a/rust/worker/src/system/executor.rs +++ b/rust/system/src/executor.rs @@ -1,5 +1,7 @@ -use super::{scheduler::Scheduler, system::System, Component, ComponentSender, WrappedMessage}; -use crate::system::ComponentContext; +use super::{ + scheduler::Scheduler, system::System, Component, ComponentContext, ComponentSender, + WrappedMessage, +}; use std::sync::Arc; use tokio::select; use tracing::{trace_span, Instrument, Span}; diff --git a/rust/system/src/lib.rs b/rust/system/src/lib.rs new file mode 100644 index 00000000000..5be4faf2ac0 --- /dev/null +++ b/rust/system/src/lib.rs @@ -0,0 +1,17 @@ +pub mod execution; +pub mod executor; +pub mod receiver; +pub mod scheduler; +#[allow(clippy::module_inception)] +pub mod system; +pub mod types; +pub mod utils; +pub mod wrapped_message; + +// Re-export types +pub use execution::*; +pub use receiver::*; +pub use system::*; +pub use types::*; +pub use utils::*; +pub(crate) use wrapped_message::*; diff --git a/rust/worker/src/system/receiver.rs b/rust/system/src/receiver.rs similarity index 92% rename from rust/worker/src/system/receiver.rs rename to rust/system/src/receiver.rs index cb688249247..aa6e4b869ee 100644 --- a/rust/worker/src/system/receiver.rs +++ b/rust/system/src/receiver.rs @@ -6,9 +6,7 @@ use thiserror::Error; /// A ReceiverForMessage is generic over a message type, and useful if you want to send a given message type to any component that can handle it. #[async_trait] -pub(crate) trait ReceiverForMessage: - Send + Sync + Debug + ReceiverForMessageClone -{ +pub trait ReceiverForMessage: Send + Sync + Debug + ReceiverForMessageClone { async fn send( &self, message: M, @@ -16,7 +14,7 @@ pub(crate) trait ReceiverForMessage: ) -> Result<(), ChannelError>; } -pub(crate) trait ReceiverForMessageClone { +pub trait ReceiverForMessageClone { fn clone_box(&self) -> Box>; } diff --git a/rust/worker/src/system/scheduler.rs b/rust/system/src/scheduler.rs similarity index 99% rename from rust/worker/src/system/scheduler.rs rename to rust/system/src/scheduler.rs index 725dca0c671..6bf7252b529 100644 --- a/rust/worker/src/system/scheduler.rs +++ b/rust/system/src/scheduler.rs @@ -14,7 +14,7 @@ pub(crate) struct SchedulerTaskHandle { } #[derive(Clone, Debug)] -pub(crate) struct Scheduler { +pub struct Scheduler { handles: Arc>>, } @@ -28,7 +28,7 @@ impl Scheduler { /// Schedule a message to be sent to the component after the specified duration. /// /// `span_factory` is called immediately before sending the scheduled message to the component. - pub(crate) fn schedule( + pub fn schedule( &self, message: M, duration: Duration, diff --git a/rust/worker/src/system/system.rs b/rust/system/src/system.rs similarity index 98% rename from rust/worker/src/system/system.rs rename to rust/system/src/system.rs index b94458baafa..d754b5094fd 100644 --- a/rust/worker/src/system/system.rs +++ b/rust/system/src/system.rs @@ -24,7 +24,7 @@ struct Inner { } impl System { - pub(crate) fn new() -> System { + pub fn new() -> System { System { inner: Arc::new(Inner { scheduler: Scheduler::new(), @@ -87,11 +87,11 @@ impl System { tokio::spawn(async move { stream_loop(stream, &ctx).await }); } - pub(crate) async fn stop(&self) { + pub async fn stop(&self) { self.inner.scheduler.stop(); } - pub(crate) async fn join(&self) { + pub async fn join(&self) { self.inner.scheduler.join().await; } } diff --git a/rust/worker/src/system/types.rs b/rust/system/src/types.rs similarity index 95% rename from rust/worker/src/system/types.rs rename to rust/system/src/types.rs index a423d89202d..98d4ed9435d 100644 --- a/rust/worker/src/system/types.rs +++ b/rust/system/src/types.rs @@ -8,7 +8,7 @@ use tokio::task::JoinError; use super::{system::System, ReceiverForMessage}; -pub(crate) trait Message: Debug + Send + 'static {} +pub trait Message: Debug + Send + 'static {} impl Message for M {} #[derive(Debug, PartialEq, Clone, Copy)] @@ -55,7 +55,7 @@ pub trait Component: Send + Sized + Debug + 'static { /// # Methods /// - handle: Handle a message #[async_trait] -pub(crate) trait Handler +pub trait Handler where Self: Component + Sized + 'static, { @@ -72,7 +72,7 @@ where /// # Methods /// - handle: Handle a message from a stream /// - register_stream: Register a stream to be processed, this is provided and you do not need to implement it -pub(crate) trait StreamHandler +pub trait StreamHandler where Self: Component + 'static + Handler, M: Message, @@ -216,14 +216,14 @@ impl ComponentHandle { } } - pub(crate) fn stop(&mut self) { + pub fn stop(&mut self) { let mut state = self.state.lock(); self.cancellation_token.cancel(); *state = ComponentState::Stopped; } /// Consumes the underlying join handle. Panics if it is consumed twice. - pub(crate) async fn join(&mut self) -> Result<(), JoinError> { + pub async fn join(&mut self) -> Result<(), JoinError> { if let Some(join_handle) = &mut self.join_handle { join_handle.consume().await } else { @@ -236,7 +236,7 @@ impl ComponentHandle { return *self.state.lock(); } - pub(crate) fn receiver(&self) -> Box> + pub fn receiver(&self) -> Box> where C: Component + Handler, M: Message, @@ -244,7 +244,7 @@ impl ComponentHandle { Box::new(self.sender.clone()) } - pub(crate) async fn send( + pub async fn send( &mut self, message: M, tracing_context: Option, @@ -277,12 +277,12 @@ where { pub(crate) system: System, pub(crate) sender: ComponentSender, - pub(crate) cancellation_token: tokio_util::sync::CancellationToken, - pub(crate) scheduler: Scheduler, + pub cancellation_token: tokio_util::sync::CancellationToken, + pub scheduler: Scheduler, } impl ComponentContext { - pub(crate) fn receiver(&self) -> Box> + pub fn receiver(&self) -> Box> where C: Component + Handler, M: Message, diff --git a/rust/system/src/utils/mod.rs b/rust/system/src/utils/mod.rs new file mode 100644 index 00000000000..2c388efa24e --- /dev/null +++ b/rust/system/src/utils/mod.rs @@ -0,0 +1,3 @@ +pub mod panic; + +pub use panic::*; diff --git a/rust/worker/src/utils/panic.rs b/rust/system/src/utils/panic.rs similarity index 100% rename from rust/worker/src/utils/panic.rs rename to rust/system/src/utils/panic.rs diff --git a/rust/worker/src/system/wrapped_message.rs b/rust/system/src/wrapped_message.rs similarity index 100% rename from rust/worker/src/system/wrapped_message.rs rename to rust/system/src/wrapped_message.rs diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 26bdc63bd9d..7419b59915a 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -56,6 +56,7 @@ chroma-types = { workspace = true } chroma-cache = { workspace = true } chroma-index = { workspace = true } chroma-distance = { workspace = true } +chroma-system = { workspace = true } fastrace = "0.7" fastrace-opentelemetry = "0.8" diff --git a/rust/worker/benches/filter.rs b/rust/worker/benches/filter.rs index 57f54644b26..824a410e18d 100644 --- a/rust/worker/benches/filter.rs +++ b/rust/worker/benches/filter.rs @@ -1,13 +1,13 @@ use std::iter::once; use chroma_benchmark::benchmark::{bench_run, tokio_multi_thread}; +use chroma_system::Operator; use chroma_types::{ BooleanOperator, Chunk, DirectWhereComparison, MetadataValue, PrimitiveOperator, Where, WhereChildren, WhereComparison, }; use criterion::Criterion; use criterion::{criterion_group, criterion_main}; -use worker::execution::operator::Operator; use worker::execution::operators::filter::{FilterInput, FilterOperator}; use worker::log::test::upsert_generator; use worker::segment::test::TestSegment; diff --git a/rust/worker/benches/get.rs b/rust/worker/benches/get.rs index 263f4d07f80..5960d48ad6f 100644 --- a/rust/worker/benches/get.rs +++ b/rust/worker/benches/get.rs @@ -3,6 +3,7 @@ mod load; use chroma_benchmark::benchmark::{bench_run, tokio_multi_thread}; use chroma_config::Configurable; +use chroma_system::{ComponentHandle, Dispatcher, Orchestrator, System}; use criterion::{criterion_group, criterion_main, Criterion}; use load::{ all_projection, always_false_filter_for_modulo_metadata, @@ -10,13 +11,7 @@ use load::{ trivial_filter, trivial_limit, trivial_projection, }; use worker::{ - config::RootConfig, - execution::{ - dispatcher::Dispatcher, - orchestration::{get::GetOrchestrator, orchestrator::Orchestrator}, - }, - segment::test::TestSegment, - system::{ComponentHandle, System}, + config::RootConfig, execution::orchestration::get::GetOrchestrator, segment::test::TestSegment, }; fn trivial_get( diff --git a/rust/worker/benches/limit.rs b/rust/worker/benches/limit.rs index 7a13c174acc..bd47a923fe9 100644 --- a/rust/worker/benches/limit.rs +++ b/rust/worker/benches/limit.rs @@ -1,8 +1,8 @@ use chroma_benchmark::benchmark::{bench_run, tokio_multi_thread}; +use chroma_system::Operator; use chroma_types::{Chunk, SignedRoaringBitmap}; use criterion::Criterion; use criterion::{criterion_group, criterion_main}; -use worker::execution::operator::Operator; use worker::execution::operators::limit::{LimitInput, LimitOperator}; use worker::log::test::upsert_generator; use worker::segment::test::TestSegment; diff --git a/rust/worker/benches/query.rs b/rust/worker/benches/query.rs index 5fac8d05829..7d44206f595 100644 --- a/rust/worker/benches/query.rs +++ b/rust/worker/benches/query.rs @@ -6,6 +6,7 @@ use chroma_benchmark::{ datasets::sift::Sift1MData, }; use chroma_config::Configurable; +use chroma_system::{ComponentHandle, Dispatcher, Orchestrator, System}; use criterion::{criterion_group, criterion_main, Criterion}; use futures::{stream, StreamExt, TryStreamExt}; use load::{ @@ -16,16 +17,13 @@ use rand::{seq::SliceRandom, thread_rng}; use worker::{ config::RootConfig, execution::{ - dispatcher::Dispatcher, operators::{knn::KnnOperator, knn_projection::KnnProjectionOperator}, orchestration::{ knn::KnnOrchestrator, knn_filter::{KnnFilterOrchestrator, KnnFilterOutput}, - orchestrator::Orchestrator, }, }, segment::test::TestSegment, - system::{ComponentHandle, System}, }; fn trivial_knn_filter( diff --git a/rust/worker/benches/spann.rs b/rust/worker/benches/spann.rs index 46fe99b0141..ec5ef4aec80 100644 --- a/rust/worker/benches/spann.rs +++ b/rust/worker/benches/spann.rs @@ -14,17 +14,15 @@ use chroma_index::{ }, }; use chroma_storage::{local::LocalStorage, Storage}; +use chroma_system::Operator; use chroma_types::CollectionUuid; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use futures::StreamExt; use rand::seq::SliceRandom; use roaring::RoaringBitmap; -use worker::execution::{ - operator::Operator, - operators::{ - spann_bf_pl::{SpannBfPlInput, SpannBfPlOperator}, - spann_knn_merge::{SpannKnnMergeInput, SpannKnnMergeOperator}, - }, +use worker::execution::operators::{ + spann_bf_pl::{SpannBfPlInput, SpannBfPlOperator}, + spann_knn_merge::{SpannKnnMergeInput, SpannKnnMergeOperator}, }; fn get_records(runtime: &tokio::runtime::Runtime) -> Vec<(u32, Vec)> { diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 9e6e3147c25..99354581277 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -3,21 +3,21 @@ use super::scheduler_policy::LasCompactionTimeSchedulerPolicy; use crate::compactor::types::CompactionJob; use crate::compactor::types::ScheduleMessage; use crate::config::CompactionServiceConfig; -use crate::execution::dispatcher::Dispatcher; -use crate::execution::orchestration::orchestrator::Orchestrator; use crate::execution::orchestration::CompactOrchestrator; use crate::execution::orchestration::CompactionResponse; use crate::log::log::Log; use crate::memberlist::Memberlist; use crate::sysdb; use crate::sysdb::sysdb::SysDb; -use crate::system::{Component, ComponentContext, ComponentHandle, Handler, System}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; use chroma_index::hnsw_provider::HnswIndexProvider; use chroma_storage::Storage; +use chroma_system::Dispatcher; +use chroma_system::Orchestrator; +use chroma_system::{Component, ComponentContext, ComponentHandle, Handler, System}; use chroma_types::CollectionUuid; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -276,7 +276,7 @@ impl Component for CompactionManager { self.compaction_manager_queue_size } - async fn start(&mut self, ctx: &crate::system::ComponentContext) -> () { + async fn start(&mut self, ctx: &ComponentContext) -> () { println!("Starting CompactionManager"); ctx.scheduler .schedule(ScheduleMessage {}, self.compaction_interval, ctx, || { @@ -329,13 +329,13 @@ mod tests { use super::*; use crate::assignment::assignment_policy::AssignmentPolicy; use crate::assignment::assignment_policy::RendezvousHashingAssignmentPolicy; - use crate::execution::dispatcher::Dispatcher; use crate::log::log::InMemoryLog; use crate::log::log::InternalLogRecord; use crate::sysdb::test_sysdb::TestSysDb; use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES; use chroma_cache::{new_cache_for_test, new_non_persistent_cache_for_test}; use chroma_storage::local::LocalStorage; + use chroma_system::Dispatcher; use chroma_types::SegmentUuid; use chroma_types::{Collection, LogRecord, Operation, OperationRecord, Segment}; use std::collections::HashMap; diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 63b9b524e8b..24b1f4586dd 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -108,7 +108,7 @@ pub struct QueryServiceConfig { pub(crate) sysdb: crate::sysdb::config::SysDbConfig, pub(crate) storage: chroma_storage::config::StorageConfig, pub(crate) log: crate::log::config::LogConfig, - pub dispatcher: crate::execution::config::DispatcherConfig, + pub dispatcher: chroma_system::DispatcherConfig, pub(crate) blockfile_provider: chroma_blockstore::config::BlockfileProviderConfig, pub(crate) hnsw_provider: chroma_index::config::HnswProviderConfig, } @@ -135,7 +135,7 @@ pub struct CompactionServiceConfig { pub(crate) sysdb: crate::sysdb::config::SysDbConfig, pub(crate) storage: chroma_storage::config::StorageConfig, pub(crate) log: crate::log::config::LogConfig, - pub(crate) dispatcher: crate::execution::config::DispatcherConfig, + pub(crate) dispatcher: chroma_system::DispatcherConfig, pub(crate) compactor: crate::compactor::config::CompactorConfig, pub(crate) blockfile_provider: chroma_blockstore::config::BlockfileProviderConfig, pub(crate) hnsw_provider: chroma_index::config::HnswProviderConfig, diff --git a/rust/worker/src/execution/mod.rs b/rust/worker/src/execution/mod.rs index 91a2f089e27..eb43815c3b7 100644 --- a/rust/worker/src/execution/mod.rs +++ b/rust/worker/src/execution/mod.rs @@ -1,8 +1,2 @@ -pub(crate) mod config; -mod worker_thread; - -// Required for benchmark -pub mod dispatcher; -pub mod operator; pub mod operators; pub mod orchestration; diff --git a/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs b/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs index 1652bb7b99b..cdefbd7361d 100644 --- a/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs +++ b/rust/worker/src/execution/operators/apply_log_to_segment_writer.rs @@ -1,4 +1,3 @@ -use crate::execution::operator::Operator; use crate::segment::metadata_segment::MetadataSegmentError; use crate::segment::record_segment::ApplyMaterializedLogError; use crate::segment::record_segment::RecordSegmentReader; @@ -9,6 +8,7 @@ use crate::segment::MaterializeLogsResult; use async_trait::async_trait; use chroma_error::ChromaError; use chroma_error::ErrorCodes; +use chroma_system::Operator; use chroma_types::SegmentUuid; use thiserror::Error; use tracing::Instrument; diff --git a/rust/worker/src/execution/operators/commit_segment_writer.rs b/rust/worker/src/execution/operators/commit_segment_writer.rs index e6e15a60ccf..e8d74e8db93 100644 --- a/rust/worker/src/execution/operators/commit_segment_writer.rs +++ b/rust/worker/src/execution/operators/commit_segment_writer.rs @@ -1,9 +1,9 @@ -use crate::execution::operator::Operator; use crate::segment::ChromaSegmentFlusher; use crate::segment::ChromaSegmentWriter; use async_trait::async_trait; use chroma_error::ChromaError; use chroma_error::ErrorCodes; +use chroma_system::Operator; use thiserror::Error; use tracing::Instrument; diff --git a/rust/worker/src/execution/operators/count_records.rs b/rust/worker/src/execution/operators/count_records.rs index da4796d5580..3538f4e86ff 100644 --- a/rust/worker/src/execution/operators/count_records.rs +++ b/rust/worker/src/execution/operators/count_records.rs @@ -1,10 +1,8 @@ -use crate::{ - execution::operator::Operator, - segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, -}; +use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::Operator; use chroma_types::{Chunk, LogRecord, Operation, Segment}; use std::collections::HashSet; use thiserror::Error; @@ -202,13 +200,11 @@ mod tests { use crate::segment::materialize_logs; use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}; use crate::{ - execution::{ - operator::Operator, - operators::count_records::{CountRecordsInput, CountRecordsOperator}, - }, + execution::operators::count_records::{CountRecordsInput, CountRecordsOperator}, segment::record_segment::RecordSegmentWriter, }; use chroma_blockstore::provider::BlockfileProvider; + use chroma_system::Operator; use chroma_types::{Chunk, CollectionUuid, LogRecord, Operation, OperationRecord, SegmentUuid}; use std::{collections::HashMap, str::FromStr}; use tracing::{Instrument, Span}; diff --git a/rust/worker/src/execution/operators/fetch_log.rs b/rust/worker/src/execution/operators/fetch_log.rs index e49f06ae0e3..ba942ffb553 100644 --- a/rust/worker/src/execution/operators/fetch_log.rs +++ b/rust/worker/src/execution/operators/fetch_log.rs @@ -1,16 +1,13 @@ use std::time::{SystemTime, SystemTimeError, UNIX_EPOCH}; +use crate::log::log::{Log, PullLogsError}; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::{Operator, OperatorType}; use chroma_types::{Chunk, CollectionUuid, LogRecord}; use thiserror::Error; use tracing::trace; -use crate::{ - execution::operator::{Operator, OperatorType}, - log::log::{Log, PullLogsError}, -}; - /// The `FetchLogOperator` fetches logs from the log service /// /// # Parameters @@ -111,10 +108,11 @@ impl Operator for FetchLogOperator { #[cfg(test)] mod tests { + use chroma_system::Operator; use chroma_types::CollectionUuid; use crate::{ - execution::{operator::Operator, operators::fetch_log::FetchLogOperator}, + execution::operators::fetch_log::FetchLogOperator, log::{ log::{InMemoryLog, InternalLogRecord}, test::{upsert_generator, LogGenerator}, diff --git a/rust/worker/src/execution/operators/filter.rs b/rust/worker/src/execution/operators/filter.rs index 890488b7407..570312616f3 100644 --- a/rust/worker/src/execution/operators/filter.rs +++ b/rust/worker/src/execution/operators/filter.rs @@ -3,10 +3,17 @@ use std::{ ops::{BitAnd, BitOr, Bound}, }; +use crate::segment::{ + materialize_logs, + metadata_segment::{MetadataSegmentError, MetadataSegmentReader}, + record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, + LogMaterializerError, MaterializeLogsResult, +}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; use chroma_index::metadata::types::MetadataIndexError; +use chroma_system::Operator; use chroma_types::{ BooleanOperator, Chunk, DirectDocumentComparison, DirectWhereComparison, DocumentOperator, LogRecord, MaterializedLogOperation, MetadataSetValue, MetadataValue, PrimitiveOperator, @@ -16,16 +23,6 @@ use roaring::RoaringBitmap; use thiserror::Error; use tracing::{trace, Instrument, Span}; -use crate::{ - execution::operator::Operator, - segment::{ - materialize_logs, - metadata_segment::{MetadataSegmentError, MetadataSegmentReader}, - record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializerError, MaterializeLogsResult, - }, -}; - /// The `FilterOperator` filters the collection with specified criteria /// /// # Parameters @@ -504,6 +501,7 @@ impl Operator for FilterOperator { #[cfg(test)] mod tests { + use chroma_system::Operator; use chroma_types::{ BooleanOperator, DirectDocumentComparison, DirectWhereComparison, MetadataSetValue, MetadataValue, PrimitiveOperator, SetOperator, SignedRoaringBitmap, Where, WhereChildren, @@ -511,7 +509,7 @@ mod tests { }; use crate::{ - execution::{operator::Operator, operators::filter::FilterOperator}, + execution::operators::filter::FilterOperator, log::test::{add_delete_generator, int_as_id, LogGenerator}, segment::test::TestSegment, }; diff --git a/rust/worker/src/execution/operators/flush_segment_writer.rs b/rust/worker/src/execution/operators/flush_segment_writer.rs index 38574a4e2e4..c7b8fd6f71f 100644 --- a/rust/worker/src/execution/operators/flush_segment_writer.rs +++ b/rust/worker/src/execution/operators/flush_segment_writer.rs @@ -1,4 +1,4 @@ -use crate::execution::operator::Operator; +use chroma_system::Operator; use crate::segment::ChromaSegmentFlusher; use async_trait::async_trait; use chroma_error::ChromaError; @@ -61,8 +61,8 @@ impl Operator for FlushSegmen "FlushSegmentWriterOperator" } - fn get_type(&self) -> crate::execution::operator::OperatorType { - crate::execution::operator::OperatorType::IO + fn get_type(&self) -> chroma_system::OperatorType { + chroma_system::OperatorType::IO } async fn run( diff --git a/rust/worker/src/execution/operators/knn_hnsw.rs b/rust/worker/src/execution/operators/knn_hnsw.rs index 4e80971d026..29b02e2d1ff 100644 --- a/rust/worker/src/execution/operators/knn_hnsw.rs +++ b/rust/worker/src/execution/operators/knn_hnsw.rs @@ -4,9 +4,8 @@ use chroma_error::{ChromaError, ErrorCodes}; use chroma_types::SignedRoaringBitmap; use thiserror::Error; -use crate::{ - execution::operator::Operator, segment::distributed_hnsw_segment::DistributedHNSWSegmentReader, -}; +use crate::segment::distributed_hnsw_segment::DistributedHNSWSegmentReader; +use chroma_system::Operator; use super::knn::{KnnOperator, RecordDistance}; diff --git a/rust/worker/src/execution/operators/knn_log.rs b/rust/worker/src/execution/operators/knn_log.rs index e192d95d20d..c6c68be9f79 100644 --- a/rust/worker/src/execution/operators/knn_log.rs +++ b/rust/worker/src/execution/operators/knn_log.rs @@ -1,21 +1,18 @@ use std::collections::BinaryHeap; +use crate::segment::{ + materialize_logs, + record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, + LogMaterializerError, +}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_distance::{normalize, DistanceFunction}; use chroma_error::ChromaError; +use chroma_system::Operator; use chroma_types::{MaterializedLogOperation, Segment, SignedRoaringBitmap}; use thiserror::Error; -use crate::{ - execution::operator::Operator, - segment::{ - materialize_logs, - record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializerError, - }, -}; - use super::{ fetch_log::{FetchLogError, FetchLogOutput}, knn::{KnnOperator, RecordDistance}, @@ -131,10 +128,11 @@ impl Operator for KnnOperator { #[cfg(test)] mod tests { use chroma_distance::{normalize, DistanceFunction}; + use chroma_system::Operator; use chroma_types::SignedRoaringBitmap; use crate::{ - execution::{operator::Operator, operators::knn::KnnOperator}, + execution::operators::knn::KnnOperator, log::test::{random_embedding, upsert_generator, LogGenerator, TEST_EMBEDDING_DIMENSION}, segment::test::TestSegment, }; diff --git a/rust/worker/src/execution/operators/knn_merge.rs b/rust/worker/src/execution/operators/knn_merge.rs index fa3981328bb..cd482c04d94 100644 --- a/rust/worker/src/execution/operators/knn_merge.rs +++ b/rust/worker/src/execution/operators/knn_merge.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use crate::execution::operator::Operator; +use chroma_system::Operator; use super::knn::RecordDistance; @@ -83,10 +83,8 @@ impl Operator for KnnMergeOperator { #[cfg(test)] mod tests { - use crate::execution::{ - operator::Operator, - operators::{knn::RecordDistance, knn_merge::KnnMergeOperator}, - }; + use crate::execution::operators::{knn::RecordDistance, knn_merge::KnnMergeOperator}; + use chroma_system::Operator; use super::KnnMergeInput; diff --git a/rust/worker/src/execution/operators/knn_projection.rs b/rust/worker/src/execution/operators/knn_projection.rs index 7883006320f..a5df82d66ca 100644 --- a/rust/worker/src/execution/operators/knn_projection.rs +++ b/rust/worker/src/execution/operators/knn_projection.rs @@ -1,12 +1,12 @@ +use crate::execution::operators::projection::ProjectionInput; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::ChromaError; +use chroma_system::Operator; use chroma_types::Segment; use thiserror::Error; use tracing::trace; -use crate::execution::{operator::Operator, operators::projection::ProjectionInput}; - use super::{ fetch_log::FetchLogOutput, knn::RecordDistance, @@ -121,16 +121,14 @@ impl Operator for KnnProjectionOperator #[cfg(test)] mod tests { use crate::{ - execution::{ - operator::Operator, - operators::{ - knn::RecordDistance, knn_projection::KnnProjectionOperator, - projection::ProjectionOperator, - }, + execution::operators::{ + knn::RecordDistance, knn_projection::KnnProjectionOperator, + projection::ProjectionOperator, }, log::test::{int_as_id, upsert_generator, LogGenerator}, segment::test::TestSegment, }; + use chroma_system::Operator; use super::KnnProjectionInput; diff --git a/rust/worker/src/execution/operators/limit.rs b/rust/worker/src/execution/operators/limit.rs index df1f2cc5c5e..3d59ec53201 100644 --- a/rust/worker/src/execution/operators/limit.rs +++ b/rust/worker/src/execution/operators/limit.rs @@ -1,23 +1,20 @@ use std::{cmp::Ordering, num::TryFromIntError}; +use crate::segment::{ + materialize_logs, + record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, + LogMaterializerError, +}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::Operator; use chroma_types::{Chunk, LogRecord, MaterializedLogOperation, Segment, SignedRoaringBitmap}; use futures::StreamExt; use roaring::RoaringBitmap; use thiserror::Error; use tracing::{trace, Instrument, Span}; -use crate::{ - execution::operator::Operator, - segment::{ - materialize_logs, - record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializerError, - }, -}; - /// The `LimitOperator` selects a range or records sorted by their offset ids /// /// # Parameters @@ -280,11 +277,12 @@ impl Operator for LimitOperator { #[cfg(test)] mod tests { + use chroma_system::Operator; use chroma_types::SignedRoaringBitmap; use roaring::RoaringBitmap; use crate::{ - execution::{operator::Operator, operators::limit::LimitOperator}, + execution::operators::limit::LimitOperator, log::test::{upsert_generator, LogGenerator}, segment::test::TestSegment, }; diff --git a/rust/worker/src/execution/operators/materialize_logs.rs b/rust/worker/src/execution/operators/materialize_logs.rs index afb4ea3ac43..da4a2f0f1fc 100644 --- a/rust/worker/src/execution/operators/materialize_logs.rs +++ b/rust/worker/src/execution/operators/materialize_logs.rs @@ -1,4 +1,4 @@ -use crate::execution::operator::Operator; +use chroma_system::Operator; use crate::segment::record_segment::RecordSegmentReaderCreationError; use crate::segment::{materialize_logs, record_segment::RecordSegmentReader}; use crate::segment::{LogMaterializerError, MaterializeLogsResult}; diff --git a/rust/worker/src/execution/operators/partition.rs b/rust/worker/src/execution/operators/partition.rs index 16461598321..ca077395c42 100644 --- a/rust/worker/src/execution/operators/partition.rs +++ b/rust/worker/src/execution/operators/partition.rs @@ -1,4 +1,4 @@ -use crate::execution::operator::Operator; +use chroma_system::Operator; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; use chroma_types::{Chunk, LogRecord}; diff --git a/rust/worker/src/execution/operators/prefetch_record.rs b/rust/worker/src/execution/operators/prefetch_record.rs index 3ae9a66b6a7..faf3ad34690 100644 --- a/rust/worker/src/execution/operators/prefetch_record.rs +++ b/rust/worker/src/execution/operators/prefetch_record.rs @@ -1,19 +1,16 @@ use std::collections::HashSet; +use crate::segment::{ + materialize_logs, + record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, + LogMaterializerError, +}; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::Operator; use thiserror::Error; use tracing::{trace, Instrument, Span}; -use crate::{ - execution::operator::Operator, - segment::{ - materialize_logs, - record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializerError, - }, -}; - use super::projection::ProjectionInput; /// The `PrefetchRecordOperator` prefetches the relevant records from the record segments to the cache diff --git a/rust/worker/src/execution/operators/projection.rs b/rust/worker/src/execution/operators/projection.rs index 99c0149debb..8045437efc3 100644 --- a/rust/worker/src/execution/operators/projection.rs +++ b/rust/worker/src/execution/operators/projection.rs @@ -1,21 +1,18 @@ use std::collections::{HashMap, HashSet}; +use crate::segment::{ + materialize_logs, + record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, + LogMaterializerError, +}; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::Operator; use chroma_types::{Chunk, LogRecord, Metadata, Segment}; use thiserror::Error; use tracing::{error, trace, Instrument, Span}; -use crate::{ - execution::operator::Operator, - segment::{ - materialize_logs, - record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - LogMaterializerError, - }, -}; - /// The `ProjectionOperator` retrieves record content by offset ids /// /// # Parameters @@ -180,10 +177,11 @@ impl Operator for ProjectionOperator { #[cfg(test)] mod tests { use crate::{ - execution::{operator::Operator, operators::projection::ProjectionOperator}, + execution::operators::projection::ProjectionOperator, log::test::{int_as_id, upsert_generator, LogGenerator}, segment::test::TestSegment, }; + use chroma_system::Operator; use super::ProjectionInput; diff --git a/rust/worker/src/execution/operators/register.rs b/rust/worker/src/execution/operators/register.rs index 36568326cfb..e11bf5d7bb3 100644 --- a/rust/worker/src/execution/operators/register.rs +++ b/rust/worker/src/execution/operators/register.rs @@ -1,4 +1,4 @@ -use crate::execution::operator::Operator; +use chroma_system::Operator; use crate::log::log::Log; use crate::log::log::UpdateCollectionLogOffsetError; use crate::sysdb::sysdb::FlushCompactionError; diff --git a/rust/worker/src/execution/operators/spann_bf_pl.rs b/rust/worker/src/execution/operators/spann_bf_pl.rs index c274717f664..d3747c448d3 100644 --- a/rust/worker/src/execution/operators/spann_bf_pl.rs +++ b/rust/worker/src/execution/operators/spann_bf_pl.rs @@ -7,7 +7,7 @@ use chroma_index::spann::types::SpannPosting; use chroma_types::SignedRoaringBitmap; use thiserror::Error; -use crate::execution::operator::Operator; +use chroma_system::Operator; use super::knn::RecordDistance; @@ -93,13 +93,11 @@ impl Operator for SpannBfPlOperator { mod test { use chroma_distance::DistanceFunction; use chroma_index::spann::types::SpannPosting; + use chroma_system::Operator; use chroma_types::SignedRoaringBitmap; use roaring::RoaringBitmap; - use crate::execution::{ - operator::Operator, - operators::spann_bf_pl::{SpannBfPlInput, SpannBfPlOperator}, - }; + use crate::execution::operators::spann_bf_pl::{SpannBfPlInput, SpannBfPlOperator}; // Basic operator test. #[tokio::test] diff --git a/rust/worker/src/execution/operators/spann_centers_search.rs b/rust/worker/src/execution/operators/spann_centers_search.rs index 6dc6d9edf06..864a0d90570 100644 --- a/rust/worker/src/execution/operators/spann_centers_search.rs +++ b/rust/worker/src/execution/operators/spann_centers_search.rs @@ -1,14 +1,11 @@ +use crate::segment::spann_segment::{SpannSegmentReader, SpannSegmentReaderContext}; use async_trait::async_trait; use chroma_distance::DistanceFunction; use chroma_error::{ChromaError, ErrorCodes}; use chroma_index::spann::utils::rng_query; +use chroma_system::Operator; use thiserror::Error; -use crate::{ - execution::operator::Operator, - segment::spann_segment::{SpannSegmentReader, SpannSegmentReaderContext}, -}; - #[derive(Debug)] pub(crate) struct SpannCentersSearchInput { // TODO(Sanket): Ship the reader instead of constructing here. diff --git a/rust/worker/src/execution/operators/spann_fetch_pl.rs b/rust/worker/src/execution/operators/spann_fetch_pl.rs index 0732ef364c4..894a4d4c656 100644 --- a/rust/worker/src/execution/operators/spann_fetch_pl.rs +++ b/rust/worker/src/execution/operators/spann_fetch_pl.rs @@ -1,13 +1,10 @@ +use crate::segment::spann_segment::{SpannSegmentReader, SpannSegmentReaderContext}; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; use chroma_index::spann::types::SpannPosting; +use chroma_system::{Operator, OperatorType}; use thiserror::Error; -use crate::{ - execution::operator::{Operator, OperatorType}, - segment::spann_segment::{SpannSegmentReader, SpannSegmentReaderContext}, -}; - #[derive(Debug)] pub(crate) struct SpannFetchPlInput { // TODO(Sanket): Ship the reader instead of constructing here. diff --git a/rust/worker/src/execution/operators/spann_knn_merge.rs b/rust/worker/src/execution/operators/spann_knn_merge.rs index 85b6fd42320..cced99b965f 100644 --- a/rust/worker/src/execution/operators/spann_knn_merge.rs +++ b/rust/worker/src/execution/operators/spann_knn_merge.rs @@ -2,7 +2,7 @@ use std::{cmp::Ordering, collections::BinaryHeap}; use async_trait::async_trait; -use crate::execution::operator::Operator; +use chroma_system::Operator; use super::knn::RecordDistance; @@ -100,7 +100,7 @@ impl Operator for SpannKnnMergeOperator #[cfg(test)] mod test { - use crate::execution::operator::Operator; + use chroma_system::Operator; // Basic operator test. #[tokio::test] diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index 4de1394fda8..97c9f67a50c 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -1,10 +1,4 @@ -use super::super::operator::wrap; -use super::orchestrator::Orchestrator; use crate::compactor::CompactionJob; -use crate::execution::dispatcher::Dispatcher; -use crate::execution::operator::TaskError; -use crate::execution::operator::TaskMessage; -use crate::execution::operator::TaskResult; use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterInput; use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterOperator; use crate::execution::operators::apply_log_to_segment_writer::ApplyLogToSegmentWriterOperatorError; @@ -43,17 +37,23 @@ use crate::segment::MaterializeLogsResult; use crate::sysdb::sysdb::GetCollectionsError; use crate::sysdb::sysdb::GetSegmentsError; use crate::sysdb::sysdb::SysDb; -use crate::system::ChannelError; -use crate::system::ComponentContext; -use crate::system::ComponentHandle; -use crate::system::Handler; -use crate::system::ReceiverForMessage; -use crate::utils::PanicError; use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::ChromaError; use chroma_error::ErrorCodes; use chroma_index::hnsw_provider::HnswIndexProvider; +use chroma_system::wrap; +use chroma_system::ChannelError; +use chroma_system::ComponentContext; +use chroma_system::ComponentHandle; +use chroma_system::Dispatcher; +use chroma_system::Handler; +use chroma_system::Orchestrator; +use chroma_system::PanicError; +use chroma_system::ReceiverForMessage; +use chroma_system::TaskError; +use chroma_system::TaskMessage; +use chroma_system::TaskResult; use chroma_types::Chunk; use chroma_types::SegmentUuid; use chroma_types::{CollectionUuid, LogRecord, Segment, SegmentFlushInfo, SegmentType}; @@ -259,7 +259,7 @@ impl CompactOrchestrator { async fn partition( &mut self, records: Chunk, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { self.state = ExecutionState::Partition; let operator = PartitionOperator::new(); @@ -276,7 +276,7 @@ impl CompactOrchestrator { self_address: Box< dyn ReceiverForMessage>, >, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { self.state = ExecutionState::MaterializeApplyCommitFlush; @@ -330,7 +330,7 @@ impl CompactOrchestrator { TaskResult, >, >, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let writers = self.get_segment_writers().await; let writers = match self.ok_or_terminate(writers, ctx) { @@ -434,7 +434,7 @@ impl CompactOrchestrator { TaskResult, >, >, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let span = self.get_segment_writer_span(&segment_writer); let operator = CommitSegmentWriterOperator::new(); @@ -452,7 +452,7 @@ impl CompactOrchestrator { TaskResult, >, >, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let span = self.get_segment_flusher_span(&segment_flusher); let operator = FlushSegmentWriterOperator::new(); @@ -462,11 +462,7 @@ impl CompactOrchestrator { self.ok_or_terminate(res, ctx); } - async fn register( - &mut self, - log_position: i64, - ctx: &crate::system::ComponentContext, - ) { + async fn register(&mut self, log_position: i64, ctx: &ComponentContext) { self.state = ExecutionState::Register; let operator = RegisterOperator::new(); let input = RegisterInput::new( @@ -701,7 +697,7 @@ impl Handler> for CompactOrchestrator async fn handle( &mut self, message: TaskResult, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let records = match self.ok_or_terminate(message.into_inner(), ctx) { Some(recs) => recs, @@ -732,7 +728,7 @@ impl Handler> for CompactOrchestrato async fn handle( &mut self, message: TaskResult, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let records = match self.ok_or_terminate(message.into_inner(), ctx) { Some(recs) => recs.records, @@ -751,7 +747,7 @@ impl Handler> async fn handle( &mut self, message: TaskResult, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let materialized_result = match self.ok_or_terminate(message.into_inner(), ctx) { Some(result) => result, @@ -788,7 +784,7 @@ impl Handler, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let message = match self.ok_or_terminate(message.into_inner(), ctx) { Some(message) => message, @@ -837,7 +833,7 @@ impl Handler, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let message = match self.ok_or_terminate(message.into_inner(), ctx) { Some(message) => message, @@ -858,7 +854,7 @@ impl Handler, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { let message = match self.ok_or_terminate(message.into_inner(), ctx) { Some(message) => message, @@ -887,7 +883,7 @@ impl Handler> for CompactOrchestrator async fn handle( &mut self, message: TaskResult, - ctx: &crate::system::ComponentContext, + ctx: &ComponentContext, ) { self.terminate_with_result( message diff --git a/rust/worker/src/execution/orchestration/count.rs b/rust/worker/src/execution/orchestration/count.rs index eb38d608ebd..3b48d0206ef 100644 --- a/rust/worker/src/execution/orchestration/count.rs +++ b/rust/worker/src/execution/orchestration/count.rs @@ -1,27 +1,18 @@ use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::{wrap, ChannelError, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, PanicError, TaskError, TaskMessage, TaskResult}; use chroma_types::CollectionAndSegments; use thiserror::Error; use tokio::sync::oneshot::{error::RecvError, Sender}; -use crate::{ - execution::{ - dispatcher::Dispatcher, - operator::{wrap, TaskError, TaskMessage, TaskResult}, - operators::{ - count_records::{ - CountRecordsError, CountRecordsInput, CountRecordsOperator, CountRecordsOutput, - }, - fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}, - }, +use crate::execution::operators::{ + count_records::{ + CountRecordsError, CountRecordsInput, CountRecordsOperator, CountRecordsOutput, }, - system::{ChannelError, ComponentContext, ComponentHandle, Handler}, - utils::PanicError, + fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}, }; -use super::orchestrator::Orchestrator; - #[derive(Error, Debug)] pub enum CountError { #[error("Error sending message through channel: {0}")] diff --git a/rust/worker/src/execution/orchestration/get.rs b/rust/worker/src/execution/orchestration/get.rs index f41d2e27254..672ec500b3c 100644 --- a/rust/worker/src/execution/orchestration/get.rs +++ b/rust/worker/src/execution/orchestration/get.rs @@ -1,28 +1,19 @@ use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::{wrap, ChannelError, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, PanicError, TaskError, TaskMessage, TaskResult}; use chroma_types::CollectionAndSegments; use thiserror::Error; use tokio::sync::oneshot::{error::RecvError, Sender}; -use crate::{ - execution::{ - dispatcher::Dispatcher, - operator::{wrap, TaskError, TaskMessage, TaskResult}, - operators::{ - fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}, - filter::{FilterError, FilterInput, FilterOperator, FilterOutput}, - limit::{LimitError, LimitInput, LimitOperator, LimitOutput}, - prefetch_record::{PrefetchRecordError, PrefetchRecordOperator, PrefetchRecordOutput}, - projection::{ProjectionError, ProjectionInput, ProjectionOperator, ProjectionOutput}, - }, - }, - system::{ChannelError, ComponentContext, ComponentHandle, Handler}, - utils::PanicError, +use crate::execution::operators::{ + fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}, + filter::{FilterError, FilterInput, FilterOperator, FilterOutput}, + limit::{LimitError, LimitInput, LimitOperator, LimitOutput}, + prefetch_record::{PrefetchRecordError, PrefetchRecordOperator, PrefetchRecordOutput}, + projection::{ProjectionError, ProjectionInput, ProjectionOperator, ProjectionOutput}, }; -use super::orchestrator::Orchestrator; - #[derive(Error, Debug)] pub enum GetError { #[error("Error sending message through channel: {0}")] diff --git a/rust/worker/src/execution/orchestration/knn.rs b/rust/worker/src/execution/orchestration/knn.rs index 2ffcd26ee1a..8d51728b7f8 100644 --- a/rust/worker/src/execution/orchestration/knn.rs +++ b/rust/worker/src/execution/orchestration/knn.rs @@ -1,32 +1,22 @@ use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; +use chroma_system::{wrap, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, TaskMessage, TaskResult}; use tokio::sync::oneshot::Sender; -use crate::{ - execution::{ - dispatcher::Dispatcher, - operator::{wrap, TaskMessage, TaskResult}, - operators::{ - knn::{KnnOperator, RecordDistance}, - knn_hnsw::{KnnHnswError, KnnHnswInput, KnnHnswOutput}, - knn_log::{KnnLogError, KnnLogInput, KnnLogOutput}, - knn_merge::{KnnMergeError, KnnMergeInput, KnnMergeOperator, KnnMergeOutput}, - knn_projection::{ - KnnProjectionError, KnnProjectionInput, KnnProjectionOperator, KnnProjectionOutput, - }, - prefetch_record::{ - PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, - PrefetchRecordOutput, - }, - }, +use crate::execution::operators::{ + knn::{KnnOperator, RecordDistance}, + knn_hnsw::{KnnHnswError, KnnHnswInput, KnnHnswOutput}, + knn_log::{KnnLogError, KnnLogInput, KnnLogOutput}, + knn_merge::{KnnMergeError, KnnMergeInput, KnnMergeOperator, KnnMergeOutput}, + knn_projection::{ + KnnProjectionError, KnnProjectionInput, KnnProjectionOperator, KnnProjectionOutput, + }, + prefetch_record::{ + PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, PrefetchRecordOutput, }, - system::{ComponentContext, ComponentHandle, Handler}, }; -use super::{ - knn_filter::{KnnError, KnnFilterOutput, KnnOutput, KnnResult}, - orchestrator::Orchestrator, -}; +use super::knn_filter::{KnnError, KnnFilterOutput, KnnOutput, KnnResult}; /// The `KnnOrchestrator` finds the nearest neighbor of a target embedding given the search domain. /// When used together with `KnnFilterOrchestrator`, they evaluate a `.query(...)` query diff --git a/rust/worker/src/execution/orchestration/knn_filter.rs b/rust/worker/src/execution/orchestration/knn_filter.rs index e24c16dba85..48717b8a755 100644 --- a/rust/worker/src/execution/orchestration/knn_filter.rs +++ b/rust/worker/src/execution/orchestration/knn_filter.rs @@ -3,24 +3,21 @@ use chroma_blockstore::provider::BlockfileProvider; use chroma_distance::DistanceFunction; use chroma_error::{ChromaError, ErrorCodes}; use chroma_index::hnsw_provider::HnswIndexProvider; +use chroma_system::{wrap, ChannelError, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, PanicError, TaskError, TaskMessage, TaskResult}; use chroma_types::{CollectionAndSegments, Segment}; use thiserror::Error; use tokio::sync::oneshot::{error::RecvError, Sender}; use crate::{ - execution::{ - dispatcher::Dispatcher, - operator::{wrap, TaskError, TaskMessage, TaskResult}, - operators::{ - fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}, - filter::{FilterError, FilterInput, FilterOperator, FilterOutput}, - knn_hnsw::KnnHnswError, - knn_log::KnnLogError, - knn_projection::{KnnProjectionError, KnnProjectionOutput}, - spann_bf_pl::SpannBfPlError, - spann_centers_search::SpannCentersSearchError, - spann_fetch_pl::SpannFetchPlError, - }, + execution::operators::{ + fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}, + filter::{FilterError, FilterInput, FilterOperator, FilterOutput}, + knn_hnsw::KnnHnswError, + knn_log::KnnLogError, + knn_projection::{KnnProjectionError, KnnProjectionOutput}, + spann_bf_pl::SpannBfPlError, + spann_centers_search::SpannCentersSearchError, + spann_fetch_pl::SpannFetchPlError, }, segment::{ distributed_hnsw_segment::{ @@ -28,12 +25,8 @@ use crate::{ }, utils::distance_function_from_segment, }, - system::{ChannelError, ComponentContext, ComponentHandle, Handler}, - utils::PanicError, }; -use super::orchestrator::Orchestrator; - #[derive(Error, Debug)] pub enum KnnError { #[error("Error sending message through channel: {0}")] diff --git a/rust/worker/src/execution/orchestration/mod.rs b/rust/worker/src/execution/orchestration/mod.rs index 58a9c0eb942..fa28dc70a61 100644 --- a/rust/worker/src/execution/orchestration/mod.rs +++ b/rust/worker/src/execution/orchestration/mod.rs @@ -7,4 +7,3 @@ pub(crate) use count::*; pub mod get; pub mod knn; pub mod knn_filter; -pub mod orchestrator; diff --git a/rust/worker/src/execution/orchestration/spann_knn.rs b/rust/worker/src/execution/orchestration/spann_knn.rs index a190055b800..96235a64a39 100644 --- a/rust/worker/src/execution/orchestration/spann_knn.rs +++ b/rust/worker/src/execution/orchestration/spann_knn.rs @@ -2,43 +2,35 @@ use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_distance::{normalize, DistanceFunction}; use chroma_index::hnsw_provider::HnswIndexProvider; +use chroma_system::{wrap, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, TaskMessage, TaskResult}; use tokio::sync::oneshot::Sender; use crate::{ - execution::{ - dispatcher::Dispatcher, - operator::{wrap, TaskMessage, TaskResult}, - operators::{ - knn::{KnnOperator, RecordDistance}, - knn_log::{KnnLogError, KnnLogInput, KnnLogOutput}, - knn_projection::{ - KnnProjectionError, KnnProjectionInput, KnnProjectionOperator, KnnProjectionOutput, - }, - prefetch_record::{ - PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, - PrefetchRecordOutput, - }, - spann_bf_pl::{SpannBfPlError, SpannBfPlInput, SpannBfPlOperator, SpannBfPlOutput}, - spann_centers_search::{ - SpannCentersSearchError, SpannCentersSearchInput, SpannCentersSearchOperator, - SpannCentersSearchOutput, - }, - spann_fetch_pl::{ - SpannFetchPlError, SpannFetchPlInput, SpannFetchPlOperator, SpannFetchPlOutput, - }, - spann_knn_merge::{ - SpannKnnMergeError, SpannKnnMergeInput, SpannKnnMergeOperator, SpannKnnMergeOutput, - }, + execution::operators::{ + knn::{KnnOperator, RecordDistance}, + knn_log::{KnnLogError, KnnLogInput, KnnLogOutput}, + knn_projection::{ + KnnProjectionError, KnnProjectionInput, KnnProjectionOperator, KnnProjectionOutput, + }, + prefetch_record::{ + PrefetchRecordError, PrefetchRecordInput, PrefetchRecordOperator, PrefetchRecordOutput, + }, + spann_bf_pl::{SpannBfPlError, SpannBfPlInput, SpannBfPlOperator, SpannBfPlOutput}, + spann_centers_search::{ + SpannCentersSearchError, SpannCentersSearchInput, SpannCentersSearchOperator, + SpannCentersSearchOutput, + }, + spann_fetch_pl::{ + SpannFetchPlError, SpannFetchPlInput, SpannFetchPlOperator, SpannFetchPlOutput, + }, + spann_knn_merge::{ + SpannKnnMergeError, SpannKnnMergeInput, SpannKnnMergeOperator, SpannKnnMergeOutput, }, }, segment::spann_segment::SpannSegmentReaderContext, - system::{ComponentContext, ComponentHandle, Handler}, }; -use super::{ - knn_filter::{KnnError, KnnFilterOutput, KnnOutput, KnnResult}, - orchestrator::Orchestrator, -}; +use super::knn_filter::{KnnError, KnnFilterOutput, KnnOutput, KnnResult}; // TODO(Sanket): Make these configurable. const RNG_FACTOR: f32 = 1.0; diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index 83e9c813d1d..90892b0cc7c 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -17,7 +17,6 @@ pub mod config; pub mod execution; pub mod log; pub mod segment; -pub mod system; const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH"; @@ -35,15 +34,14 @@ pub async fn query_service_entrypoint() { &config.otel_endpoint, ); - let system: system::System = system::System::new(); - let dispatcher = - match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await { - Ok(dispatcher) => dispatcher, - Err(err) => { - println!("Failed to create dispatcher component: {:?}", err); - return; - } - }; + let system = chroma_system::System::new(); + let dispatcher = match chroma_system::Dispatcher::try_from_config(&config.dispatcher).await { + Ok(dispatcher) => dispatcher, + Err(err) => { + println!("Failed to create dispatcher component: {:?}", err); + return; + } + }; let mut dispatcher_handle = system.start_component(dispatcher); let mut worker_server = match server::WorkerServer::try_from_config(&config).await { Ok(worker_server) => worker_server, @@ -96,7 +94,7 @@ pub async fn compaction_service_entrypoint() { &config.otel_endpoint, ); - let system: system::System = system::System::new(); + let system = chroma_system::System::new(); let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config( &config.memberlist_provider, @@ -110,14 +108,13 @@ pub async fn compaction_service_entrypoint() { } }; - let dispatcher = - match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await { - Ok(dispatcher) => dispatcher, - Err(err) => { - println!("Failed to create dispatcher component: {:?}", err); - return; - } - }; + let dispatcher = match chroma_system::Dispatcher::try_from_config(&config.dispatcher).await { + Ok(dispatcher) => dispatcher, + Err(err) => { + println!("Failed to create dispatcher component: {:?}", err); + return; + } + }; let mut dispatcher_handle = system.start_component(dispatcher); let mut compaction_manager = match crate::compactor::CompactionManager::try_from_config(&config).await { diff --git a/rust/worker/src/memberlist/memberlist_provider.rs b/rust/worker/src/memberlist/memberlist_provider.rs index 0857ebc8f38..0604d87c62f 100644 --- a/rust/worker/src/memberlist/memberlist_provider.rs +++ b/rust/worker/src/memberlist/memberlist_provider.rs @@ -1,9 +1,8 @@ use super::config::MemberlistProviderConfig; -use crate::system::ReceiverForMessage; -use crate::system::{Component, ComponentContext, Handler, StreamHandler}; use async_trait::async_trait; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; +use chroma_system::{Component, ComponentContext, Handler, ReceiverForMessage, StreamHandler}; use futures::StreamExt; use kube::runtime::watcher::Config; use kube::{ @@ -241,7 +240,7 @@ impl MemberlistProvider for CustomResourceMemberlistProvider { #[cfg(test)] mod tests { use super::*; - use crate::system::System; + use chroma_system::System; #[tokio::test] // Naming this "test_k8s_integration_" means that the Tilt stack is required. See rust/worker/README.md. diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index b58dec8b018..943476350db 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -5,6 +5,7 @@ use chroma_blockstore::provider::BlockfileProvider; use chroma_config::Configurable; use chroma_error::ChromaError; use chroma_index::hnsw_provider::HnswIndexProvider; +use chroma_system::{ComponentHandle, Dispatcher, Orchestrator, System}; use chroma_types::{ chroma_proto::{ self, query_executor_server::QueryExecutor, CountPlan, CountResult, GetPlan, GetResult, @@ -20,16 +21,14 @@ use tracing::{trace_span, Instrument}; use crate::{ config::QueryServiceConfig, execution::{ - dispatcher::Dispatcher, operators::{fetch_log::FetchLogOperator, knn_projection::KnnProjectionOperator}, orchestration::{ get::GetOrchestrator, knn::KnnOrchestrator, knn_filter::KnnFilterOrchestrator, - orchestrator::Orchestrator, CountOrchestrator, + CountOrchestrator, }, }, log::log::Log, sysdb::sysdb::SysDb, - system::{ComponentHandle, System}, tracing::util::wrap_span_with_parent_context, utils::convert::{from_proto_knn, to_proto_knn_batch_result}, }; @@ -398,15 +397,15 @@ mod tests { use std::collections::HashMap; use super::*; - use crate::execution::dispatcher; use crate::log::log::InMemoryLog; use crate::segment::test::TestSegment; use crate::sysdb::test_sysdb::TestSysDb; - use crate::system; use chroma_index::test_hnsw_index_provider; #[cfg(debug_assertions)] use chroma_proto::debug_client::DebugClient; use chroma_proto::query_executor_client::QueryExecutorClient; + use chroma_system::dispatcher; + use chroma_system::system; use uuid::Uuid; fn run_server() -> String { diff --git a/rust/worker/src/system/mod.rs b/rust/worker/src/system/mod.rs deleted file mode 100644 index 7c92abb7e26..00000000000 --- a/rust/worker/src/system/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod executor; -mod receiver; -mod scheduler; -#[allow(clippy::module_inception)] -mod system; -mod types; -mod wrapped_message; - -// Re-export types -pub(crate) use receiver::*; -pub use system::*; -pub use types::*; -pub(crate) use wrapped_message::*; diff --git a/rust/worker/src/utils/mod.rs b/rust/worker/src/utils/mod.rs index e05d8cfd813..02d63d4e141 100644 --- a/rust/worker/src/utils/mod.rs +++ b/rust/worker/src/utils/mod.rs @@ -1,4 +1 @@ pub(crate) mod convert; -mod panic; - -pub(crate) use panic::*;