Skip to content

Commit

Permalink
fix: using spawn_blocking in order to not block the runtime (#793)
Browse files Browse the repository at this point in the history
* fix: using blocking in order to not block the runtime

* chore: increase deps count
  • Loading branch information
appflowy authored Sep 5, 2024
1 parent 886376e commit 5d35671
Show file tree
Hide file tree
Showing 13 changed files with 37 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libs/database-entity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ serde_repr = "0.1.18"
app-error = { workspace = true }
bincode = "1.3.3"
appflowy-ai-client = { workspace = true, features = ["dto"] }
bytes.workspace = true
11 changes: 6 additions & 5 deletions libs/database-entity/src/dto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::util::{validate_not_empty_payload, validate_not_empty_str};
use appflowy_ai_client::dto::AIModel;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -32,7 +33,7 @@ impl From<(String, CollabParams)> for CreateCollabParams {
Self {
workspace_id,
object_id: collab_params.object_id,
encoded_collab_v1: collab_params.encoded_collab_v1,
encoded_collab_v1: collab_params.encoded_collab_v1.to_vec(),
collab_type: collab_params.collab_type,
}
}
Expand All @@ -43,7 +44,7 @@ impl CreateCollabParams {
(
CollabParams {
object_id: self.object_id,
encoded_collab_v1: self.encoded_collab_v1,
encoded_collab_v1: Bytes::from(self.encoded_collab_v1),
collab_type: self.collab_type,
embeddings: None,
},
Expand All @@ -66,7 +67,7 @@ pub struct CollabParams {
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
#[validate(custom = "validate_not_empty_payload")]
pub encoded_collab_v1: Vec<u8>,
pub encoded_collab_v1: Bytes,
pub collab_type: CollabType,
#[serde(default)]
pub embeddings: Option<AFCollabEmbeddings>,
Expand All @@ -82,7 +83,7 @@ impl CollabParams {
Self {
object_id,
collab_type,
encoded_collab_v1,
encoded_collab_v1: Bytes::from(encoded_collab_v1),
embeddings: None,
}
}
Expand All @@ -99,7 +100,7 @@ impl CollabParams {
let old: CollabParamsV0 = bincode::deserialize(bytes)?;
Ok(Self {
object_id: old.object_id,
encoded_collab_v1: old.encoded_collab_v1,
encoded_collab_v1: old.encoded_collab_v1.into(),
collab_type: old.collab_type,
embeddings: None,
})
Expand Down
4 changes: 2 additions & 2 deletions libs/database/src/collab/collab_db_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn insert_into_af_collab(
SET blob = $3, len = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1 AND partition_key = $2;",
params.object_id,
partition_key,
params.encoded_collab_v1,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
encrypt,
uid,
Expand Down Expand Up @@ -141,7 +141,7 @@ pub async fn insert_into_af_collab(
"INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)\
VALUES ($1, $2, $3, $4, $5, $6, $7)",
params.object_id,
params.encoded_collab_v1,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
partition_key,
encrypt,
Expand Down
2 changes: 1 addition & 1 deletion script/client_api_deps_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Generate the current dependency list
cargo tree > current_deps.txt

BASELINE_COUNT=610
BASELINE_COUNT=611
CURRENT_COUNT=$(cat current_deps.txt | wc -l)

echo "Expected dependency count (baseline): $BASELINE_COUNT"
Expand Down
5 changes: 3 additions & 2 deletions services/appflowy-collaborate/src/collab/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use bytes::Bytes;
use collab::lock::Mutex;
use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -423,7 +424,7 @@ async fn write_pending_to_disk(
workspace_id: meta.workspace_id.clone(),
object_id: meta.object_id.clone(),
collab_type: meta.collab_type.clone(),
encode_collab_v1,
encode_collab_v1: encode_collab_v1.into(),
embeddings: meta.embeddings.clone(),
})
} else {
Expand Down Expand Up @@ -550,7 +551,7 @@ pub struct PendingWriteData {
pub workspace_id: String,
pub object_id: String,
pub collab_type: CollabType,
pub encode_collab_v1: Vec<u8>,
pub encode_collab_v1: Bytes,
pub embeddings: Option<AFCollabEmbeddings>,
}

Expand Down
2 changes: 1 addition & 1 deletion services/appflowy-collaborate/src/group/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ fn get_encode_collab(
// Construct and return collaboration parameters.
let params = CollabParams {
object_id: object_id.to_string(),
encoded_collab_v1: encoded_collab,
encoded_collab_v1: encoded_collab.into(),
collab_type: collab_type.clone(),
embeddings: None,
};
Expand Down
12 changes: 10 additions & 2 deletions services/appflowy-collaborate/src/indexer/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,16 @@ impl IndexerProvider {
&self,
params: &CollabParams,
) -> Result<Option<AFCollabEmbeddings>, AppError> {
if let Some(indexer) = self.indexer_for(params.collab_type.clone()) {
let encoded_collab = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)?;
let collab_type = params.collab_type.clone();
let data = params.encoded_collab_v1.clone();

if let Some(indexer) = self.indexer_for(collab_type) {
let encoded_collab = tokio::task::spawn_blocking(move || {
let encode_collab = EncodedCollab::decode_from_bytes(&data)?;
Ok::<_, AppError>(encode_collab)
})
.await??;

let embeddings = indexer.index(&params.object_id, encoded_collab).await?;
Ok(embeddings)
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/biz/user/user_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
&uid,
CollabParams {
object_id: object_id.clone(),
encoded_collab_v1,
encoded_collab_v1: encoded_collab_v1.into(),
collab_type: object_type.clone(),
embeddings: None,
},
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn create_user_awareness(
uid,
CollabParams {
object_id: object_id.to_string(),
encoded_collab_v1,
encoded_collab_v1: encoded_collab_v1.into(),
collab_type,
embeddings: None,
},
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn create_workspace_database_collab(
uid,
CollabParams {
object_id: object_id.to_string(),
encoded_collab_v1,
encoded_collab_v1: encoded_collab_v1.into(),
collab_type,
embeddings: None,
},
Expand Down
6 changes: 3 additions & 3 deletions src/biz/workspace/publish_dup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl PublishCollabDuplicator {
&duplicator_uid,
CollabParams {
object_id: oid.clone(),
encoded_collab_v1: encoded_collab,
encoded_collab_v1: encoded_collab.into(),
collab_type,
embeddings: None,
},
Expand Down Expand Up @@ -198,7 +198,7 @@ impl PublishCollabDuplicator {
&duplicator_uid,
CollabParams {
object_id: ws_db_oid.clone(),
encoded_collab_v1: updated_ws_w_db_collab?,
encoded_collab_v1: updated_ws_w_db_collab?.into(),
collab_type: CollabType::WorkspaceDatabase,
embeddings: None,
},
Expand Down Expand Up @@ -285,7 +285,7 @@ impl PublishCollabDuplicator {
&duplicator_uid,
CollabParams {
object_id: dest_workspace_id.clone(),
encoded_collab_v1: updated_encoded_collab?,
encoded_collab_v1: updated_encoded_collab?.into(),
collab_type: CollabType::Folder,
embeddings: None,
},
Expand Down
6 changes: 3 additions & 3 deletions tests/collab/collab_curd_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn batch_insert_collab_success_test() {
let params_list = (0..5)
.map(|i| CollabParams {
object_id: Uuid::new_v4().to_string(),
encoded_collab_v1: mock_encoded_collab_v1[i].encode_to_bytes().unwrap(),
encoded_collab_v1: mock_encoded_collab_v1[i].encode_to_bytes().unwrap().into(),
collab_type: CollabType::Unknown,
embeddings: None,
})
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn create_collab_compatibility_with_json_params_test() {
let params = OldCreateCollabParams {
inner: CollabParams {
object_id: object_id.clone(),
encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap(),
encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap().into(),
collab_type: CollabType::Unknown,
embeddings: None,
},
Expand Down Expand Up @@ -238,7 +238,7 @@ async fn batch_create_collab_compatibility_with_uncompress_params_test() {
workspace_id: workspace_id.to_string(),
params_list: vec![CollabParams {
object_id: object_id.clone(),
encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap(),
encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap().into(),
collab_type: CollabType::Unknown,
embeddings: None,
}],
Expand Down
4 changes: 2 additions & 2 deletions tests/collab/storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ async fn simulate_small_data_set_write(pool: PgPool) {
let params = CollabParams {
object_id: format!("object_id_{}", i),
collab_type: CollabType::Unknown,
encoded_collab_v1: encode_collab.encode_to_bytes().unwrap(),
encoded_collab_v1: encode_collab.encode_to_bytes().unwrap().into(),
embeddings: None,
};
cloned_storage_queue
Expand Down Expand Up @@ -542,7 +542,7 @@ async fn simulate_large_data_set_write(pool: PgPool) {
let params = CollabParams {
object_id: uuid::Uuid::new_v4().to_string(),
collab_type: CollabType::Unknown,
encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap(),
encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap().into(),
embeddings: None,
};
storage_queue
Expand Down
2 changes: 1 addition & 1 deletion tests/sql_test/workspace_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn insert_collab_sql_test(pool: PgPool) {
let params = CollabParams {
object_id,
collab_type: CollabType::Unknown,
encoded_collab_v1,
encoded_collab_v1: encoded_collab_v1.into(),
embeddings: None,
};
insert_into_af_collab(&mut txn, &user.uid, &user.workspace_id, &params)
Expand Down

0 comments on commit 5d35671

Please sign in to comment.