diff --git a/Cargo.lock b/Cargo.lock
index 36edd8ec3..489dc363d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -821,13 +821,13 @@ dependencies = [
  "mailer",
  "md5",
  "mime_guess",
+ "prometheus-client",
  "redis 0.25.4",
  "secrecy",
  "serde",
  "serde_json",
  "serde_repr",
  "sqlx",
- "tempdir",
  "thiserror",
  "tokio",
  "tokio-stream",
@@ -3374,12 +3374,6 @@ dependencies = [
  "percent-encoding",
 ]
 
-[[package]]
-name = "fuchsia-cprng"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
-
 [[package]]
 name = "funty"
 version = "2.0.0"
@@ -5649,19 +5643,6 @@ version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
 
-[[package]]
-name = "rand"
-version = "0.4.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
-dependencies = [
- "fuchsia-cprng",
- "libc",
- "rand_core 0.3.1",
- "rdrand",
- "winapi",
-]
-
 [[package]]
 name = "rand"
 version = "0.7.3"
@@ -5706,21 +5687,6 @@ dependencies = [
  "rand_core 0.6.4",
 ]
 
-[[package]]
-name = "rand_core"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
-dependencies = [
- "rand_core 0.4.2",
-]
-
-[[package]]
-name = "rand_core"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
-
 [[package]]
 name = "rand_core"
 version = "0.5.1"
@@ -5790,15 +5756,6 @@ dependencies = [
  "yasna",
 ]
 
-[[package]]
-name = "rdrand"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
-dependencies = [
- "rand_core 0.3.1",
-]
-
 [[package]]
 name = "redis"
 version = "0.23.3"
@@ -5926,15 +5883,6 @@ version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
 
-[[package]]
-name = "remove_dir_all"
-version = "0.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
-dependencies = [
- "winapi",
-]
-
 [[package]]
 name = "rend"
 version = "0.4.2"
@@ -7268,16 +7216,6 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
 
-[[package]]
-name = "tempdir"
-version = "0.3.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8"
-dependencies = [
- "rand 0.4.6",
- "remove_dir_all",
-]
-
 [[package]]
 name = "tempfile"
 version = "3.10.1"
diff --git a/services/appflowy-worker/Cargo.toml b/services/appflowy-worker/Cargo.toml
index 048d3fd1a..cf87f8371 100644
--- a/services/appflowy-worker/Cargo.toml
+++ b/services/appflowy-worker/Cargo.toml
@@ -48,6 +48,5 @@ uuid.workspace = true
 mailer.workspace = true
 md5.workspace = true
 base64.workspace = true
-tempdir = "0.3.7"
-
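+# prometheus-client backs the new /metrics endpoint in application.rs.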
+prometheus-client = "0.22.3"
diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs
index 1f92e0d5a..9bd4f3015 100644
--- a/services/appflowy-worker/src/application.rs
+++ b/services/appflowy-worker/src/application.rs
@@ -14,6 +14,11 @@ use axum::Router;
 use secrecy::ExposeSecret;
 
 use crate::mailer::AFWorkerMailer;
+use crate::metric::ImportMetrics;
+use axum::extract::State;
+use axum::http::StatusCode;
+use axum::response::IntoResponse;
+use axum::routing::get;
 use mailer::sender::Mailer;
 use std::sync::{Arc, Once};
 use std::time::Duration;
@@ -85,12 +90,14 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err
   let mailer = get_worker_mailer(&config).await?;
   let s3_client = get_aws_s3_client(&config.s3_setting).await?;
+  let metrics = AppMetrics::new();
 
   let state = AppState {
     redis_client,
     pg_pool,
     s3_client,
     mailer: mailer.clone(),
+    metrics,
   };
 
   let local_set = LocalSet::new();
 
@@ -98,13 +105,16 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err
   let import_worker_fut = local_set.run_until(run_import_worker(
     state.pg_pool.clone(),
     state.redis_client.clone(),
+    Some(state.metrics.import_metrics.clone()),
     Arc::new(state.s3_client.clone()),
     Arc::new(email_notifier),
     "import_task_stream",
     10,
   ));
 
-  let app = Router::new().with_state(state);
+  let app = Router::new()
+    .route("/metrics", get(metrics_handler))
+    .with_state(Arc::new(state));
 
   tokio::select! {
     _ = import_worker_fut => {
@@ -124,6 +134,7 @@ pub struct AppState {
   pub pg_pool: PgPool,
   pub s3_client: S3ClientImpl,
   pub mailer: AFWorkerMailer,
+  pub metrics: AppMetrics,
 }
 
 async fn get_worker_mailer(config: &Config) -> Result<AFWorkerMailer, Error> {
@@ -180,3 +191,34 @@ pub async fn get_aws_s3_client(s3_setting: &S3Setting) -> Result<S3ClientImpl, Error> {
   })
 }
 
+#[derive(Clone)]
+pub struct AppMetrics {
+  registry: Arc<prometheus_client::registry::Registry>,
+  import_metrics: Arc<ImportMetrics>,
+}
+
+impl AppMetrics {
+  pub fn new() -> Self {
+    let mut registry = prometheus_client::registry::Registry::default();
+    let import_metrics = Arc::new(ImportMetrics::register(&mut registry));
+    Self {
+      registry: Arc::new(registry),
+      import_metrics,
+    }
+  }
+}
+
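+/// Serves `GET /metrics` by encoding every metric registered on the shared
+/// registry in the Prometheus text exposition format.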
+async fn metrics_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
+  let mut buffer = String::new();
+  if let Err(err) = prometheus_client::encoding::text::encode(&mut buffer, &state.metrics.registry)
+  {
+    return (
+      StatusCode::INTERNAL_SERVER_ERROR,
+      format!("Failed to encode metrics: {:?}", err),
+    )
+      .into_response();
+  }
+  (StatusCode::OK, buffer).into_response()
+}
diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs
index b18344261..e9eaeb330 100644
--- a/services/appflowy-worker/src/import_worker/worker.rs
+++ b/services/appflowy-worker/src/import_worker/worker.rs
@@ -26,8 +26,8 @@ use database::workspace::{
 };
 use database_entity::dto::CollabParams;
 
+use crate::metric::ImportMetrics;
 use async_zip::base::read::stream::{Ready, ZipFileReader};
-
 use collab_importer::zip_tool::async_zip::async_unzip;
 use collab_importer::zip_tool::sync_zip::sync_unzip;
 use futures::stream::FuturesUnordered;
@@ -67,6 +67,7 @@ const CONSUMER_NAME: &str = "appflowy_worker";
 pub async fn run_import_worker(
   pg_pool: PgPool,
   mut redis_client: ConnectionManager,
+  metrics: Option<Arc<ImportMetrics>>,
   s3_client: Arc<dyn S3Client>,
   notifier: Arc<dyn ImportNotifier>,
   stream_name: &str,
@@ -90,6 +91,7 @@
       GROUP_NAME,
       CONSUMER_NAME,
       notifier.clone(),
+      &metrics,
     )
     .await;
 
@@ -103,6 +105,7 @@
       CONSUMER_NAME,
       notifier.clone(),
       tick_interval_secs,
+      &metrics,
     )
     .await?;
 
@@ -119,6 +122,7 @@ async fn process_un_acked_tasks(
   group_name: &str,
   consumer_name: &str,
   notifier: Arc<dyn ImportNotifier>,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) {
   // when server restarts, we need to check if there are any unacknowledged tasks
   match get_un_ack_tasks(stream_name, group_name, consumer_name, redis_client).await {
@@ -136,6 +140,7 @@
           s3_client,
           pg_pool,
           notifier.clone(),
+          metrics,
         )
         .await;
 
@@ -155,6 +160,7 @@ async fn process_upcoming_tasks(
   consumer_name: &str,
   notifier: Arc<dyn ImportNotifier>,
   interval_secs: u64,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) -> Result<(), ImportError> {
   let options = StreamReadOptions::default()
     .group(group_name, consumer_name)
@@ -189,6 +195,7 @@
       let stream_name = stream_name.to_string();
       let group_name = group_name.to_string();
       let storage_dir = storage_dir.to_path_buf();
+      let metrics = metrics.clone();
       task_handlers.push(spawn_local(async move {
         consume_task(
           &storage_dir,
@@ -200,6 +207,7 @@
           &cloned_s3_client,
           &pg_pool,
           notifier,
+          &metrics,
         )
         .await?;
         Ok::<(), ImportError>(())
@@ -233,6 +241,7 @@ async fn consume_task(
   s3_client: &Arc<dyn S3Client>,
   pg_pool: &Pool<Postgres>,
   notifier: Arc<dyn ImportNotifier>,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) -> Result<(), ImportError> {
   let result = process_task(
     storage_dir,
@@ -241,6 +250,7 @@
     redis_client,
     pg_pool,
     notifier,
+    metrics,
   )
   .await;
 
@@ -263,6 +273,7 @@ async fn process_task(
   redis_client: &mut ConnectionManager,
   pg_pool: &PgPool,
   notifier: Arc<dyn ImportNotifier>,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) -> Result<(), ImportError> {
   let retry_interval: u64 = get_env_var("APPFLOWY_WORKER_IMPORT_NOTION_RETRY_INTERVAL", "10")
     .parse()
@@ -287,6 +298,7 @@
       3,
       Duration::from_secs(retry_interval),
       streaming,
+      metrics,
     )
     .await;
     match unzip_result {
@@ -305,7 +317,7 @@
         }
 
         clean_up(s3_client, &task).await;
-        notify_user(&task, result, notifier).await?;
+        notify_user(&task, result, notifier, metrics).await?;
       },
       Err(err) => {
         // If there is any errors when download or unzip the file, we will remove the file from S3 and notify the user.
@@ -314,7 +326,7 @@
         }
         remove_workspace(&task.workspace_id, pg_pool).await;
         clean_up(s3_client, &task).await;
-        notify_user(&task, Err(err), notifier).await?;
+        notify_user(&task, Err(err), notifier, metrics).await?;
       },
     }
 
@@ -347,14 +359,15 @@ pub async fn download_and_unzip_file_retry(
   max_retries: usize,
   interval: Duration,
   streaming: bool,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) -> Result<PathBuf, ImportError> {
   let mut attempt = 0;
   loop {
     attempt += 1;
-    match download_and_unzip_file(storage_dir, import_task, s3_client, streaming).await {
+    match download_and_unzip_file(storage_dir, import_task, s3_client, streaming, metrics).await {
       Ok(result) => return Ok(result),
      Err(err) if attempt < max_retries && !err.is_file_not_found() => {
-        error!(
+        warn!(
          "{} attempt {} failed: {}. Retrying in {:?}...",
          import_task.workspace_id, attempt, err, interval
        );
@@ -380,6 +393,7 @@ async fn download_and_unzip_file(
   import_task: &NotionImportTask,
   s3_client: &Arc<dyn S3Client>,
   streaming: bool,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) -> Result<PathBuf, ImportError> {
   let S3StreamResponse {
     stream,
@@ -389,7 +403,11 @@
   } = s3_client
     .get_blob_stream(import_task.s3_key.as_str())
     .await
     .map_err(|err| ImportError::Internal(err.into()))?;
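+  // Derive the download buffer size from the S3 content length and record it,
+  // so oversized import payloads are visible in the import-size histogram.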
+  let buffer_size = buffer_size_from_content_length(content_length);
+  if let Some(metrics) = metrics {
+    metrics.record_import_size_bytes(buffer_size);
+  }
   if streaming {
     let zip_reader = get_zip_reader(buffer_size, StreamOrFile::Stream(stream)).await?;
     let unique_file_name = Uuid::new_v4().to_string();
@@ -832,11 +850,15 @@ async fn notify_user(
   import_task: &NotionImportTask,
   result: Result<(), ImportError>,
   notifier: Arc<dyn ImportNotifier>,
+  metrics: &Option<Arc<ImportMetrics>>,
 ) -> Result<(), ImportError> {
   let task_id = import_task.task_id.to_string();
   let (error, error_detail) = match result {
     Ok(_) => {
       info!("[Import]: successfully imported:{}", import_task);
+      if let Some(metrics) = metrics {
+        metrics.incr_import_success_count(1);
+      }
       (None, None)
     },
     Err(err) => {
@@ -844,6 +866,9 @@
         "[Import]: failed to import:{}: error:{:?}",
         import_task, err
       );
+      if let Some(metrics) = metrics {
+        metrics.incr_import_fail_count(1);
+      }
       let (error, error_detail) = err.report(&task_id);
       (Some(error), Some(error_detail))
     },
diff --git a/services/appflowy-worker/src/lib.rs b/services/appflowy-worker/src/lib.rs
index 9c0baa675..b3f384e56 100644
--- a/services/appflowy-worker/src/lib.rs
+++ b/services/appflowy-worker/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod error;
 pub mod import_worker;
 mod mailer;
+pub mod metric;
 pub mod s3_client;
diff --git a/services/appflowy-worker/src/main.rs b/services/appflowy-worker/src/main.rs
index 4505efa75..157b5ab7e 100644
--- a/services/appflowy-worker/src/main.rs
+++ b/services/appflowy-worker/src/main.rs
@@ -4,6 +4,8 @@ pub mod error;
 pub mod import_worker;
 pub(crate) mod s3_client;
 
+mod metric;
+
 mod mailer;
 use crate::application::run_server;
 use crate::config::Config;
diff --git a/services/appflowy-worker/src/metric.rs b/services/appflowy-worker/src/metric.rs
new file mode 100644
index 000000000..1786bed16
--- /dev/null
+++ b/services/appflowy-worker/src/metric.rs
@@ -0,0 +1,53 @@
+use prometheus_client::metrics::gauge::Gauge;
+use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
+use prometheus_client::registry::Registry;
+
+pub struct ImportMetrics {
+  pub update_size_bytes: Histogram,
+  pub import_success_count: Gauge,
+  pub import_fail_count: Gauge,
+}
+
+impl ImportMetrics {
+  pub fn init() -> Self {
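+    // exponential_buckets(1024.0, 2.0, 10) yields ten buckets doubling from
+    // 1 KiB up to 512 KiB; larger payloads land in the implicit +Inf bucket.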
+    let update_size_buckets = exponential_buckets(1024.0, 2.0, 10);
+    Self {
+      update_size_bytes: Histogram::new(update_size_buckets),
+      import_success_count: Default::default(),
+      import_fail_count: Default::default(),
+    }
+  }
+
+  pub fn register(registry: &mut Registry) -> Self {
+    let metrics = Self::init();
+    let web_update_registry = registry.sub_registry_with_prefix("appflowy_web");
+    web_update_registry.register(
+      "import_payload_size_bytes",
+      "Size of the import payload in bytes",
+      metrics.update_size_bytes.clone(),
+    );
+    web_update_registry.register(
+      "import_success_count",
+      "import success count",
+      metrics.import_success_count.clone(),
+    );
+    web_update_registry.register(
+      "import_fail_count",
+      "import fail count",
+      metrics.import_fail_count.clone(),
+    );
+    metrics
+  }
+
+  pub fn record_import_size_bytes(&self, size: usize) {
+    self.update_size_bytes.observe(size as f64);
+  }
+
+  pub fn incr_import_success_count(&self, count: i64) {
+    self.import_success_count.inc_by(count);
+  }
+
+  pub fn incr_import_fail_count(&self, count: i64) {
+    self.import_fail_count.inc_by(count);
+  }
+}
diff --git a/services/appflowy-worker/tests/import_test.rs b/services/appflowy-worker/tests/import_test.rs
index f1296845a..729a457a9 100644
--- a/services/appflowy-worker/tests/import_test.rs
+++ b/services/appflowy-worker/tests/import_test.rs
@@ -143,6 +143,7 @@ fn run_importer_worker(
   let import_worker_fut = local_set.run_until(run_import_worker(
     pg_pool,
     redis_client,
+    None,
     Arc::new(MockS3Client),
     notifier,
     &stream_name,