Skip to content

Commit

Permalink
Merge pull request #1017 from AppFlowy-IO/feat/database-row-update
Browse files Browse the repository at this point in the history
feat: add a polling api for getting database row id updates
  • Loading branch information
speed2exe authored Nov 25, 2024
2 parents fa80a4c + c4d5285 commit 9e067e6
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 10 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion libs/client-api/src/http_collab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use crate::http::log_request_id;
use crate::{blocking_brotli_compress, brotli_compress, Client};
use app_error::AppError;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use client_api_entity::workspace_dto::{
AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, ListDatabaseRowDetailParam,
AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, DatabaseRowUpdatedItem,
ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam,
};
use client_api_entity::{
BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, CreateCollabParams,
Expand Down Expand Up @@ -190,6 +192,26 @@ impl Client {
AppResponse::from_response(resp).await?.into_data()
}

pub async fn list_database_row_ids_updated(
&self,
workspace_id: &str,
database_id: &str,
after: Option<DateTime<Utc>>,
) -> Result<Vec<DatabaseRowUpdatedItem>, AppResponseError> {
let url = format!(
"{}/api/workspace/{}/database/{}/row/updated",
self.base_url, workspace_id, database_id
);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.query(&ListDatabaseRowUpdatedParam { after })
.send()
.await?;
log_request_id(&resp);
AppResponse::from_response(resp).await?.into_data()
}

pub async fn list_database_row_details(
&self,
workspace_id: &str,
Expand Down
29 changes: 28 additions & 1 deletion libs/database/src/collab/collab_db_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use database_entity::dto::{
AFAccessLevel, AFCollabMember, AFPermission, AFSnapshotMeta, AFSnapshotMetas, CollabParams,
QueryCollab, QueryCollabResult, RawData,
};
use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;

use crate::collab::{partition_key_from_collab_type, SNAPSHOT_PER_HOUR};
use crate::pg_row::AFSnapshotRow;
use crate::pg_row::{AFCollabMemberAccessLevelRow, AFCollabRowMeta};
use app_error::AppError;
use chrono::{Duration, Utc};
use chrono::{DateTime, Duration, Utc};
use futures_util::stream::BoxStream;

use sqlx::postgres::PgRow;
Expand Down Expand Up @@ -792,3 +793,29 @@ pub async fn select_workspace_database_oid<'a, E: Executor<'a, Database = Postgr
.fetch_one(executor)
.await
}

pub async fn select_last_updated_database_row_ids(
pg_pool: &PgPool,
workspace_id: &Uuid,
row_ids: &[String],
after: &DateTime<Utc>,
) -> Result<Vec<DatabaseRowUpdatedItem>, sqlx::Error> {
let updated_row_items = sqlx::query_as!(
DatabaseRowUpdatedItem,
r#"
SELECT
updated_at as updated_at,
oid as row_id
FROM af_collab_database_row
WHERE workspace_id = $1
AND oid = ANY($2)
AND updated_at > $3
"#,
workspace_id,
row_ids,
after,
)
.fetch_all(pg_pool)
.await?;
Ok(updated_row_items)
}
11 changes: 11 additions & 0 deletions libs/shared-entity/src/dto/workspace_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,17 @@ pub struct ListDatabaseRowDetailParam {
pub ids: String,
}

#[derive(Default, Debug, Deserialize, Serialize)]
pub struct ListDatabaseRowUpdatedParam {
pub after: Option<DateTime<Utc>>,
}

#[derive(Default, Debug, Deserialize, Serialize)]
pub struct DatabaseRowUpdatedItem {
pub updated_at: DateTime<Utc>,
pub row_id: String,
}

impl ListDatabaseRowDetailParam {
pub fn from(ids: &[&str]) -> Self {
Self { ids: ids.join(",") }
Expand Down
22 changes: 22 additions & 0 deletions migrations/20241124212630_af_collab_updated_at.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Add `updated_at` column to `af_collab` table
ALTER TABLE public.af_collab
ADD COLUMN updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;

-- Create or replace function to update `updated_at` column
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger to update `updated_at` column
CREATE TRIGGER set_updated_at
BEFORE INSERT OR UPDATE ON public.af_collab
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();

-- Create index on `updated_at` column
CREATE INDEX idx_af_collab_updated_at
ON public.af_collab (updated_at);
44 changes: 41 additions & 3 deletions src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix_web::{web, Scope};
use actix_web::{HttpRequest, Result};
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use chrono::{DateTime, Duration, Utc};
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use futures_util::future::try_join_all;
Expand Down Expand Up @@ -260,6 +261,10 @@ pub fn workspace_scope() -> Scope {
web::resource("/{workspace_id}/database/{database_id}/row")
.route(web::get().to(list_database_row_id_handler)),
)
.service(
web::resource("/{workspace_id}/database/{database_id}/row/updated")
.route(web::get().to(list_database_row_id_updated_handler)),
)
.service(
web::resource("/{workspace_id}/database/{database_id}/row/detail")
.route(web::get().to(list_database_row_details_handler)),
Expand Down Expand Up @@ -1892,9 +1897,42 @@ async fn list_database_row_id_handler(
.enforce_action(&uid, &workspace_id, Action::Read)
.await?;

let db_rows =
biz::collab::ops::list_database_row(&state.collab_access_control_storage, workspace_id, db_id)
.await?;
let db_rows = biz::collab::ops::list_database_row_ids(
&state.collab_access_control_storage,
&workspace_id,
&db_id,
)
.await?;
Ok(Json(AppResponse::Ok().with_data(db_rows)))
}

async fn list_database_row_id_updated_handler(
user_uuid: UserUuid,
path_param: web::Path<(String, String)>,
state: Data<AppState>,
param: web::Query<ListDatabaseRowUpdatedParam>,
) -> Result<Json<AppResponse<Vec<DatabaseRowUpdatedItem>>>> {
let (workspace_id, db_id) = path_param.into_inner();
let uid = state.user_cache.get_user_uid(&user_uuid).await?;

state
.workspace_access_control
.enforce_action(&uid, &workspace_id, Action::Read)
.await?;

// Default to 1 hour ago
let after: DateTime<Utc> = param
.after
.unwrap_or_else(|| Utc::now() - Duration::hours(1));

let db_rows = biz::collab::ops::list_database_row_ids_updated(
&state.collab_access_control_storage,
&state.pg_pool,
&workspace_id,
&db_id,
&after,
)
.await?;
Ok(Json(AppResponse::Ok().with_data(db_rows)))
}

Expand Down
33 changes: 28 additions & 5 deletions src/biz/collab/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::Arc;

use app_error::AppError;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use chrono::DateTime;
use chrono::Utc;
use collab::preclude::Collab;
use collab_database::database::DatabaseBody;
use collab_database::entity::FieldType;
Expand All @@ -15,6 +17,7 @@ use collab_entity::CollabType;
use collab_entity::EncodedCollab;
use collab_folder::SectionItem;
use collab_folder::{CollabOrigin, Folder};
use database::collab::select_last_updated_database_row_ids;
use database::collab::select_workspace_database_oid;
use database::collab::{CollabStorage, GetCollabOrigin};
use database::publish::select_published_view_ids_for_workspace;
Expand All @@ -24,6 +27,7 @@ use database_entity::dto::{QueryCollab, QueryCollabParams};
use shared_entity::dto::workspace_dto::AFDatabase;
use shared_entity::dto::workspace_dto::AFDatabaseRow;
use shared_entity::dto::workspace_dto::AFDatabaseRowDetail;
use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;
use shared_entity::dto::workspace_dto::FavoriteFolderView;
use shared_entity::dto::workspace_dto::FolderViewMinimal;
use shared_entity::dto::workspace_dto::RecentFolderView;
Expand Down Expand Up @@ -435,16 +439,16 @@ pub async fn list_database(
Ok(af_databases)
}

pub async fn list_database_row(
pub async fn list_database_row_ids(
collab_storage: &CollabAccessControlStorage,
workspace_uuid_str: String,
database_uuid_str: String,
workspace_uuid_str: &str,
database_uuid_str: &str,
) -> Result<Vec<AFDatabaseRow>, AppError> {
let db_collab = get_latest_collab(
collab_storage,
GetCollabOrigin::Server,
&workspace_uuid_str,
&database_uuid_str,
workspace_uuid_str,
database_uuid_str,
CollabType::Database,
)
.await?;
Expand Down Expand Up @@ -479,6 +483,25 @@ pub async fn list_database_row(
Ok(db_rows)
}

pub async fn list_database_row_ids_updated(
collab_storage: &CollabAccessControlStorage,
pg_pool: &PgPool,
workspace_uuid_str: &str,
database_uuid_str: &str,
after: &DateTime<Utc>,
) -> Result<Vec<DatabaseRowUpdatedItem>, AppError> {
let row_ids = list_database_row_ids(collab_storage, workspace_uuid_str, database_uuid_str)
.await?
.into_iter()
.map(|row| row.id)
.collect::<Vec<String>>();

let workspace_uuid: Uuid = workspace_uuid_str.parse()?;
let updated_row_ids =
select_last_updated_database_row_ids(pg_pool, &workspace_uuid, &row_ids, after).await?;
Ok(updated_row_ids)
}

pub async fn list_database_row_details(
collab_storage: &CollabAccessControlStorage,
uid: i64,
Expand Down
7 changes: 7 additions & 0 deletions tests/workspace/workspace_crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ async fn workspace_list_database() {
.unwrap();
assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids);
}
{
let db_row_ids = c
.list_database_row_ids_updated(&workspace_id, &todos_db.id, None)
.await
.unwrap();
assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids);
}

{
let db_row_ids = c
Expand Down

0 comments on commit 9e067e6

Please sign in to comment.