Skip to content

Commit

Permalink
[ENH] Implement compaction client
Browse files Browse the repository at this point in the history
  • Loading branch information
Sicheng Pan committed Jan 15, 2025
1 parent 2e0c103 commit fa2d027
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ version = "0.1.0"
edition = "2021"

[[bin]]
name = "query_service"
path = "src/bin/query_service.rs"
name = "compaction_client"
path = "src/bin/compaction_client.rs"

[[bin]]
name = "compaction_service"
path = "src/bin/compaction_service.rs"

[[bin]]
name = "query_service"
path = "src/bin/query_service.rs"

[dependencies]
rand = "0.8.5"
murmur3 = "0.5.2"
Expand Down Expand Up @@ -47,6 +51,7 @@ prost-types = { workspace = true }
num_cpus = { workspace = true }
flatbuffers = { workspace = true }
tantivy = { workspace = true }
clap = { workspace = true }

chroma-blockstore = { workspace = true }
chroma-error = { workspace = true }
Expand Down
11 changes: 11 additions & 0 deletions rust/worker/src/bin/compaction_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[tokio::main]
async fn main() {
worker::compaction_client_entrypoint().await;
}
66 changes: 66 additions & 0 deletions rust/worker/src/compactor/compaction_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use chroma_types::chroma_proto::{
compactor_client::CompactorClient, CollectionIds, CompactionRequest,
};
use clap::{Parser, Subcommand};
use thiserror::Error;
use tonic::transport::Channel;
use uuid::Uuid;

/// Error for compaction client
#[derive(Debug, Error)]
pub enum CompactionClientError {
#[error("Compactor failed: {0}")]
Compactor(String),
#[error("Unable to connect to compactor: {0}")]
Connection(#[from] tonic::transport::Error),
}

/// Tool to control compaction service
#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
pub struct CompactionClient {
/// Url of the target compactor
#[arg(short, long)]
url: String,
/// Subcommand for compaction
#[command(subcommand)]
command: CompactionCommand,
}

#[derive(Debug, Subcommand)]
pub enum CompactionCommand {
/// Trigger a one-off compaction
Compact {
/// Specify Uuids of the collections to compact. If unspecified, no compaction will occur unless --all flag is specified
#[arg(short, long)]
id: Vec<Uuid>,
/// Compact all collections available. If specified, the Uuids specified with --id will be ignored
#[arg(short, long)]
all: bool,
},
}

impl CompactionClient {
async fn grpc_client(&self) -> Result<CompactorClient<Channel>, CompactionClientError> {
Ok(CompactorClient::connect(self.url.clone()).await?)
}

pub async fn run(&self) -> Result<(), CompactionClientError> {
match &self.command {
CompactionCommand::Compact { id, all } => {
let mut client = self.grpc_client().await?;
let response = client
.compact(CompactionRequest {
ids: (!all).then_some(CollectionIds {
ids: id.iter().map(ToString::to_string).collect(),
}),
})
.await;
if let Err(status) = response {
return Err(CompactionClientError::Compactor(status.to_string()));
}
}
};
Ok(())
}
}
2 changes: 1 addition & 1 deletion rust/worker/src/compactor/compaction_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct CompactionServer {
impl CompactionServer {
pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("[::]:{}", self.port).parse().unwrap();
tracing::info!("Compaction server listing at {addr}");
tracing::info!("Compaction server listening at {addr}");
let server = Server::builder().add_service(CompactorServer::new(self));
server
.serve_with_shutdown(addr, async {
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ mod types;
pub(crate) use compaction_manager::*;
pub(crate) use types::*;

pub mod compaction_client;
pub mod compaction_server;
11 changes: 10 additions & 1 deletion rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod tracing;
mod utils;

use chroma_config::Configurable;
use clap::Parser;
use compactor::compaction_client::CompactionClient;
use compactor::compaction_server::CompactionServer;
use memberlist::MemberlistProvider;

Expand Down Expand Up @@ -139,7 +141,7 @@ pub async fn compaction_service_entrypoint() {
};

let server_join_handle = tokio::spawn(async move {
let _ = CompactionServer::run(compaction_server).await;
let _ = compaction_server.run().await;
});

let mut sigterm = match signal(SignalKind::terminate()) {
Expand Down Expand Up @@ -167,3 +169,10 @@ pub async fn compaction_service_entrypoint() {
};
println!("Server stopped");
}

pub async fn compaction_client_entrypoint() {
let client = CompactionClient::parse();
if let Err(e) = client.run().await {
eprintln!("{e}");
}
}

0 comments on commit fa2d027

Please sign in to comment.