Skip to content

Commit

Permalink
[CLN] Move log module outside of worker crate (#3547)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
   - Move log module outside of worker crate
 - New functionality
   - ...

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
Sicheng-Pan authored Jan 24, 2025
1 parent e4ec7f3 commit c5f7d2e
Show file tree
Hide file tree
Showing 31 changed files with 304 additions and 268 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"

members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/system", "rust/sysdb", "rust/types", "rust/worker", "rust/segment", "rust/frontend"]
members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/frontend", "rust/garbage_collector", "rust/index", "rust/load", "rust/log", "rust/storage", "rust/system", "rust/sysdb", "rust/types", "rust/worker", "rust/segment", ]

[workspace.dependencies]
arrow = "52.2.0"
Expand Down Expand Up @@ -37,17 +37,18 @@ uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics",

chroma-benchmark = { path = "rust/benchmark" }
chroma-blockstore = { path = "rust/blockstore" }
chroma-error = { path = "rust/error" }
chroma-config = { path = "rust/config" }
chroma-storage = { path = "rust/storage" }
chroma-cache = { path = "rust/cache" }
chroma-types = { path = "rust/types" }
chroma-index = { path = "rust/index" }
chroma-config = { path = "rust/config" }
chroma-distance = { path = "rust/distance" }
chroma-error = { path = "rust/error" }
chroma-frontend = { path = "rust/frontend" }
chroma-index = { path = "rust/index" }
chroma-log = { path = "rust/log" }
chroma-segment = { path="rust/segment" }
chroma-storage = { path = "rust/storage" }
chroma-system = { path = "rust/system" }
chroma-sysdb = { path = "rust/sysdb" }
chroma-frontend = { path = "rust/frontend" }
chroma-types = { path = "rust/types" }
worker = { path = "rust/worker" }

# Dev dependencies
Expand Down
23 changes: 23 additions & 0 deletions rust/log/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "chroma-log"
version = "0.1.0"
edition = "2021"

[dependencies]
async-trait = { workspace = true }
# Used by tracing
opentelemetry = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
# Used by tracing
tracing-opentelemetry = { workspace = true }
uuid = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-segment = { workspace = true }
chroma-types = { workspace = true }

14 changes: 14 additions & 0 deletions rust/log/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use serde::Deserialize;

#[derive(Deserialize)]
pub struct GrpcLogConfig {
pub host: String,
pub port: u16,
pub connect_timeout_ms: u64,
pub request_timeout_ms: u64,
}

#[derive(Deserialize)]
pub enum LogConfig {
Grpc(GrpcLogConfig),
}
17 changes: 17 additions & 0 deletions rust/log/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pub mod config;
#[allow(clippy::module_inception)]
pub mod log;
pub mod test;
pub mod tracing;

use chroma_config::Configurable;
use chroma_error::ChromaError;
use config::LogConfig;

pub async fn from_config(config: &LogConfig) -> Result<Box<log::Log>, Box<dyn ChromaError>> {
match &config {
config::LogConfig::Grpc(_) => Ok(Box::new(log::Log::Grpc(
log::GrpcLog::try_from_config(config).await?,
))),
}
}
46 changes: 23 additions & 23 deletions rust/worker/src/log/log.rs → rust/log/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::log::config::LogConfig;
use crate::tracing::util::client_interceptor;
use crate::tracing::client_interceptor;

use super::config::LogConfig;
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
Expand All @@ -22,21 +23,21 @@ use uuid::Uuid;
/// - first_log_offset: the offset of the first log entry in the collection that needs to be compacted
/// - first_log_ts: the timestamp of the first log entry in the collection that needs to be compacted
#[derive(Debug)]
pub(crate) struct CollectionInfo {
pub(crate) collection_id: CollectionUuid,
pub(crate) first_log_offset: i64,
pub(crate) first_log_ts: i64,
pub struct CollectionInfo {
pub collection_id: CollectionUuid,
pub first_log_offset: i64,
pub first_log_ts: i64,
}

#[derive(Clone, Debug)]
pub(crate) struct CollectionRecord {
pub(crate) collection_id: CollectionUuid,
pub(crate) tenant_id: String,
pub(crate) last_compaction_time: i64,
pub struct CollectionRecord {
pub collection_id: CollectionUuid,
pub tenant_id: String,
pub last_compaction_time: i64,
#[allow(dead_code)]
pub(crate) first_record_time: i64,
pub(crate) offset: i64,
pub(crate) collection_version: i32,
pub first_record_time: i64,
pub offset: i64,
pub collection_version: i32,
}

#[derive(Clone, Debug)]
Expand All @@ -47,7 +48,7 @@ pub enum Log {
}

impl Log {
pub(crate) async fn read(
pub async fn read(
&mut self,
collection_id: CollectionUuid,
offset: i64,
Expand All @@ -66,7 +67,7 @@ impl Log {
}
}

pub(crate) async fn get_collections_with_new_data(
pub async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
Expand All @@ -76,7 +77,7 @@ impl Log {
}
}

pub(crate) async fn update_collection_log_offset(
pub async fn update_collection_log_offset(
&mut self,
collection_id: CollectionUuid,
new_offset: i64,
Expand Down Expand Up @@ -295,7 +296,7 @@ impl ChromaError for PullLogsError {
}

#[derive(Error, Debug)]
pub(crate) enum GetCollectionsWithNewDataError {
pub enum GetCollectionsWithNewDataError {
#[error("Failed to fetch")]
FailedGetCollectionsWithNewData(#[from] tonic::Status),
}
Expand All @@ -311,7 +312,7 @@ impl ChromaError for GetCollectionsWithNewDataError {
}

#[derive(Error, Debug)]
pub(crate) enum UpdateCollectionLogOffsetError {
pub enum UpdateCollectionLogOffsetError {
#[error("Failed to update collection log offset")]
FailedToUpdateCollectionLogOffset(#[from] tonic::Status),
}
Expand All @@ -330,10 +331,10 @@ impl ChromaError for UpdateCollectionLogOffsetError {
// internal to a mock log implementation
#[derive(Clone)]
pub struct InternalLogRecord {
pub(crate) collection_id: CollectionUuid,
pub(crate) log_offset: i64,
pub(crate) log_ts: i64,
pub(crate) record: LogRecord,
pub collection_id: CollectionUuid,
pub log_offset: i64,
pub log_ts: i64,
pub record: LogRecord,
}

impl Debug for InternalLogRecord {
Expand Down Expand Up @@ -362,7 +363,6 @@ impl InMemoryLog {
}
}

#[cfg(test)]
pub fn add_log(&mut self, collection_id: CollectionUuid, log: InternalLogRecord) {
let logs = self.collection_to_log.entry(collection_id).or_default();
// Ensure that the log offset is correct. Since we only use the InMemoryLog for testing,
Expand Down
File renamed without changes.
47 changes: 47 additions & 0 deletions rust/log/src/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Interceptor copies from worker::tracing::util
// TODO: A seperate tracing crate

use std::str::FromStr;

use opentelemetry::trace::TraceContextExt;
use tonic::{metadata::MetadataValue, Request, Status};
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

const TRACE_ID_HEADER_KEY: &str = "chroma-traceid";
const SPAN_ID_HEADER_KEY: &str = "chroma-spanid";

pub(crate) fn client_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
// If span is disabled then nothing to append in the header.
if Span::current().is_disabled() {
return Ok(request);
}
let mut mut_request = request;
let metadata = mut_request.metadata_mut();
let trace_id = MetadataValue::from_str(
Span::current()
.context()
.span()
.span_context()
.trace_id()
.to_string()
.as_str(),
);
let span_id = MetadataValue::from_str(
Span::current()
.context()
.span()
.span_context()
.span_id()
.to_string()
.as_str(),
);
// Errors are not fatal.
if let Ok(id) = trace_id {
metadata.append(TRACE_ID_HEADER_KEY, id);
}
if let Ok(id) = span_id {
metadata.append(SPAN_ID_HEADER_KEY, id);
}
Ok(mut_request)
}
1 change: 1 addition & 0 deletions rust/segment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ tokio = { workspace = true }
tempfile = { workspace = true }

chroma-cache = { workspace = true }
chroma-log = { workspace = true }
chroma-storage = { workspace = true }

Loading

0 comments on commit c5f7d2e

Please sign in to comment.