From e758f18d7599b32520b1a85e82dd96673cfad12c Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 18 Dec 2024 22:48:25 +0800 Subject: [PATCH] chore: add metadata column to save embedding info (#1086) --- libs/database-entity/src/dto.rs | 61 +------------------ .../src/index/collab_embeddings_ops.rs | 8 ++- ...18090459_collab_embedding_add_metadata.sql | 42 +++++++++++++ .../src/indexer/document_indexer.rs | 4 ++ src/api/workspace.rs | 2 +- 5 files changed, 53 insertions(+), 64 deletions(-) create mode 100644 migrations/20241218090459_collab_embedding_add_metadata.sql diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index 432f2b381..6f77bf80b 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -746,44 +746,7 @@ pub struct AFCollabEmbeddedChunk { pub content_type: EmbeddingContentType, pub content: String, pub embedding: Option<Vec<f32>>, -} - -impl AFCollabEmbeddedChunk { - pub fn from_proto(proto: &proto::collab::CollabEmbeddingsParams) -> Result { - let collab_type_proto = proto::collab::CollabType::try_from(proto.collab_type).unwrap(); - let collab_type = CollabType::from_proto(&collab_type_proto); - let content_type_proto = - proto::collab::EmbeddingContentType::try_from(proto.content_type).unwrap(); - let content_type = EmbeddingContentType::from_proto(content_type_proto)?; - let embedding = if proto.embedding.is_empty() { - None - } else { - Some(proto.embedding.clone()) - }; - Ok(Self { - fragment_id: proto.fragment_id.clone(), - object_id: proto.object_id.clone(), - collab_type, - content_type, - content: proto.content.clone(), - embedding, - }) - } - - pub fn to_proto(&self) -> proto::collab::CollabEmbeddingsParams { - proto::collab::CollabEmbeddingsParams { - fragment_id: self.fragment_id.clone(), - object_id: self.object_id.clone(), - collab_type: self.collab_type.to_proto() as i32, - content_type: self.content_type.to_proto() as i32, - content: 
self.content.clone(), - embedding: self.embedding.clone().unwrap_or_default(), - } - } - - pub fn to_protobuf_bytes(&self) -> Vec<u8> { - self.to_proto().encode_to_vec() - } + pub metadata: serde_json::Value, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -792,28 +755,6 @@ pub struct AFCollabEmbeddings { pub params: Vec<AFCollabEmbeddedChunk>, } -impl AFCollabEmbeddings { - pub fn from_proto(proto: proto::collab::CollabEmbeddings) -> Result { - let mut params = vec![]; - for param in proto.embeddings { - params.push(AFCollabEmbeddedChunk::from_proto(&param)?); - } - Ok(Self { - tokens_consumed: proto.tokens_consumed, - params, - }) - } - - pub fn to_proto(&self) -> proto::collab::CollabEmbeddings { - let embeddings: Vec<proto::collab::CollabEmbeddingsParams> = - self.params.iter().map(|param| param.to_proto()).collect(); - proto::collab::CollabEmbeddings { - tokens_consumed: self.tokens_consumed, - embeddings, - } - } -} - /// Type of content stored by the embedding. /// Currently only plain text of the document is supported. /// In the future, we might support other kinds like i.e. PDF, images or image-extracted text. 
diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 874c271da..4063699ec 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -57,12 +57,13 @@ WHERE w.workspace_id = $1"#, } #[derive(sqlx::Type)] -#[sqlx(type_name = "af_fragment", no_pg_array)] +#[sqlx(type_name = "af_fragment_v2", no_pg_array)] struct Fragment { fragment_id: String, content_type: i32, contents: String, embedding: Option<Vector>, + metadata: serde_json::Value, } impl From<AFCollabEmbeddedChunk> for Fragment { @@ -72,13 +73,14 @@ impl From<AFCollabEmbeddedChunk> for Fragment { content_type: value.content_type as i32, contents: value.content, embedding: value.embedding.map(Vector::from), + metadata: value.metadata, } } } impl PgHasArrayType for Fragment { fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("af_fragment[]") + PgTypeInfo::with_name("af_fragment_v2[]") } } @@ -96,7 +98,7 @@ pub async fn upsert_collab_embeddings( let collab_type = records[0].collab_type.clone(); let fragments = records.into_iter().map(Fragment::from).collect::<Vec<_>>(); - sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment[])"#) + sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v2[])"#) .bind(*workspace_id) .bind(object_id) .bind(crate::collab::partition_key_from_collab_type(&collab_type)) diff --git a/migrations/20241218090459_collab_embedding_add_metadata.sql b/migrations/20241218090459_collab_embedding_add_metadata.sql new file mode 100644 index 000000000..92ebaabea --- /dev/null +++ b/migrations/20241218090459_collab_embedding_add_metadata.sql @@ -0,0 +1,42 @@ +-- Add migration script here +ALTER TABLE af_collab_embeddings +ADD COLUMN metadata JSONB DEFAULT '{}'::jsonb; + +CREATE TYPE af_fragment_v2 AS ( + fragment_id TEXT, + content_type INT, + contents TEXT, + embedding VECTOR(1536), + metadata JSONB +); + +CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert( + IN 
p_workspace_id UUID, + IN p_oid TEXT, + IN p_partition_key INT, + IN p_tokens_used INT, + IN p_fragments af_fragment_v2[] +) +LANGUAGE plpgsql +AS $$ +BEGIN + DELETE FROM af_collab_embeddings WHERE oid = p_oid; + INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at, metadata) + SELECT + f.fragment_id, + p_oid, + p_partition_key, + f.content_type, + f.contents, + f.embedding, + NOW(), + f.metadata + FROM UNNEST(p_fragments) as f; + + -- Update the usage tracking table + INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed) + VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used) + ON CONFLICT (created_at, workspace_id) + DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used; +END +$$; \ No newline at end of file diff --git a/services/appflowy-collaborate/src/indexer/document_indexer.rs b/services/appflowy-collaborate/src/indexer/document_indexer.rs index 1ce9c8b54..fec2d845f 100644 --- a/services/appflowy-collaborate/src/indexer/document_indexer.rs +++ b/services/appflowy-collaborate/src/indexer/document_indexer.rs @@ -12,6 +12,7 @@ use collab_document::document::DocumentBody; use collab_document::error::DocumentError; use collab_entity::CollabType; use database_entity::dto::{AFCollabEmbeddedChunk, AFCollabEmbeddings, EmbeddingContentType}; +use serde_json::json; use tracing::trace; use uuid::Uuid; @@ -106,6 +107,8 @@ fn split_text_into_chunks( // We assume that every token is ~4 bytes. We're going to split document content into fragments // of ~2000 tokens each. 
let split_contents = split_text_by_max_content_len(content, 8000)?; + let metadata = + json!({"id": object_id, "source": "appflowy", "name": "document", "collab_type": collab_type }); Ok( split_contents .into_iter() @@ -116,6 +119,7 @@ fn split_text_into_chunks( content_type: EmbeddingContentType::PlainText, content, embedding: None, + metadata: metadata.clone(), }) .collect(), ) diff --git a/src/api/workspace.rs b/src/api/workspace.rs index aa3f5e00d..fa18bb2d5 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -2397,7 +2397,7 @@ async fn collab_full_sync_handler( uid, device_id, connect_at: timestamp(), - session_id: uuid::Uuid::new_v4().to_string(), + session_id: Uuid::new_v4().to_string(), app_version, };