Commit 2bf7119

feat(events): add event generation for connector api calls

lsampras committed Dec 2, 2023
1 parent 37c9081 commit 2bf7119
Showing 8 changed files with 148 additions and 5 deletions.
18 changes: 18 additions & 0 deletions config/config.example.toml
@@ -464,6 +464,12 @@ sdk_url = "http://localhost:9090/dist/HyperLoader.js"
 [analytics]
 source = "sqlx" # The Analytics source/strategy to be used
 
+[analytics.clickhouse]
+username = "" # Clickhouse username
+password = "" # Clickhouse password (optional)
+host = "" # Clickhouse host in http(s)://<URL>:<PORT> format
+database_name = "" # Clickhouse database name
+
 [analytics.sqlx]
 username = "db_user" # Analytics DB Username
 password = "db_pass" # Analytics DB Password
@@ -472,8 +478,20 @@ port = 5432 # Analytics DB Port
dbname = "hyperswitch_db" # Name of Database
pool_size = 5 # Number of connections to keep open
connection_timeout = 10 # Timeout for database connection in seconds
queue_strategy = "Fifo" # Add the queue strategy used by the database bb8 client

# Config for KV setup
[kv_config]
# TTL for KV in seconds
ttl = 900

[events]
source = "logs" # The event sink to push events supports kafka or logs (stdout)

[events.kafka]
brokers = [] # Kafka broker urls for bootstrapping the client
intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events
attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events
refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
2 changes: 1 addition & 1 deletion config/docker_compose.toml
@@ -339,7 +339,7 @@ intent_analytics_topic = "hyperswitch-payment-intent-events"
attempt_analytics_topic = "hyperswitch-payment-attempt-events"
refund_analytics_topic = "hyperswitch-refund-events"
api_logs_topic = "hyperswitch-api-log-events"
connector_events_topic = "hyperswitch-connector-api-events"
connector_logs_topic = "hyperswitch-connector-api-events"

[analytics]
source = "sqlx"
4 changes: 3 additions & 1 deletion crates/router/src/core/refunds.rs
@@ -929,7 +929,9 @@ pub async fn start_refund_workflow(
     refund_tracker: &storage::ProcessTracker,
 ) -> Result<(), errors::ProcessTrackerError> {
     match refund_tracker.name.as_deref() {
-        Some("EXECUTE_REFUND") => trigger_refund_execute_workflow(state, refund_tracker).await,
+        Some("EXECUTE_REFUND") => {
+            Box::pin(trigger_refund_execute_workflow(state, refund_tracker)).await
+        }
         Some("SYNC_REFUND") => {
             Box::pin(sync_refund_with_gateway_workflow(state, refund_tracker)).await
         }
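Aside: a minimal sketch (not part of the commit) of what wrapping the arm in Box::pin does; the motive is presumably to heap-allocate the large inner future rather than embed it in the caller's state machine, matching the existing SYNC_REFUND arm.

// Sketch only: Pin<Box<F>> itself implements Future, so it can be awaited.
async fn big_work() -> u32 {
    1
}

async fn caller() -> u32 {
    // Heap-allocate the future instead of inlining its state into caller().
    Box::pin(big_work()).await
}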
2 changes: 2 additions & 0 deletions crates/router/src/events.rs
@@ -6,6 +6,7 @@ use storage_impl::errors::ApplicationError;
 use crate::{db::KafkaProducer, services::kafka::KafkaSettings};
 
 pub mod api_logs;
+pub mod connector_api_logs;
 pub mod event_logger;
 pub mod kafka_handler;
 
@@ -29,6 +30,7 @@ pub enum EventType {
     PaymentAttempt,
     Refund,
     ApiLogs,
+    ConnectorApiLogs,
 }
 
 #[derive(Debug, Default, Deserialize, Clone)]
65 changes: 65 additions & 0 deletions crates/router/src/events/connector_api_logs.rs
@@ -0,0 +1,65 @@
use common_utils::request::Method;
use router_env::tracing_actix_web::RequestId;
use serde::Serialize;
use time::OffsetDateTime;

use super::{EventType, RawEvent};

#[derive(Debug, Serialize)]
pub struct ConnectorEvent {
    connector_name: String,
    flow: String,
    request: String,
    response: Option<String>,
    url: String,
    method: String,
    payment_id: String,
    merchant_id: String,
    created_at: i128,
    request_id: String,
    latency: u128,
}

impl ConnectorEvent {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        connector_name: String,
        flow: String,
        request: serde_json::Value,
        response: Option<String>,
        url: String,
        method: Method,
        payment_id: String,
        merchant_id: String,
        request_id: &Option<RequestId>,
        latency: u128,
    ) -> Self {
        Self {
            connector_name,
            flow,
            request: request.to_string(),
            response,
            url,
            method: method.to_string(),
            payment_id,
            merchant_id,
            created_at: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000,
            request_id: request_id
                .map(|i| i.as_hyphenated().to_string())
                .unwrap_or("NO_REQUEST_ID".to_string()),
            latency,
        }
    }
}

impl TryFrom<ConnectorEvent> for RawEvent {
    type Error = serde_json::Error;

    fn try_from(value: ConnectorEvent) -> Result<Self, Self::Error> {
        Ok(Self {
            event_type: EventType::ConnectorApiLogs,
            key: value.request_id.clone(),
            payload: serde_json::to_value(value)?,
        })
    }
}
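For orientation, a minimal usage sketch (not part of the commit): build a ConnectorEvent around a connector call, convert it into a RawEvent, and hand it to the event handler, as services/api.rs does below. All concrete values here are hypothetical.

// Sketch only; assumes ConnectorEvent, RawEvent, AppState, and logger are in scope.
use common_utils::request::Method;

fn log_connector_call(state: &crate::routes::app::AppState) {
    let event = ConnectorEvent::new(
        "stripe".to_string(),                  // connector_name (hypothetical)
        "PaymentsAuthorize".to_string(),       // flow, e.g. std::any::type_name::<T>()
        serde_json::json!({ "amount": 6540 }), // request body, already masked
        None,                                  // response is not captured in this commit
        "https://api.stripe.com/v1/charges".to_string(),
        Method::Post,
        "pay_abc123".to_string(),
        "merchant_xyz".to_string(),
        &state.request_id,                     // None serializes as "NO_REQUEST_ID"
        42,                                    // latency in milliseconds
    );
    match RawEvent::try_from(event) {
        Ok(raw) => state.event_handler().log_event(raw),
        Err(err) => logger::error!(error=?err, "Error Logging Connector Event"),
    }
}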
5 changes: 4 additions & 1 deletion crates/router/src/routes/app.rs
@@ -57,6 +57,7 @@ pub struct AppState {
     pub api_client: Box<dyn crate::services::ApiClient>,
     #[cfg(feature = "olap")]
     pub pool: crate::analytics::AnalyticsProvider,
+    pub request_id: Option<RequestId>,
 }
 
 impl scheduler::SchedulerAppState for AppState {
@@ -93,7 +94,8 @@ impl AppStateInfo for AppState {
     }
     fn add_request_id(&mut self, request_id: RequestId) {
         self.api_client.add_request_id(request_id);
-        self.store.add_request_id(request_id.to_string())
+        self.store.add_request_id(request_id.to_string());
+        self.request_id.replace(request_id);
     }
 
     fn add_merchant_id(&mut self, merchant_id: Option<String>) {
@@ -212,6 +214,7 @@ impl AppState {
             event_handler,
             #[cfg(feature = "olap")]
             pool,
+            request_id: None,
         }
     })
     .await
43 changes: 42 additions & 1 deletion crates/router/src/services/api.rs
@@ -35,7 +35,10 @@ use crate::{
         errors::{self, CustomResult},
         payments,
     },
-    events::api_logs::{ApiEvent, ApiEventMetric, ApiEventsType},
+    events::{
+        api_logs::{ApiEvent, ApiEventMetric, ApiEventsType},
+        connector_api_logs::ConnectorEvent,
+    },
     logger,
     routes::{
         app::AppStateInfo,
@@ -361,10 +364,48 @@
             match connector_request {
                 Some(request) => {
                     logger::debug!(connector_request=?request);
 
+                    let masked_request_body = match &request.body {
+                        Some(request) => match request {
+                            RequestContent::Json(i)
+                            | RequestContent::FormUrlEncoded(i)
+                            | RequestContent::Xml(i) => i
+                                .masked_serialize()
+                                .unwrap_or(json!({ "error": "failed to mask serialize"})),
+                            RequestContent::FormData(_) => json!({"request_type": "FORM_DATA"}),
+                        },
+                        None => json!({"error": "EMPTY_REQUEST_BODY"}),
+                    };
+                    let request_url = request.url.clone();
+                    let request_method = request.method;
 
+                    let current_time = Instant::now();
                     let response = call_connector_api(state, request).await;
+                    let external_latency = current_time.elapsed().as_millis();
                     logger::debug!(connector_response=?response);
 
+                    let connector_event = ConnectorEvent::new(
+                        req.connector.clone(),
+                        std::any::type_name::<T>().to_string(),
+                        masked_request_body,
+                        None,
+                        request_url,
+                        request_method,
+                        req.payment_id.clone(),
+                        req.merchant_id.clone(),
+                        &state.request_id,
+                        external_latency,
+                    );
 
+                    match connector_event.try_into() {
+                        Ok(event) => {
+                            state.event_handler().log_event(event);
+                        }
+                        Err(err) => {
+                            logger::error!(error=?err, "Error Logging Connector Event");
+                        }
+                    }
 
                     match response {
                         Ok(body) => {
                             let response = match body {
14 changes: 13 additions & 1 deletion crates/router/src/services/kafka.rs
@@ -83,6 +83,7 @@ pub struct KafkaSettings {
     attempt_analytics_topic: String,
     refund_analytics_topic: String,
     api_logs_topic: String,
+    connector_logs_topic: String,
 }
 
 impl KafkaSettings {
@@ -119,7 +120,15 @@ impl KafkaSettings {
             Err(ApplicationError::InvalidConfigurationValueError(
                 "Kafka API event Analytics topic must not be empty".into(),
             ))
-        })
+        })?;
+
+        common_utils::fp_utils::when(self.connector_logs_topic.is_default_or_empty(), || {
+            Err(ApplicationError::InvalidConfigurationValueError(
+                "Kafka Connector Logs topic must not be empty".into(),
+            ))
+        })?;
+
+        Ok(())
     }
 }
 
@@ -130,6 +139,7 @@ pub struct KafkaProducer {
     attempt_analytics_topic: String,
     refund_analytics_topic: String,
     api_logs_topic: String,
+    connector_logs_topic: String,
 }
 
 struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);
@@ -166,6 +176,7 @@ impl KafkaProducer {
             attempt_analytics_topic: conf.attempt_analytics_topic.clone(),
             refund_analytics_topic: conf.refund_analytics_topic.clone(),
             api_logs_topic: conf.api_logs_topic.clone(),
+            connector_logs_topic: conf.connector_logs_topic.clone(),
         })
     }
 
@@ -297,6 +308,7 @@ impl KafkaProducer {
             EventType::PaymentAttempt => &self.attempt_analytics_topic,
             EventType::PaymentIntent => &self.intent_analytics_topic,
             EventType::Refund => &self.refund_analytics_topic,
+            EventType::ConnectorApiLogs => &self.connector_logs_topic,
         }
     }
 }
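Aside on the validation pattern above: common_utils::fp_utils::when appears to return Ok(()) unless the predicate holds, in which case it evaluates the error closure; the trailing ? therefore short-circuits validate() at the first empty topic. A rough sketch of that assumed behavior:

// Assumed shape of fp_utils::when, inferred from its call sites; not the
// verbatim library definition.
fn when<E, F>(predicate: bool, f: F) -> Result<(), E>
where
    F: FnOnce() -> Result<(), E>,
{
    if predicate {
        f() // e.g. Err(InvalidConfigurationValueError(...)) when a topic is empty
    } else {
        Ok(())
    }
}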
