Skip to content

Commit

Permalink
cleanup: use a set for bktree creation rather than a list
Browse files Browse the repository at this point in the history
fix debug
  • Loading branch information
densumesh committed Aug 26, 2024
1 parent 93f4f4d commit 52d97ed
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 152 deletions.
62 changes: 37 additions & 25 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ tantivy = "0.22.0"
strsim = "0.11.1"
levenshtein_automata = "0.2.1"
bktree = "1.0.1"
rmp-serde = "1.3.0"
flate2 = "1.0.31"
bincode = "1.3"
rayon = "1.10.0"
crossbeam = "0.8.4"


[build-dependencies]
dotenvy = "0.15.7"
Expand Down
114 changes: 59 additions & 55 deletions server/src/bin/bktree-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::{
};

use chm::tools::migrations::SetupArgs;
use rand::Rng;
use sentry::{Hub, SentryFutureExt};
use signal_hook::consts::SIGTERM;
use tracing_subscriber::{prelude::*, EnvFilter, Layer};
Expand All @@ -12,8 +13,9 @@ use trieve_server::{
errors::ServiceError,
get_env,
operators::{
chunk_operator::get_last_processed_from_clickhouse,
dataset_operator::{scroll_words_from_dataset, update_dataset_last_processed_query},
words_operator::{get_bktree_from_redis_query, BkTree, CreateBkTreeMessage},
words_operator::{BkTree, CreateBkTreeMessage},
},
};

Expand Down Expand Up @@ -116,6 +118,7 @@ fn main() {
);
}

