diff --git a/libs/client-api/src/http_view.rs b/libs/client-api/src/http_view.rs index fefcfbafb..75e446e80 100644 --- a/libs/client-api/src/http_view.rs +++ b/libs/client-api/src/http_view.rs @@ -97,6 +97,40 @@ impl Client { AppResponse::<()>::from_response(resp).await?.into_error() } + pub async fn delete_workspace_page_view_from_trash( + &self, + workspace_id: Uuid, + view_id: &str, + ) -> Result<(), AppResponseError> { + let url = format!( + "{}/api/workspace/{}/trash/{}", + self.base_url, workspace_id, view_id + ); + let resp = self + .http_client_with_auth(Method::DELETE, &url) + .await? + .send() + .await?; + AppResponse::<()>::from_response(resp).await?.into_error() + } + + pub async fn delete_all_workspace_page_views_from_trash( + &self, + workspace_id: Uuid, + ) -> Result<(), AppResponseError> { + let url = format!( + "{}/api/workspace/{}/delete-all-pages-from-trash", + self.base_url, workspace_id + ); + let resp = self + .http_client_with_auth(Method::POST, &url) + .await? + .json(&json!({})) + .send() + .await?; + AppResponse::<()>::from_response(resp).await?.into_error() + } + pub async fn update_workspace_page_view( &self, workspace_id: Uuid, diff --git a/src/api/metrics.rs b/src/api/metrics.rs index 41d4f54cb..bf380182d 100644 --- a/src/api/metrics.rs +++ b/src/api/metrics.rs @@ -233,6 +233,7 @@ pub struct AppFlowyWebMetrics { pub update_size_bytes: Histogram, pub decoding_failure_count: Gauge, pub apply_update_failure_count: Gauge, + pub apply_update_timeout_count: Gauge, } impl AppFlowyWebMetrics { @@ -243,6 +244,7 @@ impl AppFlowyWebMetrics { update_size_bytes: Histogram::new(update_size_buckets), decoding_failure_count: Default::default(), apply_update_failure_count: Default::default(), + apply_update_timeout_count: Default::default(), } } @@ -264,6 +266,11 @@ impl AppFlowyWebMetrics { "Number of updates that failed to apply", metrics.apply_update_failure_count.clone(), ); + web_update_registry.register( + "apply_update_timeout_count", + "Number of updates that failed to apply within timeout", + metrics.apply_update_timeout_count.clone(), + ); metrics } @@ -278,4 +285,8 @@ impl AppFlowyWebMetrics { pub fn incr_apply_update_failure_count(&self, count: i64) { self.apply_update_failure_count.inc_by(count); } + + pub fn incr_apply_update_timeout_count(&self, count: i64) { + self.apply_update_timeout_count.inc_by(count); + } } diff --git a/src/api/util.rs b/src/api/util.rs index 240bf21ca..3d48b1590 100644 --- a/src/api/util.rs +++ b/src/api/util.rs @@ -7,10 +7,13 @@ use actix_web::HttpRequest; use appflowy_ai_client::dto::AIModel; use async_trait::async_trait; use byteorder::{ByteOrder, LittleEndian}; +use chrono::Utc; +use collab_rt_entity::user::RealtimeUser; use collab_rt_protocol::spawn_blocking_validate_encode_collab; use database_entity::dto::CollabParams; use std::str::FromStr; use tokio_stream::StreamExt; +use uuid::Uuid; #[inline] pub fn compress_type_from_header_value(headers: &HeaderMap) -> Result { @@ -86,6 +89,28 @@ pub fn device_id_from_headers(headers: &HeaderMap) -> Result<&str, AppError> { ) } +/// Create new realtime user for requests from appflowy web +pub fn realtime_user_for_web_request( + headers: &HeaderMap, + uid: i64, +) -> Result { + let app_version = client_version_from_headers(headers) + .map(|s| s.to_string()) + .unwrap_or_else(|_| "web".to_string()); + let device_id = device_id_from_headers(headers) + .map(|s| s.to_string()) + .unwrap_or_else(|_| Uuid::new_v4().to_string()); + let session_id = device_id.clone(); + let user = RealtimeUser { + uid, + device_id, + connect_at: Utc::now().timestamp(), + session_id, + app_version, + }; + Ok(user) +} + #[async_trait] pub trait CollabValidator { async fn check_encode_collab(&self) -> Result<(), AppError>; diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 3fe3c5ce8..ba95eb618 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -1,4 +1,4 @@ -use crate::api::util::{client_version_from_headers, PayloadReader}; +use crate::api::util::{client_version_from_headers, realtime_user_for_web_request, PayloadReader}; use crate::api::util::{compress_type_from_header_value, device_id_from_headers, CollabValidator}; use crate::api::ws::RealtimeServerAddr; use crate::biz; @@ -12,9 +12,9 @@ use crate::biz::workspace::ops::{ get_reactions_on_published_view, remove_comment_on_published_view, remove_reaction_on_comment, }; use crate::biz::workspace::page_view::{ - create_page, create_space, get_page_view_collab, move_page, move_page_to_trash, - restore_all_pages_from_trash, restore_page_from_trash, update_page, update_page_collab_data, - update_space, + create_page, create_space, delete_all_pages_from_trash, delete_trash, get_page_view_collab, + move_page, move_page_to_trash, restore_all_pages_from_trash, restore_page_from_trash, + update_page, update_page_collab_data, update_space, }; use crate::biz::workspace::publish::get_workspace_default_publish_view_info_meta; use crate::domain::compression::{ @@ -172,6 +172,10 @@ pub fn workspace_scope() -> Scope { web::resource("/{workspace_id}/restore-all-pages-from-trash") .route(web::post().to(restore_all_pages_from_trash_handler)), ) + .service( + web::resource("/{workspace_id}/delete-all-pages-from-trash") + .route(web::post().to(delete_all_pages_from_trash_handler)), + ) .service( web::resource("/{workspace_id}/batch/collab") .route(web::post().to(batch_create_collab_handler)), @@ -254,6 +258,10 @@ pub fn workspace_scope() -> Scope { web::resource("/{workspace_id}/favorite").route(web::get().to(get_favorite_views_handler)), ) .service(web::resource("/{workspace_id}/trash").route(web::get().to(get_trash_views_handler))) + .service( + web::resource("/{workspace_id}/trash/{view_id}") + .route(web::delete().to(delete_page_from_trash_handler)), + ) .service( web::resource("/published-outline/{publish_namespace}") .route(web::get().to(get_workspace_publish_outline_handler)), @@ -910,32 +918,27 @@ async fn post_web_update_handler( server: Data, req: HttpRequest, ) -> Result>> { - let payload = payload.into_inner(); - let app_version = client_version_from_headers(req.headers()) - .map(|s| s.to_string()) - .unwrap_or_else(|_| "web".to_string()); - let device_id = device_id_from_headers(req.headers()) - .map(|s| s.to_string()) - .unwrap_or_else(|_| Uuid::new_v4().to_string()); - let session_id = device_id.clone(); - - let (workspace_id, object_id) = path.into_inner(); - let collab_type = payload.collab_type.clone(); let uid = state .user_cache .get_user_uid(&user_uuid) .await .map_err(AppResponseError::from)?; - - let user = RealtimeUser { - uid, - device_id, - connect_at: timestamp(), - session_id, - app_version, - }; + let (workspace_id, object_id) = path.into_inner(); + state + .collab_access_control + .enforce_action( + &workspace_id.to_string(), + &uid, + &object_id.to_string(), + Action::Write, + ) + .await?; + let user = realtime_user_for_web_request(req.headers(), uid)?; trace!("create onetime web realtime user: {}", user); + let payload = payload.into_inner(); + let collab_type = payload.collab_type.clone(); + update_page_collab_data( &state.metrics.appflowy_web_metrics, server, @@ -1089,6 +1092,65 @@ async fn restore_all_pages_from_trash_handler( Ok(Json(AppResponse::Ok())) } +async fn delete_page_from_trash_handler( + user_uuid: UserUuid, + path: web::Path<(Uuid, String)>, + state: Data, + server: Data, + req: HttpRequest, +) -> Result>> { + let uid = state + .user_cache + .get_user_uid(&user_uuid) + .await + .map_err(AppResponseError::from)?; + let (workspace_id, view_id) = path.into_inner(); + state + .workspace_access_control + .enforce_action(&uid, &workspace_id.to_string(), Action::Write) + .await?; + let user = realtime_user_for_web_request(req.headers(), uid)?; + delete_trash( + &state.metrics.appflowy_web_metrics, + server, + user, + &state.collab_access_control_storage, + workspace_id, + &view_id, + ) + .await?; + Ok(Json(AppResponse::Ok())) +} + +async fn delete_all_pages_from_trash_handler( + user_uuid: UserUuid, + path: web::Path, + state: Data, + server: Data, + req: HttpRequest, +) -> Result>> { + let uid = state + .user_cache + .get_user_uid(&user_uuid) + .await + .map_err(AppResponseError::from)?; + let workspace_id = path.into_inner(); + state + .workspace_access_control + .enforce_action(&uid, &workspace_id.to_string(), Action::Write) + .await?; + let user = realtime_user_for_web_request(req.headers(), uid)?; + delete_all_pages_from_trash( + &state.metrics.appflowy_web_metrics, + server, + user, + &state.collab_access_control_storage, + workspace_id, + ) + .await?; + Ok(Json(AppResponse::Ok())) +} + async fn update_page_view_handler( user_uuid: UserUuid, path: web::Path<(Uuid, String)>, diff --git a/src/biz/workspace/page_view.rs b/src/biz/workspace/page_view.rs index 33a418f94..05f56c7f5 100644 --- a/src/biz/workspace/page_view.rs +++ b/src/biz/workspace/page_view.rs @@ -50,7 +50,8 @@ use shared_entity::dto::workspace_dto::{ use sqlx::{PgPool, Transaction}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; +use tokio::time::timeout_at; use tracing::instrument; use uuid::Uuid; @@ -627,6 +628,43 @@ async fn move_all_views_out_from_trash(folder: &mut Folder) -> Result Result, AppError> { + let encoded_update = { + let mut txn = folder.collab.transact_mut(); + folder + .body + .views + .update_view(&mut txn, view_id, |update| update.set_trash(false).done()); + folder.body.views.delete_views(&mut txn, vec![view_id]); + txn.encode_update_v1() + }; + + Ok(encoded_update) +} + +async fn delete_all_views_from_trash(folder: &mut Folder) -> Result, AppError> { + let all_trash_ids: Vec = folder + .get_all_trash_sections() + .iter() + .map(|s| s.id.clone()) + .collect(); + + let encoded_update = { + let mut txn = folder.collab.transact_mut(); + if let Some(op) = folder + .body + .section + .section_op(&txn, collab_folder::Section::Trash) + { + op.clear(&mut txn); + }; + folder.body.views.delete_views(&mut txn, all_trash_ids); + txn.encode_update_v1() + }; + + Ok(encoded_update) +} + fn folder_to_encoded_collab(folder: &Folder) -> Result, AppError> { let collab_type = CollabType::Folder; let encoded_folder_collab = folder @@ -1031,6 +1069,39 @@ pub async fn restore_all_pages_from_trash( Ok(()) } +pub async fn delete_trash( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, + collab_storage: &CollabAccessControlStorage, + workspace_id: Uuid, + view_id: &str, +) -> Result<(), AppError> { + let uid = user.uid; + let collab_origin = GetCollabOrigin::User { uid }; + let mut folder = + get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; + let update = delete_view_from_trash(view_id, &mut folder).await?; + update_workspace_folder_data(appflowy_web_metrics, server, user, workspace_id, update).await?; + Ok(()) +} + +pub async fn delete_all_pages_from_trash( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, + collab_storage: &CollabAccessControlStorage, + workspace_id: Uuid, +) -> Result<(), AppError> { + let uid = user.uid; + let collab_origin = GetCollabOrigin::User { uid }; + let mut folder = + get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?; + let update = delete_all_views_from_trash(&mut folder).await?; + update_workspace_folder_data(appflowy_web_metrics, server, user, workspace_id, update).await?; + Ok(()) +} + #[allow(clippy::too_many_arguments)] pub async fn update_page( pg_pool: &PgPool, @@ -1273,7 +1344,7 @@ async fn get_page_collab_data_for_document( #[instrument(level = "debug", skip_all)] pub async fn update_page_collab_data( - appflowy_web_metrics: &Arc, + appflowy_web_metrics: &AppFlowyWebMetrics, server: Data, user: RealtimeUser, workspace_id: Uuid, @@ -1300,3 +1371,54 @@ pub async fn update_page_collab_data( Ok(()) } + +#[instrument(level = "debug", skip_all)] +pub async fn update_workspace_folder_data( + appflowy_web_metrics: &AppFlowyWebMetrics, + server: Data, + user: RealtimeUser, + workspace_id: Uuid, + update: Vec, +) -> Result<(), AppError> { + appflowy_web_metrics.record_update_size_bytes(update.len()); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let message = ClientHttpUpdateMessage { + user, + workspace_id: workspace_id.to_string(), + object_id: workspace_id.to_string(), + collab_type: CollabType::Folder, + update: Bytes::from(update), + state_vector: None, + return_tx: Some(tx), + }; + + server + .try_send(message) + .map_err(|err| AppError::Internal(anyhow!("Failed to send message to server: {}", err)))?; + + let resp = timeout_at( + tokio::time::Instant::now() + Duration::from_millis(2000), + rx, + ) + .await + .map_err(|err| { + appflowy_web_metrics.incr_apply_update_timeout_count(1); + AppError::Internal(anyhow!( + "Failed to receive apply update within timeout: {}", + err + )) + })? + .map_err(|err| AppError::Internal(anyhow!("Unable to receive folder update reply: {}", err)))?; + + match resp { + Ok(_) => Ok(()), + Err(err) => { + appflowy_web_metrics.incr_apply_update_failure_count(1); + Err(AppError::Internal(anyhow!( + "Failed to apply folder update: {}", + err + ))) + }, + } +} diff --git a/tests/workspace/page_view.rs b/tests/workspace/page_view.rs index b752b86a7..a433dd251 100644 --- a/tests/workspace/page_view.rs +++ b/tests/workspace/page_view.rs @@ -262,7 +262,7 @@ async fn move_page_to_another_space() { } #[tokio::test] -async fn move_page_to_trash() { +async fn move_page_to_trash_then_restore() { let registered_user = generate_unique_registered_user().await; let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await; let web_client = TestClient::user_with_new_device(registered_user.clone()).await; @@ -359,7 +359,7 @@ async fn move_page_to_trash() { } #[tokio::test] -async fn move_page_with_child_to_trash() { +async fn move_page_with_child_to_trash_then_restore() { let registered_user = generate_unique_registered_user().await; let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await; let web_client = TestClient::user_with_new_device(registered_user.clone()).await; @@ -432,6 +432,153 @@ async fn move_page_with_child_to_trash() { assert!(!view_found); } +#[tokio::test] +async fn move_page_with_child_to_trash_then_delete_permanently() { + let registered_user = generate_unique_registered_user().await; + let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await; + let web_client = TestClient::user_with_new_device(registered_user.clone()).await; + let workspace_id = app_client.workspace_id().await; + let folder_view = web_client + .api_client + .get_workspace_folder(&workspace_id, Some(2), None) + .await + .unwrap(); + let general_space = &folder_view + .children + .into_iter() + .find(|v| v.name == "General") + .unwrap(); + app_client.open_workspace_collab(&workspace_id).await; + app_client + .wait_object_sync_complete(&workspace_id) + .await + .unwrap(); + app_client + .api_client + .move_workspace_page_view_to_trash( + Uuid::parse_str(&workspace_id).unwrap(), + &general_space.view_id, + ) + .await + .unwrap(); + let folder = get_latest_folder(&app_client, &workspace_id).await; + let views_in_trash_for_app = folder + .get_my_trash_sections() + .iter() + .map(|v| v.id.clone()) + .collect::>(); + assert!(views_in_trash_for_app.contains(&general_space.view_id)); + for view in general_space.children.iter() { + assert!(!views_in_trash_for_app.contains(&view.view_id)); + } + let views_in_trash_for_web = web_client + .api_client + .get_workspace_trash(&workspace_id) + .await + .unwrap() + .views + .iter() + .map(|v| v.view.view_id.clone()) + .collect::>(); + assert!(views_in_trash_for_web.contains(&general_space.view_id)); + + web_client + .api_client + .delete_workspace_page_view_from_trash( + Uuid::parse_str(&workspace_id).unwrap(), + &general_space.view_id, + ) + .await + .unwrap(); + let folder = get_latest_folder(&app_client, &workspace_id).await; + assert!(folder.get_view(&general_space.view_id).is_none()); + assert!(!folder + .get_my_trash_sections() + .iter() + .any(|v| v.id == general_space.view_id)); + let view_found = web_client + .api_client + .get_workspace_trash(&workspace_id) + .await + .unwrap() + .views + .iter() + .any(|v| v.view.view_id == general_space.view_id); + assert!(!view_found); +} + +#[tokio::test] +async fn move_page_with_child_to_trash_then_delete_all_permanently() { + let registered_user = generate_unique_registered_user().await; + let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await; + let web_client = TestClient::user_with_new_device(registered_user.clone()).await; + let workspace_id = app_client.workspace_id().await; + let folder_view = web_client + .api_client + .get_workspace_folder(&workspace_id, Some(2), None) + .await + .unwrap(); + let general_space = &folder_view + .children + .into_iter() + .find(|v| v.name == "General") + .unwrap(); + app_client.open_workspace_collab(&workspace_id).await; + app_client + .wait_object_sync_complete(&workspace_id) + .await + .unwrap(); + app_client + .api_client + .move_workspace_page_view_to_trash( + Uuid::parse_str(&workspace_id).unwrap(), + &general_space.view_id, + ) + .await + .unwrap(); + let folder = get_latest_folder(&app_client, &workspace_id).await; + let views_in_trash_for_app = folder + .get_my_trash_sections() + .iter() + .map(|v| v.id.clone()) + .collect::>(); + assert!(views_in_trash_for_app.contains(&general_space.view_id)); + for view in general_space.children.iter() { + assert!(!views_in_trash_for_app.contains(&view.view_id)); + } + let views_in_trash_for_web = web_client + .api_client + .get_workspace_trash(&workspace_id) + .await + .unwrap() + .views + .iter() + .map(|v| v.view.view_id.clone()) + .collect::>(); + assert!(views_in_trash_for_web.contains(&general_space.view_id)); + + web_client + .api_client + .delete_all_workspace_page_views_from_trash(Uuid::parse_str(&workspace_id).unwrap()) + .await + .unwrap(); + let folder = get_latest_folder(&app_client, &workspace_id).await; + assert!(folder.get_view(&general_space.view_id).is_none()); + assert!(!folder + .get_my_trash_sections() + .iter() + .any(|v| v.id == general_space.view_id)); + let view_found = web_client + .api_client + .get_workspace_trash(&workspace_id) + .await + .unwrap() + .views + .iter() + .any(|v| v.view.view_id == general_space.view_id); + assert!(!view_found); +} + #[tokio::test] async fn update_page() { let registered_user = generate_unique_registered_user().await;