Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalika0363 authored Oct 3, 2023
2 parents 240faef + 5048d24 commit 86e31b4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 31 deletions.
22 changes: 21 additions & 1 deletion crates/api_models/src/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use utoipa::ToSchema;

use crate::{disputes, enums as api_enums, payments, refunds};

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum IncomingWebhookEvent {
PaymentIntentFailure,
Expand Down Expand Up @@ -39,6 +39,26 @@ pub enum WebhookFlow {
BankTransfer,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
/// This enum tells about the affect a webhook had on an object
pub enum WebhookResponseTracker {
Payment {
payment_id: String,
status: common_enums::IntentStatus,
},
Refund {
payment_id: String,
refund_id: String,
status: common_enums::RefundStatus,
},
Dispute {
dispute_id: String,
payment_id: String,
status: common_enums::DisputeStatus,
},
NoEffect,
}

impl From<IncomingWebhookEvent> for WebhookFlow {
fn from(evt: IncomingWebhookEvent) -> Self {
match evt {
Expand Down
105 changes: 77 additions & 28 deletions crates/router/src/core/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod utils;

use std::str::FromStr;

use api_models::payments::HeaderPayload;
use api_models::{payments::HeaderPayload, webhooks::WebhookResponseTracker};
use common_utils::errors::ReportSwitchExt;
use error_stack::{report, IntoReport, ResultExt};
use masking::ExposeInterface;
Expand Down Expand Up @@ -40,7 +40,7 @@ pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
) -> CustomResult<(), errors::ApiErrorResponse> {
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
let consume_or_trigger_flow = if source_verified {
payments::CallConnectorAction::HandleResponse(webhook_details.resource_object)
} else {
Expand Down Expand Up @@ -111,9 +111,12 @@ pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
metrics::WEBHOOK_PAYMENT_NOT_FOUND.add(
&metrics::CONTEXT,
1,
&[add_attributes("merchant_id", merchant_account.merchant_id)],
&[add_attributes(
"merchant_id",
merchant_account.merchant_id.clone(),
)],
);
return Ok(());
return Ok(WebhookResponseTracker::NoEffect);
}
error @ Err(_) => error?,
}
Expand All @@ -134,6 +137,8 @@ pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("payment id not received from payments core")?;

let status = payments_response.status;

let event_type: Option<enums::EventType> = payments_response.status.foreign_into();

// If event is NOT an UnsupportedEvent, trigger Outgoing Webhook
Expand All @@ -144,20 +149,22 @@ pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
outgoing_event_type,
enums::EventClass::Payments,
None,
payment_id,
payment_id.clone(),
enums::EventObjectType::PaymentDetails,
api::OutgoingWebhookContent::PaymentDetails(payments_response),
)
.await?;
}
};

let response = WebhookResponseTracker::Payment { payment_id, status };

Ok(response)
}

_ => Err(errors::ApiErrorResponse::WebhookProcessingFailure)
.into_report()
.attach_printable("received non-json response from payments core")?,
}

Ok(())
}

