Skip to content

Commit

Permalink
cleanup: properly handle fallback types for redis for Rust '24 version
Browse files Browse the repository at this point in the history
  • Loading branch information
skeptrunedev committed Sep 6, 2024
1 parent a79731f commit 393ae80
Show file tree
Hide file tree
Showing 18 changed files with 40 additions and 34 deletions.
2 changes: 1 addition & 1 deletion server/src/bin/bktree-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ pub async fn readd_error_to_queue(
redis::cmd("SADD")
.arg("bktree_dead_letters")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
4 changes: 2 additions & 2 deletions server/src/bin/delete-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("dead_letters_delete")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down Expand Up @@ -456,7 +456,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("delete_dataset_queue")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
6 changes: 3 additions & 3 deletions server/src/bin/file-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ async fn upload_file(
let tika_html_parse_span = transaction.start_child("tika_html_parse", "Parse tika html");

let tika_response = tika_client
.put(&format!("{}/tika", tika_url))
.put(format!("{}/tika", tika_url))
.header("Accept", "text/html")
.body(file_data.clone())
.send()
Expand Down Expand Up @@ -434,7 +434,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("dead_letters_file")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down Expand Up @@ -462,7 +462,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("file_ingestion")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
4 changes: 2 additions & 2 deletions server/src/bin/grupdate-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ pub async fn readd_group_error_to_queue(
redis::cmd("lpush")
.arg("dead_letters_group")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down Expand Up @@ -369,7 +369,7 @@ pub async fn readd_group_error_to_queue(
redis::cmd("lpush")
.arg("group_update_queue")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion server/src/bin/ingestion-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("dead_letters")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion server/src/bin/queue-bm25-migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn main() -> Result<(), ServiceError> {
redis::cmd("lpush")
.arg("collection_migration")
.arg(&message)
.query_async(&mut *conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *conn)
.await
.map_err(|_| {
ServiceError::BadRequest("Failed to send message to redis".to_string())
Expand Down
4 changes: 2 additions & 2 deletions server/src/bin/word-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("dictionary_dead_letters")
.arg(old_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;
return Err(ServiceError::InternalServerError(format!(
Expand All @@ -409,7 +409,7 @@ pub async fn readd_error_to_queue(
redis::cmd("lpush")
.arg("create_dictionary")
.arg(&new_payload_message)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/auth_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ pub async fn callback(
})?;

redis_conn
.set(slim_user.id.to_string(), slim_user_string)
.set::<_, _, ()>(slim_user.id.to_string(), slim_user_string)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/chunk_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ pub async fn update_chunk(
redis::cmd("lpush")
.arg("ingestion")
.arg(serde_json::to_string(&message)?)
.query_async(&mut *redis_conn)
.query_async::<_, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down Expand Up @@ -869,7 +869,7 @@ pub async fn update_chunk_by_tracking_id(
redis::cmd("lpush")
.arg("ingestion")
.arg(serde_json::to_string(&message)?)
.query_async(&mut *redis_conn)
.query_async::<_, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/file_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub async fn upload_file_handler(
redis::cmd("lpush")
.arg("file_ingestion")
.arg(&serialized_message)
.query_async(&mut *redis_conn)
.query_async::<_, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;
push_to_redis_span.finish();
Expand Down
2 changes: 1 addition & 1 deletion server/src/middleware/auth_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async fn get_user(

let slim_user_string = serde_json::to_string(&slim_user).ok()?;
redis_conn
.set(slim_user.id.to_string(), slim_user_string)
.set::<_, _, ()>(slim_user.id.to_string(), slim_user_string)
.await
.ok()?;

Expand Down
4 changes: 2 additions & 2 deletions server/src/operators/dataset_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ pub async fn soft_delete_dataset_by_id_query(
redis::cmd("lpush")
.arg("delete_dataset_queue")
.arg(&serialized_message)
.query_async(&mut *redis_conn)
.query_async::<_, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down Expand Up @@ -297,7 +297,7 @@ pub async fn clear_dataset_by_dataset_id_query(
redis::cmd("lpush")
.arg("delete_dataset_queue")
.arg(&serialized_message)
.query_async(&mut *redis_conn)
.query_async::<_, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion server/src/operators/file_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ pub async fn create_file_chunks(
redis::cmd("lpush")
.arg("ingestion")
.arg(&serialized_message)
.query_async(&mut redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/operators/group_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ pub async fn soft_update_grouped_chunks_query(
redis::cmd("lpush")
.arg("group_update_queue")
.arg(&serialized_message)
.query_async(&mut *redis_conn)
.query_async::<_, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
4 changes: 2 additions & 2 deletions server/src/operators/model_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ pub async fn get_dense_vectors(

let vectors_resp = async move {
let embeddings_resp = cur_client
.post(&format!("{}/embeddings?api-version=2023-05-15", url))
.post(format!("{}/embeddings?api-version=2023-05-15", url))
.header("Authorization", &format!("Bearer {}", &embedding_api_key.clone()))
.header("api-key", &embedding_api_key.clone())
.header("Content-Type", "application/json")
Expand Down Expand Up @@ -511,7 +511,7 @@ pub async fn get_dense_vectors(

let vectors_resp = async move {
let embeddings_resp = cur_client
.post(&format!("{}/embeddings?api-version=2023-05-15", url))
.post(format!("{}/embeddings?api-version=2023-05-15", url))
.header("Authorization", &format!("Bearer {}", &embedding_api_key.clone()))
.header("api-key", &embedding_api_key.clone())
.header("Content-Type", "application/json")
Expand Down
6 changes: 3 additions & 3 deletions server/src/operators/organization_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub async fn update_organization_query(
ServiceError::InternalServerError("Failed to get redis connection".to_string())
})?;

redis_conn.del(users).await.map_err(|_| {
redis_conn.del::<_, ()>(users).await.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
})?;

Expand Down Expand Up @@ -145,7 +145,7 @@ pub async fn delete_organization_query(
ServiceError::InternalServerError("Failed to get redis connection".to_string())
})?;

redis_conn.del(users).await.map_err(|_| {
redis_conn.del::<_, ()>(users).await.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
})?;

Expand Down Expand Up @@ -225,7 +225,7 @@ pub async fn delete_organization_query(
})?;

redis_conn
.sadd("deleted_organizations", org_id.to_string())
.sadd::<_, _, ()>("deleted_organizations", org_id.to_string())
.await
.map_err(|_| {
ServiceError::InternalServerError(
Expand Down
2 changes: 1 addition & 1 deletion server/src/operators/typo_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl BkTree {
redis::cmd("SET")
.arg(format!("bk_tree_{}", dataset_id))
.arg(serialized_bk_tree)
.query_async(&mut *redis_conn)
.query_async::<redis::aio::MultiplexedConnection, ()>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

Expand Down
20 changes: 13 additions & 7 deletions server/src/operators/user_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,12 @@ pub async fn update_user_org_role_query(
ServiceError::InternalServerError("Failed to get redis connection".to_string())
})?;

redis_conn.del(user_id.to_string()).await.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
})?;
redis_conn
.del::<_, ()>(user_id.to_string())
.await
.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
})?;

Ok(())
}
Expand Down Expand Up @@ -486,7 +489,7 @@ pub async fn add_user_to_organization(
})?;

redis_conn
.del(user_id_refresh.to_string())
.del::<_, ()>(user_id_refresh.to_string())
.await
.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
Expand Down Expand Up @@ -611,9 +614,12 @@ pub async fn remove_user_from_org_query(
ServiceError::InternalServerError("Failed to get redis connection".to_string())
})?;

redis_conn.del(user_id.to_string()).await.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
})?;
redis_conn
.del::<_, ()>(user_id.to_string())
.await
.map_err(|_| {
ServiceError::InternalServerError("Failed to delete user from redis".to_string())
})?;

Ok(())
}

0 comments on commit 393ae80

Please sign in to comment.