#[allow(clippy::print_stdout)]
async fn bktree_worker(
should_terminate: Arc<AtomicBool>,
redis_pool: actix_web::web::Data<RedisPool>,
Expand Down Expand Up @@ -157,10 +160,8 @@ async fn bktree_worker(
break;
}

let payload_result: Result<Vec<String>, redis::RedisError> = redis::cmd("brpoplpush")
let payload_result: Result<Vec<String>, redis::RedisError> = redis::cmd("SPOP")
.arg("bktree_creation")
.arg("bktree_processing")
.arg(1.0)
.query_async(&mut *redis_connection)
.await;

Expand All @@ -171,6 +172,10 @@ async fn bktree_worker(
if payload.is_empty() {
continue;
}
let _: Result<i32, redis::RedisError> = redis::cmd("SADD")
.arg("bktree_processing")
.query_async(&mut *redis_connection)
.await;

payload
.first()
Expand Down Expand Up @@ -204,17 +209,8 @@ async fn bktree_worker(
let mut id_offset = uuid::Uuid::nil();
log::info!("Processing dataset {}", create_tree_msg.dataset_id);

match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client)
.await
{
Ok(_) => {}
Err(err) => {
log::error!("Failed to update last processed {:?}", err);
}
}

let mut bk_tree = if let Ok(Some(bktree)) =
get_bktree_from_redis_query(create_tree_msg.dataset_id, redis_pool.clone()).await
BkTree::from_redis(create_tree_msg.dataset_id, redis_pool.clone()).await
{
bktree
} else {
Expand All @@ -223,10 +219,27 @@ async fn bktree_worker(

let mut failed = false;

let last_processed =
get_last_processed_from_clickhouse(&clickhouse_client, create_tree_msg.dataset_id)
.await;

let last_processed = match last_processed {
Ok(last_processed) => last_processed.map(|lp| lp.last_processed),
Err(err) => {
let _ = readd_error_to_queue(create_tree_msg.clone(), &err, redis_pool.clone())
.await
.map_err(|e| {
eprintln!("Failed to readd error to queue: {:?}", e);
});
continue;
}
};

while let Ok(Some(word_and_counts)) = scroll_words_from_dataset(
create_tree_msg.dataset_id,
id_offset,
1000,
last_processed,
5000,
&clickhouse_client,
)
.await
Expand All @@ -243,6 +256,7 @@ async fn bktree_worker(
});
failed = true;
}) {
dbg!(id_offset);
if let Some(last_word) = word_and_counts.last() {
id_offset = last_word.id;
}
Expand All @@ -259,43 +273,22 @@ async fn bktree_worker(
continue;
}

match rmp_serde::to_vec(&bk_tree) {
Ok(serialized_tree) => {
match redis::cmd("SET")
.arg(format!("bk_tree_{}", create_tree_msg.dataset_id))
.arg(serialized_tree)
.query_async::<redis::aio::MultiplexedConnection, String>(
&mut *redis_connection,
)
.await
{
Ok(_) => {
let _ = redis::cmd("LREM")
.arg("bktree_processing")
.arg(1)
.arg(serialized_message.clone())
.query_async::<redis::aio::MultiplexedConnection, usize>(
&mut *redis_connection,
)
.await;

log::info!(
"Succesfully created bk-tree for {}",
create_tree_msg.dataset_id
);
}
Err(err) => {
let _ = readd_error_to_queue(
create_tree_msg.clone(),
&ServiceError::InternalServerError(format!(
"Failed to serialize tree: {:?}",
err
)),
redis_pool.clone(),
)
.await;
}
}
match bk_tree
.save(create_tree_msg.dataset_id, redis_pool.clone())
.await
{
Ok(()) => {
let _ = redis::cmd("LREM")
.arg("bktree_processing")
.arg(1)
.arg(serialized_message.clone())
.query_async::<redis::aio::MultiplexedConnection, usize>(&mut *redis_connection)
.await;

log::info!(
"Succesfully created bk-tree for {}",
create_tree_msg.dataset_id
);
}
Err(err) => {
let _ = readd_error_to_queue(
Expand All @@ -309,6 +302,17 @@ async fn bktree_worker(
.await;
}
}

match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client)
.await
{
Ok(_) => {}
Err(err) => {
log::error!("Failed to update last processed {:?}", err);
}
}
let sleep_duration = rand::thread_rng().gen_range(1..=10);
tokio::time::sleep(std::time::Duration::from_secs(sleep_duration)).await;
}
}

Expand All @@ -328,7 +332,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

let _ = redis::cmd("LREM")
let _ = redis::cmd("SREM")
.arg("bktree_processing")
.arg(1)
.arg(old_payload_message.clone())
Expand All @@ -344,7 +348,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

redis::cmd("lpush")
redis::cmd("SADD")
.arg("bktree_dead_letters")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
Expand All @@ -371,7 +375,7 @@ pub async fn readd_error_to_queue(
message.attempt_number
);

redis::cmd("lpush")
redis::cmd("SADD")
.arg("bktree_creation")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
Expand Down
1 change: 0 additions & 1 deletion server/src/bin/word-id-cronjob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ async fn main() -> Result<(), ServiceError> {
.collect_vec();

let dataset_ids_and_processed = join_all(dataset_ids_and_processed).await;
dbg!(&dataset_ids_and_processed);

for (dataset_id, last_processed) in dataset_ids_and_processed {
let mut chunk_id_offset = uuid::Uuid::nil();
Expand Down
4 changes: 2 additions & 2 deletions server/src/bin/word-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ async fn process_chunks(
})
.collect::<Result<Vec<String>, ServiceError>>()?;

redis::cmd("LPUSH")
redis::cmd("SADD")
.arg("bktree_creation")
.arg(create_tree_msgs)
.query_async::<redis::aio::MultiplexedConnection, usize>(&mut *redis_conn)
Expand All @@ -379,7 +379,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

let _ = redis::cmd("LREM")
let _ = redis::cmd("lrem")
.arg("process_dictionary")
.arg(1)
.arg(old_payload_message.clone())
Expand Down
3 changes: 1 addition & 2 deletions server/src/operators/chunk_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,6 @@ pub async fn scroll_chunk_ids_for_dictionary_query(
.into_boxed();

if let Some(last_processed) = last_processed {
dbg!(&last_processed.last_processed.unix_timestamp());
let last_processed =
NaiveDateTime::from_timestamp(last_processed.last_processed.unix_timestamp(), 0);

Expand Down Expand Up @@ -2449,7 +2448,7 @@ pub async fn get_last_processed_from_clickhouse(
dataset_id: uuid::Uuid,
) -> Result<Option<DatasetLastProcessed>, ServiceError> {
let query = format!(
"SELECT ?fields FROM dataset_words_last_processed WHERE dataset_id = '{}' LIMIT 1",
"SELECT dataset_id, min(last_processed) as last_processed FROM dataset_words_last_processed WHERE dataset_id = '{}' GROUP BY dataset_id LIMIT 1",
dataset_id
);

Expand Down
Loading

0 comments on commit 52d97ed

Please sign in to comment.