#[instrument(skip_all)]
Expand All @@ -169,7 +176,7 @@ pub async fn refunds_incoming_webhook_flow<W: types::OutgoingWebhookType>(
connector_name: &str,
source_verified: bool,
event_type: api_models::webhooks::IncomingWebhookEvent,
) -> CustomResult<(), errors::ApiErrorResponse> {
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
let db = &*state.store;
//find refund by connector refund id
let refund = match webhook_details.object_reference_id {
Expand Down Expand Up @@ -246,7 +253,8 @@ pub async fn refunds_incoming_webhook_flow<W: types::OutgoingWebhookType>(

// If event is NOT an UnsupportedEvent, trigger Outgoing Webhook
if let Some(outgoing_event_type) = event_type {
let refund_response: api_models::refunds::RefundResponse = updated_refund.foreign_into();
let refund_response: api_models::refunds::RefundResponse =
updated_refund.clone().foreign_into();
create_event_and_trigger_outgoing_webhook::<W>(
state,
merchant_account,
Expand All @@ -260,7 +268,11 @@ pub async fn refunds_incoming_webhook_flow<W: types::OutgoingWebhookType>(
.await?;
}

Ok(())
Ok(WebhookResponseTracker::Refund {
payment_id: updated_refund.payment_id,
refund_id: updated_refund.refund_id,
status: updated_refund.refund_status,
})
}

pub async fn get_payment_attempt_from_object_reference_id(
Expand Down Expand Up @@ -386,7 +398,7 @@ pub async fn disputes_incoming_webhook_flow<W: types::OutgoingWebhookType>(
connector: &(dyn api::Connector + Sync),
request_details: &api::IncomingWebhookRequestDetails<'_>,
event_type: api_models::webhooks::IncomingWebhookEvent,
) -> CustomResult<(), errors::ApiErrorResponse> {
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
metrics::INCOMING_DISPUTE_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]);
if source_verified {
let db = &*state.store;
Expand All @@ -411,7 +423,7 @@ pub async fn disputes_incoming_webhook_flow<W: types::OutgoingWebhookType>(
dispute_details,
&merchant_account.merchant_id,
&payment_attempt,
event_type.clone(),
event_type,
connector.id(),
)
.await?;
Expand All @@ -424,13 +436,17 @@ pub async fn disputes_incoming_webhook_flow<W: types::OutgoingWebhookType>(
event_type,
enums::EventClass::Disputes,
None,
dispute_object.dispute_id,
dispute_object.dispute_id.clone(),
enums::EventObjectType::DisputeDetails,
api::OutgoingWebhookContent::DisputeDetails(disputes_response),
)
.await?;
metrics::INCOMING_DISPUTE_WEBHOOK_MERCHANT_NOTIFIED_METRIC.add(&metrics::CONTEXT, 1, &[]);
Ok(())
Ok(WebhookResponseTracker::Dispute {
dispute_id: dispute_object.dispute_id,
payment_id: dispute_object.payment_id,
status: dispute_object.dispute_status,
})
} else {
metrics::INCOMING_DISPUTE_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]);
Err(errors::ApiErrorResponse::WebhookAuthenticationFailed).into_report()
Expand All @@ -443,7 +459,7 @@ async fn bank_transfer_webhook_flow<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
) -> CustomResult<(), errors::ApiErrorResponse> {
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
let response = if source_verified {
let payment_attempt = get_payment_attempt_from_object_reference_id(
&state,
Expand Down Expand Up @@ -486,6 +502,7 @@ async fn bank_transfer_webhook_flow<W: types::OutgoingWebhookType>(
.attach_printable("did not receive payment id from payments core response")?;

let event_type: Option<enums::EventType> = payments_response.status.foreign_into();
let status = payments_response.status;

// If event is NOT an UnsupportedEvent, trigger Outgoing Webhook
if let Some(outgoing_event_type) = event_type {
Expand All @@ -495,20 +512,20 @@ async fn bank_transfer_webhook_flow<W: types::OutgoingWebhookType>(
outgoing_event_type,
enums::EventClass::Payments,
None,
payment_id,
payment_id.clone(),
enums::EventObjectType::PaymentDetails,
api::OutgoingWebhookContent::PaymentDetails(payments_response),
)
.await?;
}

Ok(WebhookResponseTracker::Payment { payment_id, status })
}

_ => Err(errors::ApiErrorResponse::WebhookProcessingFailure)
.into_report()
.attach_printable("received non-json response from payments core")?,
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -729,6 +746,27 @@ pub async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
Ok(())
}

pub async fn webhooks_wrapper<W: types::OutgoingWebhookType>(
state: AppState,
req: &actix_web::HttpRequest,
merchant_account: domain::MerchantAccount,
key_store: domain::MerchantKeyStore,
connector_name_or_mca_id: &str,
body: actix_web::web::Bytes,
) -> RouterResponse<serde_json::Value> {
let (application_response, _webhooks_response_tracker) = webhooks_core::<W>(
state,
req,
merchant_account,
key_store,
connector_name_or_mca_id,
body,
)
.await?;

Ok(application_response)
}

#[instrument(skip_all)]
pub async fn webhooks_core<W: types::OutgoingWebhookType>(
state: AppState,
Expand All @@ -737,7 +775,10 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
connector_name_or_mca_id: &str,
body: actix_web::web::Bytes,
) -> RouterResponse<serde_json::Value> {
) -> errors::RouterResult<(
services::ApplicationResponse<serde_json::Value>,
WebhookResponseTracker,
)> {
metrics::WEBHOOK_INCOMING_COUNT.add(
&metrics::CONTEXT,
1,
Expand All @@ -754,8 +795,11 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
body: &body,
};

// Fetch the merchant connector account to get the webhooks source secret
// `webhooks source secret` is a secret shared between the merchant and connector
// This is used for source verification and webhooks integrity
let (merchant_connector_account, connector) = fetch_mca_and_connector(
state.clone(),
&state,
&merchant_account,
connector_name_or_mca_id,
&key_store,
Expand Down Expand Up @@ -810,10 +854,12 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
],
);

return connector
let response = connector
.get_webhook_api_response(&request_details)
.switch()
.attach_printable("Failed while early return in case of event type parsing");
.attach_printable("Failed while early return in case of event type parsing")?;

return Ok((response, WebhookResponseTracker::NoEffect));
}
};

Expand All @@ -829,7 +875,9 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
logger::info!(event_type=?event_type);

let flow_type: api::WebhookFlow = event_type.to_owned().into();
if process_webhook_further && !matches!(flow_type, api::WebhookFlow::ReturnResponse) {
let webhook_effect = if process_webhook_further
&& !matches!(flow_type, api::WebhookFlow::ReturnResponse)
{
let object_ref_id = connector
.get_webhook_object_reference_id(&request_details)
.switch()
Expand Down Expand Up @@ -962,7 +1010,7 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
.await
.attach_printable("Incoming bank-transfer webhook flow failed")?,

api::WebhookFlow::ReturnResponse => {}
api::WebhookFlow::ReturnResponse => WebhookResponseTracker::NoEffect,

_ => Err(errors::ApiErrorResponse::InternalServerError)
.into_report()
Expand All @@ -977,14 +1025,15 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
merchant_account.merchant_id.clone(),
)],
);
}
WebhookResponseTracker::NoEffect
};

