Skip to content

Commit

Permalink
bugfix: ensure that the bktree gets built every time
Browse files: browse the repository at this point in the history
  • Loading branch information
densumesh committed Aug 26, 2024
1 parent fa3541e commit b3d4926
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 138 deletions.
62 changes: 37 additions & 25 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ tantivy = "0.22.0"
strsim = "0.11.1"
levenshtein_automata = "0.2.1"
bktree = "1.0.1"
rmp-serde = "1.3.0"
flate2 = "1.0.31"
bincode = "1.3"
rayon = "1.10.0"
crossbeam = "0.8.4"


[build-dependencies]
dotenvy = "0.15.7"
Expand Down
91 changes: 46 additions & 45 deletions server/src/bin/bktree-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::{
};

use chm::tools::migrations::SetupArgs;
use rand::Rng;
use sentry::{Hub, SentryFutureExt};
use signal_hook::consts::SIGTERM;
use tracing_subscriber::{prelude::*, EnvFilter, Layer};
Expand All @@ -12,8 +13,9 @@ use trieve_server::{
errors::ServiceError,
get_env,
operators::{
chunk_operator::get_last_processed_from_clickhouse,
dataset_operator::{scroll_words_from_dataset, update_dataset_last_processed_query},
words_operator::{get_bktree_from_redis_query, BkTree, CreateBkTreeMessage},
words_operator::{BkTree, CreateBkTreeMessage},
},
};

Expand Down Expand Up @@ -116,6 +118,7 @@ fn main() {
);
}

#[allow(clippy::print_stdout)]
async fn bktree_worker(
should_terminate: Arc<AtomicBool>,
redis_pool: actix_web::web::Data<RedisPool>,
Expand Down Expand Up @@ -207,7 +210,7 @@ async fn bktree_worker(
log::info!("Processing dataset {}", create_tree_msg.dataset_id);

let mut bk_tree = if let Ok(Some(bktree)) =
get_bktree_from_redis_query(create_tree_msg.dataset_id, redis_pool.clone()).await
BkTree::from_redis(create_tree_msg.dataset_id, redis_pool.clone()).await
{
bktree
} else {
Expand All @@ -216,10 +219,27 @@ async fn bktree_worker(

let mut failed = false;

let last_processed =
get_last_processed_from_clickhouse(&clickhouse_client, create_tree_msg.dataset_id)
.await;

let last_processed = match last_processed {
Ok(last_processed) => last_processed.map(|lp| lp.last_processed),
Err(err) => {
let _ = readd_error_to_queue(create_tree_msg.clone(), &err, redis_pool.clone())
.await
.map_err(|e| {
eprintln!("Failed to readd error to queue: {:?}", e);
});
continue;
}
};

while let Ok(Some(word_and_counts)) = scroll_words_from_dataset(
create_tree_msg.dataset_id,
id_offset,
10000,
last_processed,
5000,
&clickhouse_client,
)
.await
Expand All @@ -236,7 +256,7 @@ async fn bktree_worker(
});
failed = true;
}) {
println!("Processing offset: {:?}", id_offset);
dbg!(id_offset);
if let Some(last_word) = word_and_counts.last() {
id_offset = last_word.id;
}
Expand All @@ -253,43 +273,22 @@ async fn bktree_worker(
continue;
}

match rmp_serde::to_vec(&bk_tree) {
Ok(serialized_tree) => {
match redis::cmd("SET")
.arg(format!("bk_tree_{}", create_tree_msg.dataset_id))
.arg(serialized_tree)
.query_async::<redis::aio::MultiplexedConnection, String>(
&mut *redis_connection,
)
.await
{
Ok(_) => {
let _ = redis::cmd("LREM")
.arg("bktree_processing")
.arg(1)
.arg(serialized_message.clone())
.query_async::<redis::aio::MultiplexedConnection, usize>(
&mut *redis_connection,
)
.await;

log::info!(
"Succesfully created bk-tree for {}",
create_tree_msg.dataset_id
);
}
Err(err) => {
let _ = readd_error_to_queue(
create_tree_msg.clone(),
&ServiceError::InternalServerError(format!(
"Failed to serialize tree: {:?}",
err
)),
redis_pool.clone(),
)
.await;
}
}
match bk_tree
.save(create_tree_msg.dataset_id, redis_pool.clone())
.await
{
Ok(()) => {
let _ = redis::cmd("LREM")
.arg("bktree_processing")
.arg(1)
.arg(serialized_message.clone())
.query_async::<redis::aio::MultiplexedConnection, usize>(&mut *redis_connection)
.await;

log::info!(
"Succesfully created bk-tree for {}",
create_tree_msg.dataset_id
);
}
Err(err) => {
let _ = readd_error_to_queue(
Expand All @@ -303,6 +302,7 @@ async fn bktree_worker(
.await;
}
}

match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client)
.await
{
Expand All @@ -311,7 +311,8 @@ async fn bktree_worker(
log::error!("Failed to update last processed {:?}", err);
}
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
let sleep_duration = rand::thread_rng().gen_range(1..=10);
tokio::time::sleep(std::time::Duration::from_secs(sleep_duration)).await;
}
}

Expand All @@ -331,7 +332,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

let _ = redis::cmd("LREM")
let _ = redis::cmd("SREM")
.arg("bktree_processing")
.arg(1)
.arg(old_payload_message.clone())
Expand All @@ -347,7 +348,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

redis::cmd("lpush")
redis::cmd("SADD")
.arg("bktree_dead_letters")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
Expand All @@ -374,7 +375,7 @@ pub async fn readd_error_to_queue(
message.attempt_number
);

redis::cmd("lpush")
redis::cmd("SADD")
.arg("bktree_creation")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
Expand Down
4 changes: 2 additions & 2 deletions server/src/bin/word-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

let _ = redis::cmd("SPOP")
let _ = redis::cmd("lrem")
.arg("process_dictionary")
.arg(1)
.arg(old_payload_message.clone())
Expand All @@ -406,7 +406,7 @@ pub async fn readd_error_to_queue(
ServiceError::InternalServerError("Failed to reserialize input for retry".to_string())
})?;

redis::cmd("SADD")
redis::cmd("lpush")
.arg("create_dictionary")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
Expand Down
2 changes: 1 addition & 1 deletion server/src/operators/chunk_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2448,7 +2448,7 @@ pub async fn get_last_processed_from_clickhouse(
dataset_id: uuid::Uuid,
) -> Result<Option<DatasetLastProcessed>, ServiceError> {
let query = format!(
"SELECT ?fields FROM dataset_words_last_processed WHERE dataset_id = '{}' LIMIT 1",
"SELECT dataset_id, min(last_processed) as last_processed FROM dataset_words_last_processed WHERE dataset_id = '{}' GROUP BY dataset_id LIMIT 1",
dataset_id
);

Expand Down
Loading

0 comments on commit b3d4926

Please sign in to comment.