
feature: move words into clickhouse
densumesh authored and cdxker committed Aug 13, 2024
1 parent 1ff8d94 commit 566d775
Showing 20 changed files with 486 additions and 372 deletions.
2 changes: 1 addition & 1 deletion server/Cargo.lock

Some generated files are not rendered by default.

@@ -0,0 +1,2 @@

DROP TABLE IF EXISTS words_datasets;
12 changes: 12 additions & 0 deletions server/ch_migrations/1723258343_store_words_in_clickhouse/up.sql
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS words_datasets (
id UUID NOT NULL,
dataset_id UUID NOT NULL,
word String NOT NULL,
count Int32 NOT NULL,
created_at DateTime DEFAULT now() NOT NULL,
INDEX idx_created_at created_at TYPE minmax GRANULARITY 8192,
INDEX idx_id id TYPE minmax GRANULARITY 8192
) ENGINE = SummingMergeTree(created_at)
ORDER BY (dataset_id, word)
PARTITION BY dataset_id;
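
A quick illustration of how this table is meant to be used (a sketch for context, not part of the commit; the dataset id below is made up): each batch of word counts is appended as a row keyed by (dataset_id, word), and readers aggregate the counts per word at query time.

-- Hypothetical dataset id, for illustration only.
INSERT INTO words_datasets (id, dataset_id, word, count)
VALUES (generateUUIDv4(), toUUID('00000000-0000-0000-0000-000000000001'), 'example', 3);

-- Top words for that dataset, summing counts at read time.
SELECT word, sum(count) AS total
FROM words_datasets
WHERE dataset_id = toUUID('00000000-0000-0000-0000-000000000001')
GROUP BY word
ORDER BY total DESC
LIMIT 10;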

@@ -0,0 +1 @@
DROP TABLE IF EXISTS dataset_words_last_processed;
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS dataset_words_last_processed (
last_processed DateTime DEFAULT now() NOT NULL,
dataset_id UUID NOT NULL,
) ENGINE = ReplacingMergeTree(last_processed)
ORDER BY (dataset_id)
PARTITION BY dataset_id;
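
For reference (again a sketch, not part of the diff): ReplacingMergeTree(last_processed) keeps at most one row per dataset_id after background merges, preferring the greatest last_processed, but duplicate rows can linger until a merge runs, so a reader that needs the latest watermark typically collapses rows itself:

-- Record a new processing watermark (dataset id is made up); last_processed defaults to now().
INSERT INTO dataset_words_last_processed (dataset_id)
VALUES (toUUID('00000000-0000-0000-0000-000000000001'));

-- Read the latest watermark per dataset without waiting for merges.
SELECT dataset_id, max(last_processed) AS last_processed
FROM dataset_words_last_processed
GROUP BY dataset_id;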
@@ -18,7 +18,7 @@ BEGIN
SET chunk_count = dataset_usage_counts.chunk_count + new_count;

-- Update dataset
UPDATE dataset
UPDATE datasets
SET updated_at = CURRENT_TIMESTAMP
WHERE id = d_id;

@@ -29,7 +29,7 @@ BEGIN
WHERE dataset_id = d_id;

-- Update dataset
UPDATE dataset
UPDATE datasets
SET updated_at = CURRENT_TIMESTAMP
WHERE id = d_id;
END IF;
16 changes: 16 additions & 0 deletions server/migrations/2024-08-10-032512_delete_tables/down.sql
@@ -0,0 +1,16 @@
-- This file should undo anything in `up.sql`
CREATE TABLE IF NOT EXISTS "words_in_datasets" (
id UUID PRIMARY KEY,
word TEXT NOT NULL,
UNIQUE(word)
);

CREATE TABLE IF NOT EXISTS "words_datasets" (
id UUID PRIMARY KEY,
dataset_id UUID NOT NULL,
word_id UUID NOT NULL,
count INT NOT NULL,
UNIQUE(dataset_id, word_id),
FOREIGN KEY (dataset_id) REFERENCES "datasets"(id) ON DELETE CASCADE,
FOREIGN KEY (word_id) REFERENCES "words_in_datasets"(id) ON DELETE CASCADE
);
5 changes: 5 additions & 0 deletions server/migrations/2024-08-10-032512_delete_tables/up.sql
@@ -0,0 +1,5 @@
-- Your SQL goes here
DROP TABLE IF EXISTS "words_datasets";
DROP TABLE IF EXISTS "words_in_datasets";


@@ -0,0 +1,8 @@
-- This file should undo anything in `up.sql`
CREATE TABLE IF NOT EXISTS "dataset_words_last_processed" (
id UUID PRIMARY KEY,
last_processed TIMESTAMP NULL,
dataset_id UUID NOT NULL,
FOREIGN KEY (dataset_id) REFERENCES "datasets"(id) ON DELETE CASCADE,
UNIQUE(dataset_id)
);
@@ -0,0 +1,2 @@
-- Your SQL goes here
DROP TABLE IF EXISTS "dataset_words_last_processed";
100 changes: 67 additions & 33 deletions server/src/bin/bktree-worker.rs
@@ -3,16 +3,16 @@ use std::sync::{
Arc,
};

use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
use chm::tools::migrations::SetupArgs;
use sentry::{Hub, SentryFutureExt};
use signal_hook::consts::SIGTERM;
use tracing_subscriber::{prelude::*, EnvFilter, Layer};
use trieve_server::{
data::models::{Pool, RedisPool},
data::models::RedisPool,
errors::ServiceError,
establish_connection, get_env,
get_env,
operators::{
dataset_operator::scroll_words_from_dataset,
dataset_operator::{scroll_words_from_dataset, update_dataset_last_processed_query},
words_operator::{get_bktree_from_redis_query, BkTree, CreateBkTreeMessage},
},
};
@@ -56,22 +56,6 @@ fn main() {
None
};

let database_url = get_env!("DATABASE_URL", "DATABASE_URL is not set");

let mut config = ManagerConfig::default();
config.custom_setup = Box::new(establish_connection);

let mgr = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new_with_config(
database_url,
config,
);

let pool = diesel_async::pooled_connection::deadpool::Pool::builder(mgr)
.max_size(3)
.build()
.expect("Failed to create diesel_async pool");

let web_pool = actix_web::web::Data::new(pool.clone());
let should_terminate = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(SIGTERM, Arc::clone(&should_terminate))
.expect("Failed to register shutdown hook");
@@ -100,11 +84,33 @@ fn main() {

let web_redis_pool = actix_web::web::Data::new(redis_pool);

let args = SetupArgs {
url: Some(get_env!("CLICKHOUSE_URL", "CLICKHOUSE_URL is not set").to_string()),
user: Some(
get_env!("CLICKHOUSE_USER", "CLICKHOUSE_USER is not set").to_string(),
),
password: Some(
get_env!("CLICKHOUSE_PASSWORD", "CLICKHOUSE_PASSWORD is not set")
.to_string(),
),
database: Some(
get_env!("CLICKHOUSE_DB", "CLICKHOUSE_DB is not set").to_string(),
),
};

let clickhouse_client = clickhouse::Client::default()
.with_url(args.url.as_ref().unwrap())
.with_user(args.user.as_ref().unwrap())
.with_password(args.password.as_ref().unwrap())
.with_database(args.database.as_ref().unwrap())
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");

let should_terminate = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(SIGTERM, Arc::clone(&should_terminate))
.expect("Failed to register shutdown hook");

bktree_worker(should_terminate, web_redis_pool, web_pool).await
bktree_worker(should_terminate, web_redis_pool, clickhouse_client).await
}
.bind_hub(Hub::new_from_top(Hub::current())),
);
@@ -113,7 +119,7 @@
async fn bktree_worker(
should_terminate: Arc<AtomicBool>,
redis_pool: actix_web::web::Data<RedisPool>,
web_pool: actix_web::web::Data<Pool>,
clickhouse_client: clickhouse::Client,
) {
log::info!("Starting bk tree service thread");

@@ -195,8 +201,18 @@ async fn bktree_worker(
}
};

let mut word_id_offset = uuid::Uuid::nil();
let mut id_offset = uuid::Uuid::nil();
log::info!("Processing dataset {}", create_tree_msg.dataset_id);

match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client)
.await
{
Ok(_) => {}
Err(err) => {
log::error!("Failed to update last processed {:?}", err);
}
}

let mut bk_tree = if let Ok(Some(bktree)) =
get_bktree_from_redis_query(create_tree_msg.dataset_id, redis_pool.clone()).await
{
@@ -205,26 +221,44 @@
BkTree::new()
};

let mut failed = false;

while let Ok(Some(word_and_counts)) = scroll_words_from_dataset(
create_tree_msg.dataset_id,
word_id_offset,
id_offset,
1000,
web_pool.clone(),
&clickhouse_client,
)
.await
{
.map_err(|err| {
let err = err.clone();
let redis_pool = redis_pool.clone();
let create_tree_msg = create_tree_msg.clone();
tokio::spawn(async move {
let _ = readd_error_to_queue(create_tree_msg.clone(), &err, redis_pool.clone())
.await
.map_err(|e| {
eprintln!("Failed to readd error to queue: {:?}", e);
});
});
failed = true;
}) {
if let Some(last_word) = word_and_counts.last() {
word_id_offset = last_word.0;
id_offset = last_word.id;
}

let word_and_counts = word_and_counts
.into_iter()
.map(|(_, word, count)| (word, count))
.map(|words| (words.word, words.count))
.collect::<Vec<(String, i32)>>();

bk_tree.insert_all(word_and_counts);
}

if failed {
continue;
}

match rmp_serde::to_vec(&bk_tree) {
Ok(serialized_tree) => {
match redis::cmd("SET")
Expand Down Expand Up @@ -252,8 +286,8 @@ async fn bktree_worker(
}
Err(err) => {
let _ = readd_error_to_queue(
create_tree_msg,
ServiceError::InternalServerError(format!(
create_tree_msg.clone(),
&ServiceError::InternalServerError(format!(
"Failed to serialize tree: {:?}",
err
)),
@@ -265,8 +299,8 @@
}
Err(err) => {
let _ = readd_error_to_queue(
create_tree_msg,
ServiceError::InternalServerError(format!(
create_tree_msg.clone(),
&ServiceError::InternalServerError(format!(
"Failed to serialize tree: {:?}",
err
)),
Expand All @@ -280,7 +314,7 @@ async fn bktree_worker(

pub async fn readd_error_to_queue(
message: CreateBkTreeMessage,
error: ServiceError,
error: &ServiceError,
redis_pool: actix_web::web::Data<RedisPool>,
) -> Result<(), ServiceError> {
let mut message = message;
13 changes: 13 additions & 0 deletions server/src/bin/file-worker.rs
@@ -232,6 +232,19 @@ async fn file_worker(
Ok(Some(file_id)) => {
log::info!("Uploaded file: {:?}", file_id);

event_queue
.send(ClickHouseEvent::WorkerEvent(
models::WorkerEvent::from_details(
file_worker_message.dataset_id,
models::EventType::FileUploaded {
file_id,
file_name: file_worker_message.upload_file_data.file_name.clone(),
},
)
.into(),
))
.await;

let _ = redis::cmd("LREM")
.arg("file_processing")
.arg(1)