Skip to content

Commit

Permalink
feat(events): add events for incoming API requests (#2621)
Browse files Browse the repository at this point in the history
Co-authored-by: Nishant Joshi <[email protected]>
  • Loading branch information
lsampras and NishantJoshi00 authored Oct 18, 2023
1 parent da77d13 commit 7a76d6c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 23 deletions.
3 changes: 2 additions & 1 deletion crates/router/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use serde::Serialize;

pub mod api_logs;
pub mod event_logger;

pub trait EventHandler: Sync + Send + dyn_clone::DynClone {
fn log_event<T: Event>(&self, event: T, previous: Option<T>);
fn log_event<T: Event>(&self, event: T);
}

#[derive(Debug, Serialize)]
Expand Down
41 changes: 41 additions & 0 deletions crates/router/src/events/api_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use router_env::{tracing_actix_web::RequestId, types::FlowMetric};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

use super::Event;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ApiEvent {
api_flow: String,
created_at_timestamp: i128,
request_id: String,
latency: u128,
status_code: i64,
}

impl ApiEvent {
pub fn new(
api_flow: &impl FlowMetric,
request_id: &RequestId,
latency: u128,
status_code: i64,
) -> Self {
Self {
api_flow: api_flow.to_string(),
created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos(),
request_id: request_id.as_hyphenated().to_string(),
latency,
status_code,
}
}
}

impl Event for ApiEvent {
fn event_type() -> super::EventType {
super::EventType::ApiLogs
}

fn key(&self) -> String {
self.request_id.to_string()
}
}
8 changes: 2 additions & 6 deletions crates/router/src/events/event_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ use crate::services::logger;
pub struct EventLogger {}

impl EventHandler for EventLogger {
fn log_event<T: Event>(&self, event: T, previous: Option<T>) {
if let Some(prev) = previous {
logger::info!(previous = ?serde_json::to_string(&prev).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), current = ?serde_json::to_string(&event).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), event_type =? T::event_type(), event_id =? event.key(), log_type = "event");
} else {
logger::info!(current = ?serde_json::to_string(&event).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), event_type =? T::event_type(), event_id =? event.key(), log_type = "event");
}
fn log_event<T: Event>(&self, event: T) {
logger::info!(current = ?serde_json::to_string(&event).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), event_type =? T::event_type(), event_id =? event.key(), log_type = "event");
}
}
5 changes: 3 additions & 2 deletions crates/router/src/routes/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix_web::{web, Scope};
use external_services::email::{AwsSes, EmailClient};
#[cfg(feature = "kms")]
use external_services::kms::{self, decrypt::KmsDecrypt};
use router_env::tracing_actix_web::RequestId;
use scheduler::SchedulerInterface;
use storage_impl::MockDb;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -58,7 +59,7 @@ pub trait AppStateInfo {
fn event_handler(&self) -> &Self::Event;
#[cfg(feature = "email")]
fn email_client(&self) -> Arc<dyn EmailClient>;
fn add_request_id(&mut self, request_id: Option<String>);
fn add_request_id(&mut self, request_id: RequestId);
fn add_merchant_id(&mut self, merchant_id: Option<String>);
fn add_flow_name(&mut self, flow_name: String);
fn get_request_id(&self) -> Option<String>;
Expand All @@ -79,7 +80,7 @@ impl AppStateInfo for AppState {
fn event_handler(&self) -> &Self::Event {
&self.event_handler
}
fn add_request_id(&mut self, request_id: Option<String>) {
fn add_request_id(&mut self, request_id: RequestId) {
self.api_client.add_request_id(request_id);
}
fn add_merchant_id(&mut self, merchant_id: Option<String>) {
Expand Down
31 changes: 22 additions & 9 deletions crates/router/src/services/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use actix_web::{body, web, FromRequest, HttpRequest, HttpResponse, Responder, Re
use api_models::enums::CaptureMethod;
pub use client::{proxy_bypass_urls, ApiClient, MockApiClient, ProxyClient};
pub use common_utils::request::{ContentType, Method, Request, RequestBuilder};
use common_utils::{consts::X_HS_LATENCY, errors::ReportSwitchExt};
use common_utils::{
consts::X_HS_LATENCY,
errors::{ErrorSwitch, ReportSwitchExt},
};
use error_stack::{report, IntoReport, Report, ResultExt};
use masking::{ExposeOptionInterface, PeekInterface};
use router_env::{instrument, tracing, tracing_actix_web::RequestId, Tag};
Expand All @@ -30,6 +33,7 @@ use crate::{
errors::{self, CustomResult},
payments,
},
events::{api_logs::ApiEvent, EventHandler},
logger,
routes::{
app::AppStateInfo,
Expand Down Expand Up @@ -750,19 +754,20 @@ where
T: Debug,
A: AppStateInfo + Clone,
U: auth::AuthInfo,
CustomResult<ApplicationResponse<Q>, E>: ReportSwitchExt<ApplicationResponse<Q>, OErr>,
CustomResult<U, errors::ApiErrorResponse>: ReportSwitchExt<U, OErr>,
CustomResult<(), errors::ApiErrorResponse>: ReportSwitchExt<(), OErr>,
OErr: ResponseError + Sync + Send + 'static,
E: ErrorSwitch<OErr> + error_stack::Context,
OErr: ResponseError + error_stack::Context,
errors::ApiErrorResponse: ErrorSwitch<OErr>,
{
let request_id = RequestId::extract(request)
.await
.ok()
.map(|id| id.as_hyphenated().to_string());
.into_report()
.attach_printable("Unable to extract request id from request")
.change_context(errors::ApiErrorResponse::InternalServerError.switch())?;

let mut request_state = state.get_ref().clone();

request_state.add_request_id(request_id);
let start_instant = Instant::now();

let auth_out = api_auth
.authenticate_and_fetch(request.headers(), &request_state)
Expand Down Expand Up @@ -795,11 +800,20 @@ where
.switch()?;
res
};
let request_duration = Instant::now()
.saturating_duration_since(start_instant)
.as_millis();

let status_code = match output.as_ref() {
Ok(res) => metrics::request::track_response_status_code(res),
Err(err) => err.current_context().status_code().as_u16().into(),
};
state.event_handler().log_event(ApiEvent::new(
flow,
&request_id,
request_duration,
status_code,
));

metrics::request::status_code_metrics(status_code, flow.to_string(), merchant_id.to_string());

Expand Down Expand Up @@ -827,8 +841,7 @@ where
U: auth::AuthInfo,
A: AppStateInfo + Clone,
ApplicationResponse<Q>: Debug,
CustomResult<ApplicationResponse<Q>, E>:
ReportSwitchExt<ApplicationResponse<Q>, api_models::errors::types::ApiErrorResponse>,
E: ErrorSwitch<api_models::errors::types::ApiErrorResponse> + error_stack::Context,
{
let request_method = request.method().as_str();
let url_path = request.path();
Expand Down
12 changes: 7 additions & 5 deletions crates/router/src/services/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use http::{HeaderValue, Method};
use masking::PeekInterface;
use once_cell::sync::OnceCell;
use reqwest::multipart::Form;
use router_env::tracing_actix_web::RequestId;

use super::{request::Maskable, Request};
use crate::{
Expand Down Expand Up @@ -167,10 +168,10 @@ where
forward_to_kafka: bool,
) -> CustomResult<reqwest::Response, ApiClientError>;

fn add_request_id(&mut self, _request_id: Option<String>);
fn add_request_id(&mut self, request_id: RequestId);
fn get_request_id(&self) -> Option<String>;
fn add_merchant_id(&mut self, _merchant_id: Option<String>);
fn add_flow_name(&mut self, _flow_name: String);
fn add_flow_name(&mut self, flow_name: String);
}

dyn_clone::clone_trait_object!(ApiClient);
Expand Down Expand Up @@ -350,8 +351,9 @@ impl ApiClient for ProxyClient {
crate::services::send_request(state, request, option_timeout_secs).await
}

fn add_request_id(&mut self, _request_id: Option<String>) {
self.request_id = _request_id
fn add_request_id(&mut self, request_id: RequestId) {
self.request_id
.replace(request_id.as_hyphenated().to_string());
}

fn get_request_id(&self) -> Option<String> {
Expand Down Expand Up @@ -402,7 +404,7 @@ impl ApiClient for MockApiClient {
Err(ApiClientError::UnexpectedState.into())
}

fn add_request_id(&mut self, _request_id: Option<String>) {
fn add_request_id(&mut self, _request_id: RequestId) {
// [#2066]: Add Mock implementation for ApiClient
}

Expand Down

0 comments on commit 7a76d6c

Please sign in to comment.