diff --git a/.sqlx/query-1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d.json b/.sqlx/query-1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d.json new file mode 100644 index 000000000..58ad0c8c7 --- /dev/null +++ b/.sqlx/query-1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n updated_at as updated_at,\n oid as row_id\n FROM af_collab_database_row\n WHERE workspace_id = $1\n AND oid = ANY($2)\n AND updated_at > $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "updated_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "row_id", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid", + "TextArray", + "Timestamptz" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d" +} diff --git a/libs/client-api/src/http_collab.rs b/libs/client-api/src/http_collab.rs index 0269c9cc3..d02b9c540 100644 --- a/libs/client-api/src/http_collab.rs +++ b/libs/client-api/src/http_collab.rs @@ -2,8 +2,10 @@ use crate::http::log_request_id; use crate::{blocking_brotli_compress, brotli_compress, Client}; use app_error::AppError; use bytes::Bytes; +use chrono::{DateTime, Utc}; use client_api_entity::workspace_dto::{ - AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, ListDatabaseRowDetailParam, + AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, DatabaseRowUpdatedItem, + ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam, }; use client_api_entity::{ BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, CreateCollabParams, @@ -190,6 +192,26 @@ impl Client { AppResponse::from_response(resp).await?.into_data() } + pub async fn list_database_row_ids_updated( + &self, + workspace_id: &str, + database_id: &str, + after: Option>, + ) -> Result, AppResponseError> { + let url = format!( + "{}/api/workspace/{}/database/{}/row/updated", + self.base_url, workspace_id, database_id + ); + let resp = self + .http_client_with_auth(Method::GET, &url) + .await? + .query(&ListDatabaseRowUpdatedParam { after }) + .send() + .await?; + log_request_id(&resp); + AppResponse::from_response(resp).await?.into_data() + } + pub async fn list_database_row_details( &self, workspace_id: &str, diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index 7bdf84ed8..2301c603b 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -4,12 +4,13 @@ use database_entity::dto::{ AFAccessLevel, AFCollabMember, AFPermission, AFSnapshotMeta, AFSnapshotMetas, CollabParams, QueryCollab, QueryCollabResult, RawData, }; +use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem; use crate::collab::{partition_key_from_collab_type, SNAPSHOT_PER_HOUR}; use crate::pg_row::AFSnapshotRow; use crate::pg_row::{AFCollabMemberAccessLevelRow, AFCollabRowMeta}; use app_error::AppError; -use chrono::{Duration, Utc}; +use chrono::{DateTime, Duration, Utc}; use futures_util::stream::BoxStream; use sqlx::postgres::PgRow; @@ -792,3 +793,29 @@ pub async fn select_workspace_database_oid<'a, E: Executor<'a, Database = Postgr .fetch_one(executor) .await } + +pub async fn select_last_updated_database_row_ids( + pg_pool: &PgPool, + workspace_id: &Uuid, + row_ids: &[String], + after: &DateTime, +) -> Result, sqlx::Error> { + let updated_row_items = sqlx::query_as!( + DatabaseRowUpdatedItem, + r#" + SELECT + updated_at as updated_at, + oid as row_id + FROM af_collab_database_row + WHERE workspace_id = $1 + AND oid = ANY($2) + AND updated_at > $3 + "#, + workspace_id, + row_ids, + after, + ) + .fetch_all(pg_pool) + .await?; + Ok(updated_row_items) +} diff --git a/libs/shared-entity/src/dto/workspace_dto.rs b/libs/shared-entity/src/dto/workspace_dto.rs index 72cf0b277..fb071bee8 100644 --- a/libs/shared-entity/src/dto/workspace_dto.rs +++ b/libs/shared-entity/src/dto/workspace_dto.rs @@ -300,6 +300,17 @@ pub struct ListDatabaseRowDetailParam { pub ids: String, } +#[derive(Default, Debug, Deserialize, Serialize)] +pub struct ListDatabaseRowUpdatedParam { + pub after: Option>, +} + +#[derive(Default, Debug, Deserialize, Serialize)] +pub struct DatabaseRowUpdatedItem { + pub updated_at: DateTime, + pub row_id: String, +} + impl ListDatabaseRowDetailParam { pub fn from(ids: &[&str]) -> Self { Self { ids: ids.join(",") } diff --git a/migrations/20241124212630_af_collab_updated_at.sql b/migrations/20241124212630_af_collab_updated_at.sql new file mode 100644 index 000000000..f61e0ff7e --- /dev/null +++ b/migrations/20241124212630_af_collab_updated_at.sql @@ -0,0 +1,22 @@ +-- Add `updated_at` column to `af_collab` table +ALTER TABLE public.af_collab +ADD COLUMN updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP; + +-- Create or replace function to update `updated_at` column +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = CURRENT_TIMESTAMP; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create trigger to update `updated_at` column +CREATE TRIGGER set_updated_at +BEFORE INSERT OR UPDATE ON public.af_collab +FOR EACH ROW +EXECUTE FUNCTION update_updated_at_column(); + +-- Create index on `updated_at` column +CREATE INDEX idx_af_collab_updated_at +ON public.af_collab (updated_at); diff --git a/src/api/workspace.rs b/src/api/workspace.rs index fd07b4923..03229f5c3 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -5,6 +5,7 @@ use actix_web::{web, Scope}; use actix_web::{HttpRequest, Result}; use anyhow::{anyhow, Context}; use bytes::BytesMut; +use chrono::{DateTime, Duration, Utc}; use collab::entity::EncodedCollab; use collab_entity::CollabType; use futures_util::future::try_join_all; @@ -260,6 +261,10 @@ pub fn workspace_scope() -> Scope { web::resource("/{workspace_id}/database/{database_id}/row") .route(web::get().to(list_database_row_id_handler)), ) + .service( + web::resource("/{workspace_id}/database/{database_id}/row/updated") + .route(web::get().to(list_database_row_id_updated_handler)), + ) .service( web::resource("/{workspace_id}/database/{database_id}/row/detail") .route(web::get().to(list_database_row_details_handler)), @@ -1892,9 +1897,42 @@ async fn list_database_row_id_handler( .enforce_action(&uid, &workspace_id, Action::Read) .await?; - let db_rows = - biz::collab::ops::list_database_row(&state.collab_access_control_storage, workspace_id, db_id) - .await?; + let db_rows = biz::collab::ops::list_database_row_ids( + &state.collab_access_control_storage, + &workspace_id, + &db_id, + ) + .await?; + Ok(Json(AppResponse::Ok().with_data(db_rows))) +} + +async fn list_database_row_id_updated_handler( + user_uuid: UserUuid, + path_param: web::Path<(String, String)>, + state: Data, + param: web::Query, +) -> Result>>> { + let (workspace_id, db_id) = path_param.into_inner(); + let uid = state.user_cache.get_user_uid(&user_uuid).await?; + + state + .workspace_access_control + .enforce_action(&uid, &workspace_id, Action::Read) + .await?; + + // Default to 1 hour ago + let after: DateTime = param + .after + .unwrap_or_else(|| Utc::now() - Duration::hours(1)); + + let db_rows = biz::collab::ops::list_database_row_ids_updated( + &state.collab_access_control_storage, + &state.pg_pool, + &workspace_id, + &db_id, + &after, + ) + .await?; Ok(Json(AppResponse::Ok().with_data(db_rows))) } diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index 379ac845b..c91407068 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use app_error::AppError; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; +use chrono::DateTime; +use chrono::Utc; use collab::preclude::Collab; use collab_database::database::DatabaseBody; use collab_database::entity::FieldType; @@ -15,6 +17,7 @@ use collab_entity::CollabType; use collab_entity::EncodedCollab; use collab_folder::SectionItem; use collab_folder::{CollabOrigin, Folder}; +use database::collab::select_last_updated_database_row_ids; use database::collab::select_workspace_database_oid; use database::collab::{CollabStorage, GetCollabOrigin}; use database::publish::select_published_view_ids_for_workspace; @@ -24,6 +27,7 @@ use database_entity::dto::{QueryCollab, QueryCollabParams}; use shared_entity::dto::workspace_dto::AFDatabase; use shared_entity::dto::workspace_dto::AFDatabaseRow; use shared_entity::dto::workspace_dto::AFDatabaseRowDetail; +use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem; use shared_entity::dto::workspace_dto::FavoriteFolderView; use shared_entity::dto::workspace_dto::FolderViewMinimal; use shared_entity::dto::workspace_dto::RecentFolderView; @@ -435,16 +439,16 @@ pub async fn list_database( Ok(af_databases) } -pub async fn list_database_row( +pub async fn list_database_row_ids( collab_storage: &CollabAccessControlStorage, - workspace_uuid_str: String, - database_uuid_str: String, + workspace_uuid_str: &str, + database_uuid_str: &str, ) -> Result, AppError> { let db_collab = get_latest_collab( collab_storage, GetCollabOrigin::Server, - &workspace_uuid_str, - &database_uuid_str, + workspace_uuid_str, + database_uuid_str, CollabType::Database, ) .await?; @@ -479,6 +483,25 @@ pub async fn list_database_row( Ok(db_rows) } +pub async fn list_database_row_ids_updated( + collab_storage: &CollabAccessControlStorage, + pg_pool: &PgPool, + workspace_uuid_str: &str, + database_uuid_str: &str, + after: &DateTime, +) -> Result, AppError> { + let row_ids = list_database_row_ids(collab_storage, workspace_uuid_str, database_uuid_str) + .await? + .into_iter() + .map(|row| row.id) + .collect::>(); + + let workspace_uuid: Uuid = workspace_uuid_str.parse()?; + let updated_row_ids = + select_last_updated_database_row_ids(pg_pool, &workspace_uuid, &row_ids, after).await?; + Ok(updated_row_ids) +} + pub async fn list_database_row_details( collab_storage: &CollabAccessControlStorage, uid: i64, diff --git a/tests/workspace/workspace_crud.rs b/tests/workspace/workspace_crud.rs index b798b29ae..4f23afa09 100644 --- a/tests/workspace/workspace_crud.rs +++ b/tests/workspace/workspace_crud.rs @@ -24,6 +24,13 @@ async fn workspace_list_database() { .unwrap(); assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids); } + { + let db_row_ids = c + .list_database_row_ids_updated(&workspace_id, &todos_db.id, None) + .await + .unwrap(); + assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids); + } { let db_row_ids = c