Skip to content

Commit

Permalink
feat: Import metrics (#911)
Browse files Browse the repository at this point in the history
* chore: implement metrics for import

* chore: add metrics
  • Loading branch information
appflowy authored Oct 20, 2024
1 parent b1f37db commit 57c4481
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 71 deletions.
64 changes: 1 addition & 63 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions services/appflowy-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,5 @@ uuid.workspace = true
mailer.workspace = true
md5.workspace = true
base64.workspace = true
tempdir = "0.3.7"

prometheus-client = "0.22.3"

44 changes: 43 additions & 1 deletion services/appflowy-worker/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ use axum::Router;
use secrecy::ExposeSecret;

use crate::mailer::AFWorkerMailer;
use crate::metric::ImportMetrics;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use mailer::sender::Mailer;
use std::sync::{Arc, Once};
use std::time::Duration;
Expand Down Expand Up @@ -85,26 +90,31 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err

let mailer = get_worker_mailer(&config).await?;
let s3_client = get_aws_s3_client(&config.s3_setting).await?;
let metrics = AppMetrics::new();

let state = AppState {
redis_client,
pg_pool,
s3_client,
mailer: mailer.clone(),
metrics,
};

let local_set = LocalSet::new();
let email_notifier = EmailNotifier::new(mailer);
let import_worker_fut = local_set.run_until(run_import_worker(
state.pg_pool.clone(),
state.redis_client.clone(),
Some(state.metrics.import_metrics.clone()),
Arc::new(state.s3_client.clone()),
Arc::new(email_notifier),
"import_task_stream",
10,
));

let app = Router::new().with_state(state);
let app = Router::new()
.route("/metrics", get(metrics_handler))
.with_state(Arc::new(state));

tokio::select! {
_ = import_worker_fut => {
Expand All @@ -124,6 +134,7 @@ pub struct AppState {
pub pg_pool: PgPool,
pub s3_client: S3ClientImpl,
pub mailer: AFWorkerMailer,
pub metrics: AppMetrics,
}

async fn get_worker_mailer(config: &Config) -> Result<AFWorkerMailer, Error> {
Expand Down Expand Up @@ -180,3 +191,34 @@ pub async fn get_aws_s3_client(s3_setting: &S3Setting) -> Result<S3ClientImpl, E
bucket: s3_setting.bucket.clone(),
})
}

#[derive(Clone)]
pub struct AppMetrics {
#[allow(dead_code)]
registry: Arc<prometheus_client::registry::Registry>,
import_metrics: Arc<ImportMetrics>,
}

impl AppMetrics {
pub fn new() -> Self {
let mut registry = prometheus_client::registry::Registry::default();
let import_metrics = Arc::new(ImportMetrics::register(&mut registry));
Self {
registry: Arc::new(registry),
import_metrics,
}
}
}

async fn metrics_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let mut buffer = String::new();
if let Err(err) = prometheus_client::encoding::text::encode(&mut buffer, &state.metrics.registry)
{
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to encode metrics: {:?}", err),
)
.into_response();
}
(StatusCode::OK, buffer).into_response()
}
Loading

0 comments on commit 57c4481

Please sign in to comment.