let response = connector
.get_webhook_api_response(&request_details)
.switch()
.attach_printable("Could not get incoming webhook api response from connector")?;

Ok(response)
Ok((response, webhook_effect))
}

#[inline]
Expand Down Expand Up @@ -1026,7 +1075,7 @@ pub async fn get_payment_id(
}

async fn fetch_mca_and_connector(
state: AppState,
state: &AppState,
merchant_account: &domain::MerchantAccount,
connector_name_or_mca_id: &str,
key_store: &domain::MerchantKeyStore,
Expand Down
3 changes: 3 additions & 0 deletions crates/router/src/core/webhooks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const IRRELEVANT_ATTEMPT_ID_IN_SOURCE_VERIFICATION_FLOW: &str =
const IRRELEVANT_CONNECTOR_REQUEST_REFERENCE_ID_IN_SOURCE_VERIFICATION_FLOW: &str =
"irrelevant_connector_request_reference_id_in_source_verification_flow";

/// Check whether the merchant has configured to process the webhook `event` for the `connector`
/// First check for the key "whconf_{merchant_id}_{connector_id}" in redis,
/// if not found, fetch from configs table in database, if not found use default
pub async fn lookup_webhook_event(
db: &dyn StorageInterface,
connector_id: &str,
Expand Down
4 changes: 2 additions & 2 deletions crates/router/src/routes/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub async fn receive_incoming_webhook<W: types::OutgoingWebhookType>(
&req,
body,
|state, auth, body| {
webhooks::webhooks_core::<W>(
state,
webhooks::webhooks_wrapper::<W>(
state.to_owned(),
&req,
auth.merchant_account,
auth.key_store,
Expand Down

0 comments on commit 86e31b4

Please sign in to comment.