From 2bf71198d9c0c90d601d98e3801c6f11d3acbc30 Mon Sep 17 00:00:00 2001 From: Sampras lopes Date: Sun, 3 Dec 2023 03:39:24 +0530 Subject: [PATCH] feat(events): add event generation for connector api calls --- config/config.example.toml | 18 +++++ config/docker_compose.toml | 2 +- crates/router/src/core/refunds.rs | 4 +- crates/router/src/events.rs | 2 + .../router/src/events/connector_api_logs.rs | 65 +++++++++++++++++++ crates/router/src/routes/app.rs | 5 +- crates/router/src/services/api.rs | 43 +++++++++++- crates/router/src/services/kafka.rs | 14 +++- 8 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 crates/router/src/events/connector_api_logs.rs diff --git a/config/config.example.toml b/config/config.example.toml index d935a4e7f20d..14318e9200e7 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -464,6 +464,12 @@ sdk_url = "http://localhost:9090/dist/HyperLoader.js" [analytics] source = "sqlx" # The Analytics source/strategy to be used +[analytics.clickhouse] +username = "" # Clickhouse username +password = "" # Clickhouse password (optional) +host = "" # Clickhouse host in http(s)://: format +database_name = "" # Clickhouse database name + [analytics.sqlx] username = "db_user" # Analytics DB Username password = "db_pass" # Analytics DB Password @@ -472,8 +478,20 @@ port = 5432 # Analytics DB Port dbname = "hyperswitch_db" # Name of Database pool_size = 5 # Number of connections to keep open connection_timeout = 10 # Timeout for database connection in seconds +queue_strategy = "Fifo" # Add the queue strategy used by the database bb8 client # Config for KV setup [kv_config] # TTL for KV in seconds ttl = 900 + +[events] +source = "logs" # The event sink to push events supports kafka or logs (stdout) + +[events.kafka] +brokers = [] # Kafka broker urls for bootstrapping the client +intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events +attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events +refund_analytics_topic = "topic" # Kafka topic to be used for Refund events +api_logs_topic = "topic" # Kafka topic to be used for incoming api events +connector_logs_topic = "topic" # Kafka topic to be used for connector api events \ No newline at end of file diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 4d50600e1bf8..6bc6fcd69ea6 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -339,7 +339,7 @@ intent_analytics_topic = "hyperswitch-payment-intent-events" attempt_analytics_topic = "hyperswitch-payment-attempt-events" refund_analytics_topic = "hyperswitch-refund-events" api_logs_topic = "hyperswitch-api-log-events" -connector_events_topic = "hyperswitch-connector-api-events" +connector_logs_topic = "hyperswitch-connector-api-events" [analytics] source = "sqlx" diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index c43c00b7259c..3d150e6eb4c8 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -929,7 +929,9 @@ pub async fn start_refund_workflow( refund_tracker: &storage::ProcessTracker, ) -> Result<(), errors::ProcessTrackerError> { match refund_tracker.name.as_deref() { - Some("EXECUTE_REFUND") => trigger_refund_execute_workflow(state, refund_tracker).await, + Some("EXECUTE_REFUND") => { + Box::pin(trigger_refund_execute_workflow(state, refund_tracker)).await + } Some("SYNC_REFUND") => { Box::pin(sync_refund_with_gateway_workflow(state, refund_tracker)).await } diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 8f980fee504a..2dc9258e19df 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -6,6 +6,7 @@ use storage_impl::errors::ApplicationError; use crate::{db::KafkaProducer, services::kafka::KafkaSettings}; pub mod api_logs; +pub mod connector_api_logs; pub mod event_logger; pub mod kafka_handler; @@ -29,6 +30,7 @@ pub enum EventType { PaymentAttempt, Refund, ApiLogs, + ConnectorApiLogs, } #[derive(Debug, Default, Deserialize, Clone)] diff --git a/crates/router/src/events/connector_api_logs.rs b/crates/router/src/events/connector_api_logs.rs new file mode 100644 index 000000000000..38f972089ad9 --- /dev/null +++ b/crates/router/src/events/connector_api_logs.rs @@ -0,0 +1,65 @@ +use common_utils::request::Method; +use router_env::tracing_actix_web::RequestId; +use serde::Serialize; +use time::OffsetDateTime; + +use super::{EventType, RawEvent}; + +#[derive(Debug, Serialize)] +pub struct ConnectorEvent { + connector_name: String, + flow: String, + request: String, + response: Option, + url: String, + method: String, + payment_id: String, + merchant_id: String, + created_at: i128, + request_id: String, + latency: u128, +} + +impl ConnectorEvent { + #[allow(clippy::too_many_arguments)] + pub fn new( + connector_name: String, + flow: String, + request: serde_json::Value, + response: Option, + url: String, + method: Method, + payment_id: String, + merchant_id: String, + request_id: &Option, + latency: u128, + ) -> Self { + Self { + connector_name, + flow, + request: request.to_string(), + response, + url, + method: method.to_string(), + payment_id, + merchant_id, + created_at: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000, + request_id: request_id + .map(|i| i.as_hyphenated().to_string()) + .unwrap_or("NO_REQUEST_ID".to_string()), + latency, + } + } +} + +impl TryFrom for RawEvent { + type Error = serde_json::Error; + + fn try_from(value: ConnectorEvent) -> Result { + Ok(Self { + event_type: EventType::ConnectorApiLogs, + key: value.request_id.clone(), + payload: serde_json::to_value(value)?, + }) + } +} diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index a145f3e7e5d7..8ba1c3b02c14 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -57,6 +57,7 @@ pub struct AppState { pub api_client: Box, #[cfg(feature = "olap")] pub pool: crate::analytics::AnalyticsProvider, + pub request_id: Option, } impl scheduler::SchedulerAppState for AppState { @@ -93,7 +94,8 @@ impl AppStateInfo for AppState { } fn add_request_id(&mut self, request_id: RequestId) { self.api_client.add_request_id(request_id); - self.store.add_request_id(request_id.to_string()) + self.store.add_request_id(request_id.to_string()); + self.request_id.replace(request_id); } fn add_merchant_id(&mut self, merchant_id: Option) { @@ -212,6 +214,7 @@ impl AppState { event_handler, #[cfg(feature = "olap")] pool, + request_id: None, } }) .await diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs index 3f49e666c2e1..e3a3e82f72ef 100644 --- a/crates/router/src/services/api.rs +++ b/crates/router/src/services/api.rs @@ -35,7 +35,10 @@ use crate::{ errors::{self, CustomResult}, payments, }, - events::api_logs::{ApiEvent, ApiEventMetric, ApiEventsType}, + events::{ + api_logs::{ApiEvent, ApiEventMetric, ApiEventsType}, + connector_api_logs::ConnectorEvent, + }, logger, routes::{ app::AppStateInfo, @@ -361,10 +364,48 @@ where match connector_request { Some(request) => { logger::debug!(connector_request=?request); + + let masked_request_body = match &request.body { + Some(request) => match request { + RequestContent::Json(i) + | RequestContent::FormUrlEncoded(i) + | RequestContent::Xml(i) => i + .masked_serialize() + .unwrap_or(json!({ "error": "failed to mask serialize"})), + RequestContent::FormData(_) => json!({"request_type": "FORM_DATA"}), + }, + None => json!({"error": "EMPTY_REQUEST_BODY"}), + }; + let request_url = request.url.clone(); + let request_method = request.method; + let current_time = Instant::now(); let response = call_connector_api(state, request).await; let external_latency = current_time.elapsed().as_millis(); logger::debug!(connector_response=?response); + + let connector_event = ConnectorEvent::new( + req.connector.clone(), + std::any::type_name::().to_string(), + masked_request_body, + None, + request_url, + request_method, + req.payment_id.clone(), + req.merchant_id.clone(), + &state.request_id, + external_latency, + ); + + match connector_event.try_into() { + Ok(event) => { + state.event_handler().log_event(event); + } + Err(err) => { + logger::error!(error=?err, "Error Logging Connector Event"); + } + } + match response { Ok(body) => { let response = match body { diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 497ac16721b5..4c65b4677872 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -83,6 +83,7 @@ pub struct KafkaSettings { attempt_analytics_topic: String, refund_analytics_topic: String, api_logs_topic: String, + connector_logs_topic: String, } impl KafkaSettings { @@ -119,7 +120,15 @@ impl KafkaSettings { Err(ApplicationError::InvalidConfigurationValueError( "Kafka API event Analytics topic must not be empty".into(), )) - }) + })?; + + common_utils::fp_utils::when(self.connector_logs_topic.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Kafka Connector Logs topic must not be empty".into(), + )) + })?; + + Ok(()) } } @@ -130,6 +139,7 @@ pub struct KafkaProducer { attempt_analytics_topic: String, refund_analytics_topic: String, api_logs_topic: String, + connector_logs_topic: String, } struct RdKafkaProducer(ThreadedProducer); @@ -166,6 +176,7 @@ impl KafkaProducer { attempt_analytics_topic: conf.attempt_analytics_topic.clone(), refund_analytics_topic: conf.refund_analytics_topic.clone(), api_logs_topic: conf.api_logs_topic.clone(), + connector_logs_topic: conf.connector_logs_topic.clone(), }) } @@ -297,6 +308,7 @@ impl KafkaProducer { EventType::PaymentAttempt => &self.attempt_analytics_topic, EventType::PaymentIntent => &self.intent_analytics_topic, EventType::Refund => &self.refund_analytics_topic, + EventType::ConnectorApiLogs => &self.connector_logs_topic, } } }