diff --git a/Cargo.lock b/Cargo.lock index 192bcb172..574780194 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2766,6 +2766,7 @@ dependencies = [ "app-error", "appflowy-ai-client", "bincode", + "bytes", "chrono", "collab-entity", "serde", diff --git a/libs/database-entity/Cargo.toml b/libs/database-entity/Cargo.toml index 254d568c4..5206caf95 100644 --- a/libs/database-entity/Cargo.toml +++ b/libs/database-entity/Cargo.toml @@ -21,3 +21,4 @@ serde_repr = "0.1.18" app-error = { workspace = true } bincode = "1.3.3" appflowy-ai-client = { workspace = true, features = ["dto"] } +bytes.workspace = true diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index a74b324f5..1e6fdaf84 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -1,5 +1,6 @@ use crate::util::{validate_not_empty_payload, validate_not_empty_str}; use appflowy_ai_client::dto::AIModel; +use bytes::Bytes; use chrono::{DateTime, Utc}; use collab_entity::CollabType; use serde::{Deserialize, Serialize}; @@ -32,7 +33,7 @@ impl From<(String, CollabParams)> for CreateCollabParams { Self { workspace_id, object_id: collab_params.object_id, - encoded_collab_v1: collab_params.encoded_collab_v1, + encoded_collab_v1: collab_params.encoded_collab_v1.to_vec(), collab_type: collab_params.collab_type, } } @@ -43,7 +44,7 @@ impl CreateCollabParams { ( CollabParams { object_id: self.object_id, - encoded_collab_v1: self.encoded_collab_v1, + encoded_collab_v1: Bytes::from(self.encoded_collab_v1), collab_type: self.collab_type, embeddings: None, }, @@ -66,7 +67,7 @@ pub struct CollabParams { #[validate(custom = "validate_not_empty_str")] pub object_id: String, #[validate(custom = "validate_not_empty_payload")] - pub encoded_collab_v1: Vec, + pub encoded_collab_v1: Bytes, pub collab_type: CollabType, #[serde(default)] pub embeddings: Option, @@ -82,7 +83,7 @@ impl CollabParams { Self { object_id, collab_type, - encoded_collab_v1, + encoded_collab_v1: Bytes::from(encoded_collab_v1), embeddings: None, } } @@ -99,7 +100,7 @@ impl CollabParams { let old: CollabParamsV0 = bincode::deserialize(bytes)?; Ok(Self { object_id: old.object_id, - encoded_collab_v1: old.encoded_collab_v1, + encoded_collab_v1: old.encoded_collab_v1.into(), collab_type: old.collab_type, embeddings: None, }) diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index 984391517..7bc21627c 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -79,7 +79,7 @@ pub async fn insert_into_af_collab( SET blob = $3, len = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1 AND partition_key = $2;", params.object_id, partition_key, - params.encoded_collab_v1, + params.encoded_collab_v1.as_ref(), params.encoded_collab_v1.len() as i32, encrypt, uid, @@ -141,7 +141,7 @@ pub async fn insert_into_af_collab( "INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)\ VALUES ($1, $2, $3, $4, $5, $6, $7)", params.object_id, - params.encoded_collab_v1, + params.encoded_collab_v1.as_ref(), params.encoded_collab_v1.len() as i32, partition_key, encrypt, diff --git a/script/client_api_deps_check.sh b/script/client_api_deps_check.sh index 36ea29266..6a9815924 100755 --- a/script/client_api_deps_check.sh +++ b/script/client_api_deps_check.sh @@ -3,7 +3,7 @@ # Generate the current dependency list cargo tree > current_deps.txt -BASELINE_COUNT=610 +BASELINE_COUNT=611 CURRENT_COUNT=$(cat current_deps.txt | wc -l) echo "Expected dependency count (baseline): $BASELINE_COUNT" diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs index bcabca876..ae9ae0c3b 100644 --- a/services/appflowy-collaborate/src/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; +use bytes::Bytes; use collab::lock::Mutex; use collab_entity::CollabType; use serde::{Deserialize, Serialize}; @@ -423,7 +424,7 @@ async fn write_pending_to_disk( workspace_id: meta.workspace_id.clone(), object_id: meta.object_id.clone(), collab_type: meta.collab_type.clone(), - encode_collab_v1, + encode_collab_v1: encode_collab_v1.into(), embeddings: meta.embeddings.clone(), }) } else { @@ -550,7 +551,7 @@ pub struct PendingWriteData { pub workspace_id: String, pub object_id: String, pub collab_type: CollabType, - pub encode_collab_v1: Vec, + pub encode_collab_v1: Bytes, pub embeddings: Option, } diff --git a/services/appflowy-collaborate/src/group/persistence.rs b/services/appflowy-collaborate/src/group/persistence.rs index 2968330b5..84c5360fa 100644 --- a/services/appflowy-collaborate/src/group/persistence.rs +++ b/services/appflowy-collaborate/src/group/persistence.rs @@ -209,7 +209,7 @@ fn get_encode_collab( // Construct and return collaboration parameters. let params = CollabParams { object_id: object_id.to_string(), - encoded_collab_v1: encoded_collab, + encoded_collab_v1: encoded_collab.into(), collab_type: collab_type.clone(), embeddings: None, }; diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index 78616f3be..036c0ade1 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -173,8 +173,16 @@ impl IndexerProvider { &self, params: &CollabParams, ) -> Result, AppError> { - if let Some(indexer) = self.indexer_for(params.collab_type.clone()) { - let encoded_collab = EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1)?; + let collab_type = params.collab_type.clone(); + let data = params.encoded_collab_v1.clone(); + + if let Some(indexer) = self.indexer_for(collab_type) { + let encoded_collab = tokio::task::spawn_blocking(move || { + let encode_collab = EncodedCollab::decode_from_bytes(&data)?; + Ok::<_, AppError>(encode_collab) + }) + .await??; + let embeddings = indexer.index(¶ms.object_id, encoded_collab).await?; Ok(embeddings) } else { diff --git a/src/biz/user/user_init.rs b/src/biz/user/user_init.rs index 879f389ee..f319c3a51 100644 --- a/src/biz/user/user_init.rs +++ b/src/biz/user/user_init.rs @@ -59,7 +59,7 @@ where &uid, CollabParams { object_id: object_id.clone(), - encoded_collab_v1, + encoded_collab_v1: encoded_collab_v1.into(), collab_type: object_type.clone(), embeddings: None, }, @@ -137,7 +137,7 @@ async fn create_user_awareness( uid, CollabParams { object_id: object_id.to_string(), - encoded_collab_v1, + encoded_collab_v1: encoded_collab_v1.into(), collab_type, embeddings: None, }, @@ -179,7 +179,7 @@ async fn create_workspace_database_collab( uid, CollabParams { object_id: object_id.to_string(), - encoded_collab_v1, + encoded_collab_v1: encoded_collab_v1.into(), collab_type, embeddings: None, }, diff --git a/src/biz/workspace/publish_dup.rs b/src/biz/workspace/publish_dup.rs index a3b8bec5a..a84350b5d 100644 --- a/src/biz/workspace/publish_dup.rs +++ b/src/biz/workspace/publish_dup.rs @@ -151,7 +151,7 @@ impl PublishCollabDuplicator { &duplicator_uid, CollabParams { object_id: oid.clone(), - encoded_collab_v1: encoded_collab, + encoded_collab_v1: encoded_collab.into(), collab_type, embeddings: None, }, @@ -198,7 +198,7 @@ impl PublishCollabDuplicator { &duplicator_uid, CollabParams { object_id: ws_db_oid.clone(), - encoded_collab_v1: updated_ws_w_db_collab?, + encoded_collab_v1: updated_ws_w_db_collab?.into(), collab_type: CollabType::WorkspaceDatabase, embeddings: None, }, @@ -285,7 +285,7 @@ impl PublishCollabDuplicator { &duplicator_uid, CollabParams { object_id: dest_workspace_id.clone(), - encoded_collab_v1: updated_encoded_collab?, + encoded_collab_v1: updated_encoded_collab?.into(), collab_type: CollabType::Folder, embeddings: None, }, diff --git a/tests/collab/collab_curd_test.rs b/tests/collab/collab_curd_test.rs index 502dff19d..9abdcee1f 100644 --- a/tests/collab/collab_curd_test.rs +++ b/tests/collab/collab_curd_test.rs @@ -85,7 +85,7 @@ async fn batch_insert_collab_success_test() { let params_list = (0..5) .map(|i| CollabParams { object_id: Uuid::new_v4().to_string(), - encoded_collab_v1: mock_encoded_collab_v1[i].encode_to_bytes().unwrap(), + encoded_collab_v1: mock_encoded_collab_v1[i].encode_to_bytes().unwrap().into(), collab_type: CollabType::Unknown, embeddings: None, }) @@ -181,7 +181,7 @@ async fn create_collab_compatibility_with_json_params_test() { let params = OldCreateCollabParams { inner: CollabParams { object_id: object_id.clone(), - encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap(), + encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap().into(), collab_type: CollabType::Unknown, embeddings: None, }, @@ -238,7 +238,7 @@ async fn batch_create_collab_compatibility_with_uncompress_params_test() { workspace_id: workspace_id.to_string(), params_list: vec![CollabParams { object_id: object_id.clone(), - encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap(), + encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap().into(), collab_type: CollabType::Unknown, embeddings: None, }], diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index 6c44eef3e..9c3d5583a 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -466,7 +466,7 @@ async fn simulate_small_data_set_write(pool: PgPool) { let params = CollabParams { object_id: format!("object_id_{}", i), collab_type: CollabType::Unknown, - encoded_collab_v1: encode_collab.encode_to_bytes().unwrap(), + encoded_collab_v1: encode_collab.encode_to_bytes().unwrap().into(), embeddings: None, }; cloned_storage_queue @@ -542,7 +542,7 @@ async fn simulate_large_data_set_write(pool: PgPool) { let params = CollabParams { object_id: uuid::Uuid::new_v4().to_string(), collab_type: CollabType::Unknown, - encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap(), + encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap().into(), embeddings: None, }; storage_queue diff --git a/tests/sql_test/workspace_test.rs b/tests/sql_test/workspace_test.rs index 544e74ca7..67b68c672 100644 --- a/tests/sql_test/workspace_test.rs +++ b/tests/sql_test/workspace_test.rs @@ -28,7 +28,7 @@ async fn insert_collab_sql_test(pool: PgPool) { let params = CollabParams { object_id, collab_type: CollabType::Unknown, - encoded_collab_v1, + encoded_collab_v1: encoded_collab_v1.into(), embeddings: None, }; insert_into_af_collab(&mut txn, &user.uid, &user.workspace_id, ¶ms)