Skip to content

Commit

Permalink
[ENH] Move system and task into its own crate (#3464)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
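   - The garbage collector (GC) service needs the system and the dispatcher/worker framework, so both are moved out of the `worker` crate into a new standalone `chroma-system` crate (`rust/system`).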

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for Python, `yarn test` for JS, and
`cargo test` for Rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Jan 10, 2025
1 parent 0d01630 commit e05cae8
Show file tree
Hide file tree
Showing 60 changed files with 305 additions and 350 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"

members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/types", "rust/worker"]
members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/types", "rust/worker"]

[workspace.dependencies]
arrow = "52.2.0"
Expand Down Expand Up @@ -43,6 +43,7 @@ chroma-cache = { path = "rust/cache" }
chroma-types = { path = "rust/types" }
chroma-index = { path = "rust/index" }
chroma-distance = { path = "rust/distance" }
chroma-system = { path = "rust/system" }
worker = { path = "rust/worker" }

# Dev dependencies
Expand Down
8 changes: 4 additions & 4 deletions rust/garbage_collector/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use figment::providers::{Env, Format, Yaml};

const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml";

#[derive(Debug, serde::Deserialize)]
// TODO(rohitcpbot): Remove this dead code annotation.
#[allow(dead_code)]
#[derive(Debug, serde::Deserialize)]
// TODO(Sanket): Remove this dead code annotation.
pub(super) struct GarbageCollectorConfig {
pub(super) service_name: String,
pub(super) otel_endpoint: String,
Expand All @@ -15,9 +15,9 @@ pub(super) struct GarbageCollectorConfig {
sysdb_connection: SysdbConnectionConfig,
}

#[derive(Debug, serde::Deserialize)]
// TODO(rohitcpbot): Remove this dead code annotation.
#[allow(dead_code)]
#[derive(Debug, serde::Deserialize)]
// TODO(Sanket): Remove this dead code annotation.
pub(super) struct SysdbConnectionConfig {
host: String,
port: u32,
Expand Down
23 changes: 23 additions & 0 deletions rust/system/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "chroma-system"
version = "0.1.0"
edition = "2021"

[lib]
path = "src/lib.rs"

[dependencies]
thiserror = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
serde = { workspace = true }
uuid = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }

chroma-error = { workspace = true }
chroma-config = { workspace = true }
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::operator::OperatorType;
use super::{operator::TaskMessage, worker_thread::WorkerThread};
use crate::execution::config::DispatcherConfig;
use crate::system::{Component, ComponentContext, Handler, ReceiverForMessage, System};
use crate::{Component, ComponentContext, Handler, ReceiverForMessage, System};
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::ChromaError;
Expand Down Expand Up @@ -223,10 +223,7 @@ mod tests {
use uuid::Uuid;

use super::*;
use crate::{
execution::operator::{wrap, Operator, TaskResult},
system::{ComponentHandle, System},
};
use crate::{operator::*, ComponentHandle};
use std::{
collections::HashSet,
sync::{
Expand Down
10 changes: 10 additions & 0 deletions rust/system/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
pub mod config;
pub mod dispatcher;
pub mod operator;
pub mod orchestrator;
pub mod worker_thread;

pub use config::*;
pub use dispatcher::*;
pub use operator::*;
pub use orchestrator::*;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{system::ReceiverForMessage, utils::PanicError};
use crate::{utils::PanicError, ReceiverForMessage};
use async_trait::async_trait;
use chroma_error::{ChromaError, ErrorCodes};
use futures::FutureExt;
Expand Down Expand Up @@ -32,7 +32,7 @@ where
}

#[derive(Debug, Error)]
pub(super) enum TaskError<Err> {
pub enum TaskError<Err> {
#[error("Panic occurred while handling task: {0:?}")]
Panic(PanicError),
#[error("Task failed with error: {0:?}")]
Expand All @@ -54,13 +54,13 @@ where
/// A task result is a wrapper around the result of a task.
/// It contains the task id for tracking purposes.
#[derive(Debug)]
pub(super) struct TaskResult<Output, Error> {
pub struct TaskResult<Output, Error> {
result: Result<Output, TaskError<Error>>,
task_id: Uuid,
}

impl<Output, Error> TaskResult<Output, Error> {
pub(super) fn into_inner(self) -> Result<Output, TaskError<Error>> {
pub fn into_inner(self) -> Result<Output, TaskError<Error>> {
self.result
}

Expand Down Expand Up @@ -183,7 +183,7 @@ where
}

/// Wrap an operator and its input into a task message.
pub(super) fn wrap<Input, Output, Error>(
pub fn wrap<Input, Output, Error>(
operator: Box<dyn Operator<Input, Output, Error = Error>>,
input: Input,
reply_channel: Box<dyn ReceiverForMessage<TaskResult<Output, Error>>>,
Expand Down Expand Up @@ -211,7 +211,7 @@ mod tests {

use crate::{
execution::dispatcher::Dispatcher,
system::{Component, ComponentContext, ComponentHandle, Handler, System},
{Component, ComponentContext, ComponentHandle, Handler, System},
};

use super::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use crate::{ChannelError, Component, ComponentContext, ComponentHandle, PanicError, System};
use async_trait::async_trait;
use chroma_error::ChromaError;
use core::fmt::Debug;
use std::any::type_name;
use tokio::sync::oneshot::{self, error::RecvError, Sender};
use tracing::Span;

use crate::{
execution::{dispatcher::Dispatcher, operator::TaskMessage},
system::{ChannelError, Component, ComponentContext, ComponentHandle, System},
utils::PanicError,
};
use crate::{Dispatcher, TaskMessage};

#[async_trait]
pub trait Orchestrator: Debug + Send + Sized + 'static {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{dispatcher::TaskRequestMessage, operator::TaskMessage};
use crate::system::{Component, ComponentContext, ComponentRuntime, Handler, ReceiverForMessage};
use crate::{Component, ComponentContext, ComponentRuntime, Handler, ReceiverForMessage};
use async_trait::async_trait;
use std::fmt::{Debug, Formatter, Result};
use tracing::{trace_span, Instrument, Span};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{scheduler::Scheduler, system::System, Component, ComponentSender, WrappedMessage};
use crate::system::ComponentContext;
use super::{
scheduler::Scheduler, system::System, Component, ComponentContext, ComponentSender,
WrappedMessage,
};
use std::sync::Arc;
use tokio::select;
use tracing::{trace_span, Instrument, Span};
Expand Down
17 changes: 17 additions & 0 deletions rust/system/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pub mod execution;
pub mod executor;
pub mod receiver;
pub mod scheduler;
#[allow(clippy::module_inception)]
pub mod system;
pub mod types;
pub mod utils;
pub mod wrapped_message;

// Re-export types
pub use execution::*;
pub use receiver::*;
pub use system::*;
pub use types::*;
pub use utils::*;
pub(crate) use wrapped_message::*;
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ use thiserror::Error;

/// A ReceiverForMessage is generic over a message type, and useful if you want to send a given message type to any component that can handle it.
#[async_trait]
pub(crate) trait ReceiverForMessage<M>:
Send + Sync + Debug + ReceiverForMessageClone<M>
{
pub trait ReceiverForMessage<M>: Send + Sync + Debug + ReceiverForMessageClone<M> {
async fn send(
&self,
message: M,
tracing_context: Option<tracing::Span>,
) -> Result<(), ChannelError>;
}

pub(crate) trait ReceiverForMessageClone<M> {
pub trait ReceiverForMessageClone<M> {
fn clone_box(&self) -> Box<dyn ReceiverForMessage<M>>;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(crate) struct SchedulerTaskHandle {
}

#[derive(Clone, Debug)]
pub(crate) struct Scheduler {
pub struct Scheduler {
handles: Arc<RwLock<Vec<SchedulerTaskHandle>>>,
}

Expand All @@ -28,7 +28,7 @@ impl Scheduler {
/// Schedule a message to be sent to the component after the specified duration.
///
/// `span_factory` is called immediately before sending the scheduled message to the component.
pub(crate) fn schedule<C, M, S>(
pub fn schedule<C, M, S>(
&self,
message: M,
duration: Duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct Inner {
}

impl System {
pub(crate) fn new() -> System {
pub fn new() -> System {
System {
inner: Arc::new(Inner {
scheduler: Scheduler::new(),
Expand Down Expand Up @@ -87,11 +87,11 @@ impl System {
tokio::spawn(async move { stream_loop(stream, &ctx).await });
}

pub(crate) async fn stop(&self) {
pub async fn stop(&self) {
self.inner.scheduler.stop();
}

pub(crate) async fn join(&self) {
pub async fn join(&self) {
self.inner.scheduler.join().await;
}
}
Expand Down
20 changes: 10 additions & 10 deletions rust/worker/src/system/types.rs → rust/system/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::task::JoinError;

use super::{system::System, ReceiverForMessage};

pub(crate) trait Message: Debug + Send + 'static {}
pub trait Message: Debug + Send + 'static {}
impl<M: Debug + Send + 'static> Message for M {}

#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -55,7 +55,7 @@ pub trait Component: Send + Sized + Debug + 'static {
/// # Methods
/// - handle: Handle a message
#[async_trait]
pub(crate) trait Handler<M>
pub trait Handler<M>
where
Self: Component + Sized + 'static,
{
Expand All @@ -72,7 +72,7 @@ where
/// # Methods
/// - handle: Handle a message from a stream
/// - register_stream: Register a stream to be processed, this is provided and you do not need to implement it
pub(crate) trait StreamHandler<M>
pub trait StreamHandler<M>
where
Self: Component + 'static + Handler<M>,
M: Message,
Expand Down Expand Up @@ -216,14 +216,14 @@ impl<C: Component> ComponentHandle<C> {
}
}

pub(crate) fn stop(&mut self) {
pub fn stop(&mut self) {
let mut state = self.state.lock();
self.cancellation_token.cancel();
*state = ComponentState::Stopped;
}

/// Consumes the underlying join handle. Panics if it is consumed twice.
pub(crate) async fn join(&mut self) -> Result<(), JoinError> {
pub async fn join(&mut self) -> Result<(), JoinError> {
if let Some(join_handle) = &mut self.join_handle {
join_handle.consume().await
} else {
Expand All @@ -236,15 +236,15 @@ impl<C: Component> ComponentHandle<C> {
return *self.state.lock();
}

pub(crate) fn receiver<M>(&self) -> Box<dyn ReceiverForMessage<M>>
pub fn receiver<M>(&self) -> Box<dyn ReceiverForMessage<M>>
where
C: Component + Handler<M>,
M: Message,
{
Box::new(self.sender.clone())
}

pub(crate) async fn send<M>(
pub async fn send<M>(
&mut self,
message: M,
tracing_context: Option<tracing::Span>,
Expand Down Expand Up @@ -277,12 +277,12 @@ where
{
pub(crate) system: System,
pub(crate) sender: ComponentSender<C>,
pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
pub(crate) scheduler: Scheduler,
pub cancellation_token: tokio_util::sync::CancellationToken,
pub scheduler: Scheduler,
}

impl<C: Component> ComponentContext<C> {
pub(crate) fn receiver<M>(&self) -> Box<dyn ReceiverForMessage<M>>
pub fn receiver<M>(&self) -> Box<dyn ReceiverForMessage<M>>
where
C: Component + Handler<M>,
M: Message,
Expand Down
3 changes: 3 additions & 0 deletions rust/system/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod panic;

pub use panic::*;
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ chroma-types = { workspace = true }
chroma-cache = { workspace = true }
chroma-index = { workspace = true }
chroma-distance = { workspace = true }
chroma-system = { workspace = true }

fastrace = "0.7"
fastrace-opentelemetry = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/benches/filter.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::iter::once;

use chroma_benchmark::benchmark::{bench_run, tokio_multi_thread};
use chroma_system::Operator;
use chroma_types::{
BooleanOperator, Chunk, DirectWhereComparison, MetadataValue, PrimitiveOperator, Where,
WhereChildren, WhereComparison,
};
use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use worker::execution::operator::Operator;
use worker::execution::operators::filter::{FilterInput, FilterOperator};
use worker::log::test::upsert_generator;
use worker::segment::test::TestSegment;
Expand Down
Loading

0 comments on commit e05cae8

Please sign in to comment.