Skip to content

Commit

Permalink
cleanup: use a set for bktree creation rather than a list
Browse files Browse the repository at this point in the history
fix debug
  • Loading branch information
densumesh committed Aug 22, 2024
1 parent 6a18478 commit fa3541e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 20 deletions.
29 changes: 16 additions & 13 deletions server/src/bin/bktree-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ async fn bktree_worker(
break;
}

let payload_result: Result<Vec<String>, redis::RedisError> = redis::cmd("brpoplpush")
let payload_result: Result<Vec<String>, redis::RedisError> = redis::cmd("SPOP")
.arg("bktree_creation")
.arg("bktree_processing")
.arg(1.0)
.query_async(&mut *redis_connection)
.await;

Expand All @@ -171,6 +169,10 @@ async fn bktree_worker(
if payload.is_empty() {
continue;
}
let _: Result<i32, redis::RedisError> = redis::cmd("SADD")
.arg("bktree_processing")
.query_async(&mut *redis_connection)
.await;

payload
.first()
Expand Down Expand Up @@ -204,15 +206,6 @@ async fn bktree_worker(
let mut id_offset = uuid::Uuid::nil();
log::info!("Processing dataset {}", create_tree_msg.dataset_id);

match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client)
.await
{
Ok(_) => {}
Err(err) => {
log::error!("Failed to update last processed {:?}", err);
}
}

let mut bk_tree = if let Ok(Some(bktree)) =
get_bktree_from_redis_query(create_tree_msg.dataset_id, redis_pool.clone()).await
{
Expand All @@ -226,7 +219,7 @@ async fn bktree_worker(
while let Ok(Some(word_and_counts)) = scroll_words_from_dataset(
create_tree_msg.dataset_id,
id_offset,
1000,
10000,
&clickhouse_client,
)
.await
Expand All @@ -243,6 +236,7 @@ async fn bktree_worker(
});
failed = true;
}) {
println!("Processing offset: {:?}", id_offset);
if let Some(last_word) = word_and_counts.last() {
id_offset = last_word.id;
}
Expand Down Expand Up @@ -309,6 +303,15 @@ async fn bktree_worker(
.await;
}
}
match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client)
.await
{
Ok(_) => {}
Err(err) => {
log::error!("Failed to update last processed {:?}", err);
}
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}

Expand Down
1 change: 0 additions & 1 deletion server/src/bin/word-id-cronjob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ async fn main() -> Result<(), ServiceError> {
.collect_vec();

let dataset_ids_and_processed = join_all(dataset_ids_and_processed).await;
dbg!(&dataset_ids_and_processed);

for (dataset_id, last_processed) in dataset_ids_and_processed {
let mut chunk_id_offset = uuid::Uuid::nil();
Expand Down
6 changes: 3 additions & 3 deletions server/src/bin/word-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ async fn process_chunks(
})
.collect::<Result<Vec<String>, ServiceError>>()?;

redis::cmd("LPUSH")
redis::cmd("SADD")
.arg("bktree_creation")
.arg(create_tree_msgs)
.query_async::<redis::aio::MultiplexedConnection, usize>(&mut *redis_conn)
Expand All @@ -379,7 +379,7 @@ pub async fn readd_error_to_queue(
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

let _ = redis::cmd("LREM")
let _ = redis::cmd("SPOP")
.arg("process_dictionary")
.arg(1)
.arg(old_payload_message.clone())
Expand All @@ -406,7 +406,7 @@ pub async fn readd_error_to_queue(
ServiceError::InternalServerError("Failed to reserialize input for retry".to_string())
})?;

redis::cmd("lpush")
redis::cmd("SADD")
.arg("create_dictionary")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
Expand Down
1 change: 0 additions & 1 deletion server/src/operators/chunk_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,6 @@ pub async fn scroll_chunk_ids_for_dictionary_query(
.into_boxed();

if let Some(last_processed) = last_processed {
dbg!(&last_processed.last_processed.unix_timestamp());
let last_processed =
NaiveDateTime::from_timestamp(last_processed.last_processed.unix_timestamp(), 0);

Expand Down
2 changes: 0 additions & 2 deletions server/src/operators/words_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,6 @@ fn correct_query_helper(tree: &BkTree, query: String, options: &TypoOptions) ->
}
}

dbg!(&query_split_to_correction);

let mut corrected_query = query.clone();

if !query_split_to_correction.is_empty() {
Expand Down

0 comments on commit fa3541e

Please sign in to comment.