diff --git a/server/src/bin/bktree-worker.rs b/server/src/bin/bktree-worker.rs index 3208058051..f39fe02a1f 100644 --- a/server/src/bin/bktree-worker.rs +++ b/server/src/bin/bktree-worker.rs @@ -157,10 +157,8 @@ async fn bktree_worker( break; } - let payload_result: Result, redis::RedisError> = redis::cmd("brpoplpush") + let payload_result: Result, redis::RedisError> = redis::cmd("SPOP") .arg("bktree_creation") - .arg("bktree_processing") - .arg(1.0) .query_async(&mut *redis_connection) .await; @@ -171,6 +169,10 @@ async fn bktree_worker( if payload.is_empty() { continue; } + let _: Result = redis::cmd("SADD") + .arg("bktree_processing") + .query_async(&mut *redis_connection) + .await; payload .first() @@ -204,15 +206,6 @@ async fn bktree_worker( let mut id_offset = uuid::Uuid::nil(); log::info!("Processing dataset {}", create_tree_msg.dataset_id); - match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client) - .await - { - Ok(_) => {} - Err(err) => { - log::error!("Failed to update last processed {:?}", err); - } - } - let mut bk_tree = if let Ok(Some(bktree)) = get_bktree_from_redis_query(create_tree_msg.dataset_id, redis_pool.clone()).await { @@ -226,7 +219,7 @@ async fn bktree_worker( while let Ok(Some(word_and_counts)) = scroll_words_from_dataset( create_tree_msg.dataset_id, id_offset, - 1000, + 10000, &clickhouse_client, ) .await @@ -243,6 +236,7 @@ async fn bktree_worker( }); failed = true; }) { + println!("Processing offset: {:?}", id_offset); if let Some(last_word) = word_and_counts.last() { id_offset = last_word.id; } @@ -309,6 +303,15 @@ async fn bktree_worker( .await; } } + match update_dataset_last_processed_query(create_tree_msg.dataset_id, &clickhouse_client) + .await + { + Ok(_) => {} + Err(err) => { + log::error!("Failed to update last processed {:?}", err); + } + } + tokio::time::sleep(std::time::Duration::from_secs(10)).await; } } diff --git a/server/src/bin/word-id-cronjob.rs b/server/src/bin/word-id-cronjob.rs index 0fd32326f0..6b1f09861f 100644 --- a/server/src/bin/word-id-cronjob.rs +++ b/server/src/bin/word-id-cronjob.rs @@ -121,7 +121,6 @@ async fn main() -> Result<(), ServiceError> { .collect_vec(); let dataset_ids_and_processed = join_all(dataset_ids_and_processed).await; - dbg!(&dataset_ids_and_processed); for (dataset_id, last_processed) in dataset_ids_and_processed { let mut chunk_id_offset = uuid::Uuid::nil(); diff --git a/server/src/bin/word-worker.rs b/server/src/bin/word-worker.rs index 6f16974d35..b1f8d8b0bd 100644 --- a/server/src/bin/word-worker.rs +++ b/server/src/bin/word-worker.rs @@ -352,7 +352,7 @@ async fn process_chunks( }) .collect::, ServiceError>>()?; - redis::cmd("LPUSH") + redis::cmd("SADD") .arg("bktree_creation") .arg(create_tree_msgs) .query_async::(&mut *redis_conn) @@ -379,7 +379,7 @@ pub async fn readd_error_to_queue( .await .map_err(|err| ServiceError::BadRequest(err.to_string()))?; - let _ = redis::cmd("LREM") + let _ = redis::cmd("SPOP") .arg("process_dictionary") .arg(1) .arg(old_payload_message.clone()) @@ -406,7 +406,7 @@ pub async fn readd_error_to_queue( ServiceError::InternalServerError("Failed to reserialize input for retry".to_string()) })?; - redis::cmd("lpush") + redis::cmd("SADD") .arg("create_dictionary") .arg(&new_payload_message) .query_async(&mut *redis_conn) diff --git a/server/src/operators/chunk_operator.rs b/server/src/operators/chunk_operator.rs index 26c1c21018..9f37438382 100644 --- a/server/src/operators/chunk_operator.rs +++ b/server/src/operators/chunk_operator.rs @@ -2411,7 +2411,6 @@ pub async fn scroll_chunk_ids_for_dictionary_query( .into_boxed(); if let Some(last_processed) = last_processed { - dbg!(&last_processed.last_processed.unix_timestamp()); let last_processed = NaiveDateTime::from_timestamp(last_processed.last_processed.unix_timestamp(), 0); diff --git a/server/src/operators/words_operator.rs b/server/src/operators/words_operator.rs index e8fa7d110e..a60a65ddbb 100644 --- a/server/src/operators/words_operator.rs +++ b/server/src/operators/words_operator.rs @@ -313,8 +313,6 @@ fn correct_query_helper(tree: &BkTree, query: String, options: &TypoOptions) -> } } - dbg!(&query_split_to_correction); - let mut corrected_query = query.clone(); if !query_split_to_correction.is_empty() {