Skip to content

Commit

Permalink
chore: query multiple collab embedding state (#1081)
Browse files Browse the repository at this point in the history
* chore: query multiple collab embedding state

* chore: clippy
  • Loading branch information
appflowy authored Dec 17, 2024
1 parent 6991d61 commit abf827f
Show file tree
Hide file tree
Showing 20 changed files with 502 additions and 208 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ jobs:
docker ps -a
docker compose -f docker-compose-ci.yml logs
- name: Docker Logs
- name: AI Logs
if: always()
run: |
docker logs appflowy-cloud-ai-1
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

42 changes: 36 additions & 6 deletions libs/client-api-test/src/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ use client_api::entity::{
};
use client_api::ws::{WSClient, WSClientConfig};
use database_entity::dto::{
AFRole, AFSnapshotMeta, AFSnapshotMetas, AFUserProfile, AFUserWorkspaceInfo, AFWorkspace,
AFWorkspaceInvitationStatus, AFWorkspaceMember, BatchQueryCollabResult, CollabParams,
CreateCollabParams, QueryCollab, QueryCollabParams, QuerySnapshotParams, SnapshotData,
AFCollabEmbedInfo, AFRole, AFSnapshotMeta, AFSnapshotMetas, AFUserProfile, AFUserWorkspaceInfo,
AFWorkspace, AFWorkspaceInvitationStatus, AFWorkspaceMember, BatchQueryCollabResult,
CollabParams, CreateCollabParams, QueryCollab, QueryCollabParams, QuerySnapshotParams,
SnapshotData,
};
use shared_entity::dto::ai_dto::CalculateSimilarityParams;
use shared_entity::dto::search_dto::SearchDocumentResponseItem;
use shared_entity::dto::workspace_dto::{
BlobMetadata, CollabResponse, PublishedDuplicate, WorkspaceMemberChangeset,
BlobMetadata, CollabResponse, EmbeddedCollabQuery, PublishedDuplicate, WorkspaceMemberChangeset,
WorkspaceMemberInvitation, WorkspaceSpaceUsage,
};
use shared_entity::response::AppResponseError;
Expand Down Expand Up @@ -555,6 +556,34 @@ impl TestClient {
self.api_client.get_profile().await.unwrap()
}

pub async fn wait_until_all_embedding(
&self,
workspace_id: &str,
query: Vec<EmbeddedCollabQuery>,
) -> Vec<AFCollabEmbedInfo> {
let timeout_duration = Duration::from_secs(30);
let poll_interval = Duration::from_millis(2000);
let poll_fut = async {
loop {
match self
.api_client
.batch_get_collab_embed_info(workspace_id, query.clone())
.await
{
Ok(items) if items.len() == query.len() => return Ok::<_, Error>(items),
_ => tokio::time::sleep(poll_interval).await,
}
}
};

// Enforce timeout
match timeout(timeout_duration, poll_fut).await {
Ok(Ok(items)) => items,
Ok(Err(e)) => panic!("Test failed: {}", e),
Err(_) => panic!("Test failed: Timeout after 30 seconds."),
}
}

pub async fn wait_until_get_embedding(&self, workspace_id: &str, object_id: &str) {
let result = timeout(Duration::from_secs(30), async {
while self
Expand Down Expand Up @@ -620,10 +649,11 @@ impl TestClient {
let resp = self.api_client.calculate_similarity(params).await.unwrap();
assert!(
resp.score > score,
"Similarity score is too low: {}.\nexpected: {},\ninput: {}",
"Similarity score is too low: {}.\nexpected: {},\ninput: {},\nexpected:{}",
resp.score,
score,
input
input,
expected
);
}

Expand Down
26 changes: 24 additions & 2 deletions libs/client-api/src/http_collab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use client_api_entity::workspace_dto::{
use client_api_entity::{
AFCollabEmbedInfo, BatchQueryCollabParams, BatchQueryCollabResult, CollabParams,
CreateCollabParams, DeleteCollabParams, PublishCollabItem, QueryCollab, QueryCollabParams,
UpdateCollabWebParams,
RepeatedAFCollabEmbedInfo, UpdateCollabWebParams,
};
use collab_rt_entity::collab_proto::{CollabDocStateParams, PayloadCompressionType};
use collab_rt_entity::HttpRealtimeMessage;
Expand All @@ -22,7 +22,7 @@ use prost::Message;
use rayon::prelude::*;
use reqwest::{Body, Method};
use serde::Serialize;
use shared_entity::dto::workspace_dto::{CollabResponse, CollabTypeParam};
use shared_entity::dto::workspace_dto::{CollabResponse, CollabTypeParam, EmbeddedCollabQuery};
use shared_entity::response::{AppResponse, AppResponseError};
use std::future::Future;
use std::io::Cursor;
Expand Down Expand Up @@ -432,6 +432,28 @@ impl Client {
.into_data()
}

pub async fn batch_get_collab_embed_info(
&self,
workspace_id: &str,
params: Vec<EmbeddedCollabQuery>,
) -> Result<Vec<AFCollabEmbedInfo>, AppResponseError> {
let url = format!(
"{}/api/workspace/{workspace_id}/collab/embed-info/list",
self.base_url
);
let resp = self
.http_client_with_auth(Method::POST, &url)
.await?
.json(&params)
.send()
.await?;
log_request_id(&resp);
let data = AppResponse::<RepeatedAFCollabEmbedInfo>::from_response(resp)
.await?
.into_data()?;
Ok(data.0)
}

pub async fn collab_full_sync(
&self,
workspace_id: &str,
Expand Down
33 changes: 31 additions & 2 deletions libs/collab-rt-entity/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,35 @@ impl RealtimeMessage {
}
}

fn object_id(&self) -> Option<String> {
match self {
RealtimeMessage::Collab(msg) => Some(msg.object_id().to_string()),
RealtimeMessage::ClientCollabV1(msgs) => msgs.first().map(|msg| msg.object_id().to_string()),
RealtimeMessage::ClientCollabV2(msgs) => {
if let Some((object_id, _)) = msgs.iter().next() {
Some(object_id.to_string())
} else {
None
}
},
_ => None,
}
}

#[cfg(feature = "rt_compress")]
pub fn encode(&self) -> Result<Vec<u8>, Error> {
let data = DefaultOptions::new()
.with_fixint_encoding()
.allow_trailing_bytes()
.with_limit(MAXIMUM_REALTIME_MESSAGE_SIZE)
.serialize(self)?;
.serialize(self)
.map_err(|e| {
anyhow!(
"Failed to encode realtime message: {}, object_id:{:?}",
e,
self.object_id()
)
})?;

let mut compressor = CompressorReader::new(&*data, 4096, 4, 22);
let mut compressed_data = Vec::new();
Expand All @@ -117,7 +139,14 @@ impl RealtimeMessage {
.with_fixint_encoding()
.allow_trailing_bytes()
.with_limit(MAXIMUM_REALTIME_MESSAGE_SIZE)
.serialize(self)?;
.serialize(self)
.map_err(|e| {
anyhow!(
"Failed to encode realtime message: {}, object_id:{:?}",
e,
self.object_id()
)
})?;
Ok(data)
}

Expand Down
7 changes: 6 additions & 1 deletion libs/database-entity/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,15 @@ pub struct AFCollabMember {
#[derive(Debug, Serialize, Deserialize)]
pub struct AFCollabEmbedInfo {
pub object_id: String,
/// The timestamp when the object embeddings updated
/// The timestamp when the object's embeddings updated
pub indexed_at: DateTime<Utc>,
/// The timestamp when the object's data updated
pub updated_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RepeatedAFCollabEmbedInfo(pub Vec<AFCollabEmbedInfo>);

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PublishInfo {
pub namespace: String,
Expand Down
Loading

0 comments on commit abf827f

Please sign in to comment.