Skip to content

Commit

Permalink
chore: add metadata column to save embeding info (#1086)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Dec 18, 2024
1 parent 7ff9a92 commit e758f18
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 64 deletions.
61 changes: 1 addition & 60 deletions libs/database-entity/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,44 +746,7 @@ pub struct AFCollabEmbeddedChunk {
pub content_type: EmbeddingContentType,
pub content: String,
pub embedding: Option<Vec<f32>>,
}

impl AFCollabEmbeddedChunk {
pub fn from_proto(proto: &proto::collab::CollabEmbeddingsParams) -> Result<Self, EntityError> {
let collab_type_proto = proto::collab::CollabType::try_from(proto.collab_type).unwrap();
let collab_type = CollabType::from_proto(&collab_type_proto);
let content_type_proto =
proto::collab::EmbeddingContentType::try_from(proto.content_type).unwrap();
let content_type = EmbeddingContentType::from_proto(content_type_proto)?;
let embedding = if proto.embedding.is_empty() {
None
} else {
Some(proto.embedding.clone())
};
Ok(Self {
fragment_id: proto.fragment_id.clone(),
object_id: proto.object_id.clone(),
collab_type,
content_type,
content: proto.content.clone(),
embedding,
})
}

pub fn to_proto(&self) -> proto::collab::CollabEmbeddingsParams {
proto::collab::CollabEmbeddingsParams {
fragment_id: self.fragment_id.clone(),
object_id: self.object_id.clone(),
collab_type: self.collab_type.to_proto() as i32,
content_type: self.content_type.to_proto() as i32,
content: self.content.clone(),
embedding: self.embedding.clone().unwrap_or_default(),
}
}

pub fn to_protobuf_bytes(&self) -> Vec<u8> {
self.to_proto().encode_to_vec()
}
pub metadata: serde_json::Value,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand All @@ -792,28 +755,6 @@ pub struct AFCollabEmbeddings {
pub params: Vec<AFCollabEmbeddedChunk>,
}

impl AFCollabEmbeddings {
pub fn from_proto(proto: proto::collab::CollabEmbeddings) -> Result<Self, EntityError> {
let mut params = vec![];
for param in proto.embeddings {
params.push(AFCollabEmbeddedChunk::from_proto(&param)?);
}
Ok(Self {
tokens_consumed: proto.tokens_consumed,
params,
})
}

pub fn to_proto(&self) -> proto::collab::CollabEmbeddings {
let embeddings: Vec<proto::collab::CollabEmbeddingsParams> =
self.params.iter().map(|param| param.to_proto()).collect();
proto::collab::CollabEmbeddings {
tokens_consumed: self.tokens_consumed,
embeddings,
}
}
}

/// Type of content stored by the embedding.
/// Currently only plain text of the document is supported.
/// In the future, we might support other kinds like i.e. PDF, images or image-extracted text.
Expand Down
8 changes: 5 additions & 3 deletions libs/database/src/index/collab_embeddings_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ WHERE w.workspace_id = $1"#,
}

#[derive(sqlx::Type)]
#[sqlx(type_name = "af_fragment", no_pg_array)]
#[sqlx(type_name = "af_fragment_v2", no_pg_array)]
struct Fragment {
fragment_id: String,
content_type: i32,
contents: String,
embedding: Option<Vector>,
metadata: serde_json::Value,
}

impl From<AFCollabEmbeddedChunk> for Fragment {
Expand All @@ -72,13 +73,14 @@ impl From<AFCollabEmbeddedChunk> for Fragment {
content_type: value.content_type as i32,
contents: value.content,
embedding: value.embedding.map(Vector::from),
metadata: value.metadata,
}
}
}

impl PgHasArrayType for Fragment {
fn array_type_info() -> PgTypeInfo {
PgTypeInfo::with_name("af_fragment[]")
PgTypeInfo::with_name("af_fragment_v2[]")
}
}

Expand All @@ -96,7 +98,7 @@ pub async fn upsert_collab_embeddings(
let collab_type = records[0].collab_type.clone();
let fragments = records.into_iter().map(Fragment::from).collect::<Vec<_>>();

sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment[])"#)
sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v2[])"#)
.bind(*workspace_id)
.bind(object_id)
.bind(crate::collab::partition_key_from_collab_type(&collab_type))
Expand Down
42 changes: 42 additions & 0 deletions migrations/20241218090459_collab_embedding_add_metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- Add migration script here
ALTER TABLE af_collab_embeddings
ADD COLUMN metadata JSONB DEFAULT '{}'::jsonb;

CREATE TYPE af_fragment_v2 AS (
fragment_id TEXT,
content_type INT,
contents TEXT,
embedding VECTOR(1536),
metadata JSONB
);

CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert(
IN p_workspace_id UUID,
IN p_oid TEXT,
IN p_partition_key INT,
IN p_tokens_used INT,
IN p_fragments af_fragment_v2[]
)
LANGUAGE plpgsql
AS $$
BEGIN
DELETE FROM af_collab_embeddings WHERE oid = p_oid;
INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at, metadata)
SELECT
f.fragment_id,
p_oid,
p_partition_key,
f.content_type,
f.contents,
f.embedding,
NOW(),
f.metadata
FROM UNNEST(p_fragments) as f;

-- Update the usage tracking table
INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed)
VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used)
ON CONFLICT (created_at, workspace_id)
DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used;
END
$$;
4 changes: 4 additions & 0 deletions services/appflowy-collaborate/src/indexer/document_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use collab_document::document::DocumentBody;
use collab_document::error::DocumentError;
use collab_entity::CollabType;
use database_entity::dto::{AFCollabEmbeddedChunk, AFCollabEmbeddings, EmbeddingContentType};
use serde_json::json;
use tracing::trace;
use uuid::Uuid;

Expand Down Expand Up @@ -106,6 +107,8 @@ fn split_text_into_chunks(
// We assume that every token is ~4 bytes. We're going to split document content into fragments
// of ~2000 tokens each.
let split_contents = split_text_by_max_content_len(content, 8000)?;
let metadata =
json!({"id": object_id, "source": "appflowy", "name": "document", "collab_type": collab_type });
Ok(
split_contents
.into_iter()
Expand All @@ -116,6 +119,7 @@ fn split_text_into_chunks(
content_type: EmbeddingContentType::PlainText,
content,
embedding: None,
metadata: metadata.clone(),
})
.collect(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2397,7 +2397,7 @@ async fn collab_full_sync_handler(
uid,
device_id,
connect_at: timestamp(),
session_id: uuid::Uuid::new_v4().to_string(),
session_id: Uuid::new_v4().to_string(),
app_version,
};

Expand Down

0 comments on commit e758f18

Please sign in to comment.