diff --git a/crates/router/src/consts.rs b/crates/router/src/consts.rs index 228d02e1ddac..02db8b1754ed 100644 --- a/crates/router/src/consts.rs +++ b/crates/router/src/consts.rs @@ -13,6 +13,8 @@ pub(crate) const ALPHABETS: [char; 62] = [ pub const REQUEST_TIME_OUT: u64 = 30; pub const REQUEST_TIMEOUT_ERROR_CODE: &str = "TIMEOUT"; pub const REQUEST_TIMEOUT_ERROR_MESSAGE: &str = "Connector did not respond in specified time"; +pub const REQUEST_TIMEOUT_ERROR_MESSAGE_FROM_PSYNC: &str = + "This Payment has been moved to failed as there is no response from the connector"; ///Payment intent fulfillment default timeout (in seconds) pub const DEFAULT_FULFILLMENT_TIME: i64 = 15 * 60; diff --git a/crates/router/src/core/payments/transformers.rs b/crates/router/src/core/payments/transformers.rs index a8069b4d4a62..c5e28110a6da 100644 --- a/crates/router/src/core/payments/transformers.rs +++ b/crates/router/src/core/payments/transformers.rs @@ -2,12 +2,11 @@ use std::{fmt::Debug, marker::PhantomData, str::FromStr}; use api_models::payments::FrmMessage; use common_utils::fp_utils; -use data_models::mandates::MandateData; use diesel_models::ephemeral_key; use error_stack::{IntoReport, ResultExt}; use router_env::{instrument, tracing}; -use super::{flows::Feature, PaymentAddress, PaymentData}; +use super::{flows::Feature, PaymentData}; use crate::{ configs::settings::{ConnectorRequestReferenceIdConfig, Server}, connector::Nexinets, @@ -202,40 +201,32 @@ where connector_request_reference_id_config: &ConnectorRequestReferenceIdConfig, connector_http_status_code: Option, ) -> RouterResponse { - let captures = payment_data - .multiple_capture_data - .and_then(|multiple_capture_data| { - multiple_capture_data - .expand_captures - .and_then(|should_expand| { - should_expand.then_some( - multiple_capture_data - .get_all_captures() - .into_iter() - .cloned() - .collect(), - ) - }) - }); + let captures = + payment_data + .multiple_capture_data + .clone() + .and_then(|multiple_capture_data| { + multiple_capture_data + .expand_captures + .and_then(|should_expand| { + should_expand.then_some( + multiple_capture_data + .get_all_captures() + .into_iter() + .cloned() + .collect(), + ) + }) + }); + payments_to_payments_response( req, - payment_data.payment_attempt, - payment_data.payment_intent, - payment_data.refunds, - payment_data.disputes, - payment_data.attempts, + payment_data, captures, - payment_data.payment_method_data, customer, auth_flow, - payment_data.address, server, - payment_data.connector_response.authentication_data, &operation, - payment_data.ephemeral_key, - payment_data.sessions_token, - payment_data.frm_message, - payment_data.setup_mandate, connector_request_reference_id_config, connector_http_status_code, ) @@ -333,31 +324,23 @@ where // try to use router data here so that already validated things , we don't want to repeat the validations. // Add internal value not found and external value not found so that we can give 500 / Internal server error for internal value not found #[allow(clippy::too_many_arguments)] -pub fn payments_to_payments_response( +pub fn payments_to_payments_response( payment_request: Option, - payment_attempt: storage::PaymentAttempt, - payment_intent: storage::PaymentIntent, - refunds: Vec, - disputes: Vec, - option_attempts: Option>, + payment_data: PaymentData, captures: Option>, - payment_method_data: Option, customer: Option, auth_flow: services::AuthFlow, - address: PaymentAddress, server: &Server, - redirection_data: Option, operation: &Op, - ephemeral_key_option: Option, - session_tokens: Vec, - fraud_check: Option, - mandate_data: Option, connector_request_reference_id_config: &ConnectorRequestReferenceIdConfig, connector_http_status_code: Option, ) -> RouterResponse where Op: Debug, { + let payment_attempt = payment_data.payment_attempt; + let payment_intent = payment_data.payment_intent; + let currency = payment_attempt .currency .as_ref() @@ -369,22 +352,31 @@ where field_name: "amount", })?; let mandate_id = payment_attempt.mandate_id.clone(); - let refunds_response = if refunds.is_empty() { + let refunds_response = if payment_data.refunds.is_empty() { None } else { - Some(refunds.into_iter().map(ForeignInto::foreign_into).collect()) + Some( + payment_data + .refunds + .into_iter() + .map(ForeignInto::foreign_into) + .collect(), + ) }; - let disputes_response = if disputes.is_empty() { + + let disputes_response = if payment_data.disputes.is_empty() { None } else { Some( - disputes + payment_data + .disputes .into_iter() .map(ForeignInto::foreign_into) .collect(), ) }; - let attempts_response = option_attempts.map(|attempts| { + + let attempts_response = payment_data.attempts.map(|attempts| { attempts .into_iter() .map(ForeignInto::foreign_into) @@ -419,7 +411,7 @@ where field_name: "payment_method_data", })?; let merchant_decision = payment_intent.merchant_decision.to_owned(); - let frm_message = fraud_check.map(FrmMessage::foreign_from); + let frm_message = payment_data.frm_message.map(FrmMessage::foreign_from); let payment_method_data_response = additional_payment_method_data.map(api::PaymentMethodDataResponse::from); @@ -441,13 +433,23 @@ where let output = Ok(match payment_request { Some(_request) => { - if payments::is_start_pay(&operation) && redirection_data.is_some() { - let redirection_data = redirection_data.get_required_value("redirection_data")?; + if payments::is_start_pay(&operation) + && payment_data + .connector_response + .authentication_data + .is_some() + { + let redirection_data = payment_data + .connector_response + .authentication_data + .get_required_value("redirection_data")?; + let form: RedirectForm = serde_json::from_value(redirection_data) .map_err(|_| errors::ApiErrorResponse::InternalServerError)?; + services::ApplicationResponse::Form(Box::new(services::RedirectionFormData { redirect_form: form, - payment_method_data, + payment_method_data: payment_data.payment_method_data, amount, currency: currency.to_string(), })) @@ -494,22 +496,23 @@ where display_to_timestamp: wait_screen_data.display_to_timestamp, } })) - .or(redirection_data.map(|_| { - api_models::payments::NextActionData::RedirectToUrl { + .or(payment_data + .connector_response + .authentication_data + .map(|_| api_models::payments::NextActionData::RedirectToUrl { redirect_to_url: helpers::create_startpay_url( server, &payment_attempt, &payment_intent, ), - } - })); + })); }; // next action check for third party sdk session (for ex: Apple pay through trustpay has third party sdk session response) if third_party_sdk_session_next_action(&payment_attempt, operation) { next_action_response = Some( api_models::payments::NextActionData::ThirdPartySdkSessionToken { - session_token: session_tokens.get(0).cloned(), + session_token: payment_data.sessions_token.get(0).cloned(), }, ) } @@ -555,7 +558,7 @@ where ) .set_mandate_id(mandate_id) .set_mandate_data( - mandate_data.map(|d| api::MandateData { + payment_data.setup_mandate.map(|d| api::MandateData { customer_acceptance: d.customer_acceptance.map(|d| { api::CustomerAcceptance { acceptance_type: match d.acceptance_type { @@ -621,8 +624,8 @@ where .or(payment_attempt.error_message), ) .set_error_code(payment_attempt.error_code) - .set_shipping(address.shipping) - .set_billing(address.billing) + .set_shipping(payment_data.address.shipping) + .set_billing(payment_data.address.billing) .set_next_action(next_action_response) .set_return_url(payment_intent.return_url) .set_cancellation_reason(payment_attempt.cancellation_reason) @@ -642,7 +645,9 @@ where .set_allowed_payment_method_types( payment_intent.allowed_payment_method_types, ) - .set_ephemeral_key(ephemeral_key_option.map(ForeignFrom::foreign_from)) + .set_ephemeral_key( + payment_data.ephemeral_key.map(ForeignFrom::foreign_from), + ) .set_frm_message(frm_message) .set_merchant_decision(merchant_decision) .set_manual_retry_allowed(helpers::is_manual_retry_allowed( @@ -696,8 +701,8 @@ where .as_ref() .and_then(|cus| cus.phone.as_ref().map(|s| s.to_owned())), mandate_id, - shipping: address.shipping, - billing: address.billing, + shipping: payment_data.address.shipping, + billing: payment_data.address.billing, cancellation_reason: payment_attempt.cancellation_reason, payment_token: payment_attempt.payment_token, metadata: payment_intent.metadata, diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index 4ad8344501c4..4f1537d5f483 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -861,13 +861,13 @@ pub async fn sync_refund_with_gateway_workflow( .await? } _ => { - payment_sync::retry_sync_task( + _ = payment_sync::retry_sync_task( &*state.store, response.connector, response.merchant_id, refund_tracker.to_owned(), ) - .await? + .await?; } } diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 530d445b50de..7540845c343f 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -695,11 +695,6 @@ pub async fn create_event_and_trigger_outgoing_webhook(merchant_account, outgoing_webhook, &state).await; diff --git a/crates/router/src/utils.rs b/crates/router/src/utils.rs index e8a5597eb26d..a0491c18a5f7 100644 --- a/crates/router/src/utils.rs +++ b/crates/router/src/utils.rs @@ -5,6 +5,8 @@ pub mod ext_traits; #[cfg(feature = "kv_store")] pub mod storage_partitioning; +use std::fmt::Debug; + use api_models::{enums, payments, webhooks}; use base64::Engine; pub use common_utils::{ @@ -27,11 +29,12 @@ use crate::{ consts, core::{ errors::{self, CustomResult, RouterResult, StorageErrorExt}, - utils, + utils, webhooks as webhooks_core, }, db::StorageInterface, logger, routes::metrics, + services, types::{ self, domain::{ @@ -39,6 +42,7 @@ use crate::{ types::{encrypt_optional, AsyncLift}, }, storage, + transformers::{ForeignTryFrom, ForeignTryInto}, }, }; @@ -669,3 +673,89 @@ pub fn add_apple_pay_payment_status_metrics( } } } + +impl ForeignTryFrom for enums::EventType { + type Error = errors::ValidationError; + + fn foreign_try_from(value: enums::IntentStatus) -> Result { + match value { + enums::IntentStatus::Succeeded => Ok(Self::PaymentSucceeded), + enums::IntentStatus::Failed => Ok(Self::PaymentFailed), + enums::IntentStatus::Processing => Ok(Self::PaymentProcessing), + enums::IntentStatus::RequiresMerchantAction + | enums::IntentStatus::RequiresCustomerAction => Ok(Self::ActionRequired), + _ => Err(errors::ValidationError::IncorrectValueProvided { + field_name: "intent_status", + }), + } + } +} + +pub async fn trigger_payments_webhook( + merchant_account: domain::MerchantAccount, + payment_data: crate::core::payments::PaymentData, + req: Option, + customer: Option, + state: &crate::routes::AppState, + operation: Op, +) -> RouterResult<()> +where + F: Send + Clone + Sync, + Op: Debug, +{ + let status = payment_data.payment_intent.status; + let payment_id = payment_data.payment_intent.payment_id.clone(); + let captures = payment_data + .multiple_capture_data + .clone() + .map(|multiple_capture_data| { + multiple_capture_data + .get_all_captures() + .into_iter() + .cloned() + .collect() + }); + + if matches!( + status, + enums::IntentStatus::Succeeded | enums::IntentStatus::Failed + ) { + let payments_response = crate::core::payments::transformers::payments_to_payments_response( + req, + payment_data, + captures, + customer, + services::AuthFlow::Merchant, + &state.conf.server, + &operation, + &state.conf.connector_request_reference_id_config, + None, + )?; + + let event_type: enums::EventType = status + .foreign_try_into() + .into_report() + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("payment event type mapping failed")?; + + if let services::ApplicationResponse::JsonWithHeaders((payments_response_json, _)) = + payments_response + { + Box::pin( + webhooks_core::create_event_and_trigger_appropriate_outgoing_webhook( + state.clone(), + merchant_account, + event_type, + diesel_models::enums::EventClass::Payments, + None, + payment_id, + diesel_models::enums::EventObjectType::PaymentDetails, + webhooks::OutgoingWebhookContent::PaymentDetails(payments_response_json), + ), + ) + .await?; + } + } + + Ok(()) +} diff --git a/crates/router/src/workflows/payment_sync.rs b/crates/router/src/workflows/payment_sync.rs index 4dbf97081a60..cbb13be2f9b0 100644 --- a/crates/router/src/workflows/payment_sync.rs +++ b/crates/router/src/workflows/payment_sync.rs @@ -4,11 +4,13 @@ use router_env::logger; use scheduler::{ consumer::{self, types::process_data, workflows::ProcessTrackerWorkflow}, db::process_tracker::ProcessTrackerExt, - errors as sch_errors, utils, SchedulerAppState, + errors as sch_errors, utils as scheduler_utils, SchedulerAppState, }; use crate::{ + consts, core::{ + errors::StorageErrorExt, payment_methods::Oss, payments::{self as payment_flows, operations}, }, @@ -20,6 +22,7 @@ use crate::{ api, storage::{self, enums}, }, + utils, }; pub struct PaymentsSyncWorkflow; @@ -57,7 +60,7 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { ) .await?; - let (payment_data, _, _, _) = + let (mut payment_data, _, customer, _) = payment_flows::payments_operation_core::( state, merchant_account.clone(), @@ -93,15 +96,72 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { let connector = payment_data .payment_attempt .connector + .clone() .ok_or(sch_errors::ProcessTrackerError::MissingRequiredField)?; - retry_sync_task( + let is_last_retry = retry_sync_task( db, connector, - payment_data.payment_attempt.merchant_id, + payment_data.payment_attempt.merchant_id.clone(), process, ) - .await? + .await?; + + // If the payment status is still processing and there is no connector transaction_id + // then change the payment status to failed if all retries exceeded + if is_last_retry + && payment_data.payment_attempt.status == enums::AttemptStatus::Pending + && payment_data + .payment_attempt + .connector_transaction_id + .as_ref() + .is_none() + { + let payment_intent_update = data_models::payments::payment_intent::PaymentIntentUpdate::PGStatusUpdate { status: api_models::enums::IntentStatus::Failed }; + let payment_attempt_update = + data_models::payments::payment_attempt::PaymentAttemptUpdate::ErrorUpdate { + connector: None, + status: api_models::enums::AttemptStatus::AuthenticationFailed, + error_code: None, + error_message: None, + error_reason: Some(Some( + consts::REQUEST_TIMEOUT_ERROR_MESSAGE_FROM_PSYNC.to_string(), + )), + amount_capturable: Some(0), + }; + + payment_data.payment_attempt = db + .update_payment_attempt_with_attempt_id( + payment_data.payment_attempt, + payment_attempt_update, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?; + + payment_data.payment_intent = db + .update_payment_intent( + payment_data.payment_intent, + payment_intent_update, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?; + + // Trigger the outgoing webhook to notify the merchant about failed payment + let operation = operations::PaymentStatus; + utils::trigger_payments_webhook::<_, api_models::payments::PaymentsRequest, _>( + merchant_account, + payment_data, + None, + customer, + state, + operation, + ) + .await + .map_err(|error| logger::warn!(payments_outgoing_webhook_error=?error)) + .ok(); + } } }; Ok(()) @@ -117,6 +177,26 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { } } +/// Get the next schedule time +/// +/// The schedule time can be configured in configs by this key `pt_mapping_trustpay` +/// ```json +/// { +/// "default_mapping": { +/// "start_after": 60, +/// "frequency": [300], +/// "count": [5] +/// }, +/// "max_retries_count": 5 +/// } +/// ``` +/// +/// This config represents +/// +/// `start_after`: The first psync should happen after 60 seconds +/// +/// `frequency` and `count`: The next 5 retries should have an interval of 300 seconds between them +/// pub async fn get_sync_process_schedule_time( db: &dyn StorageInterface, connector: &str, @@ -142,25 +222,32 @@ pub async fn get_sync_process_schedule_time( process_data::ConnectorPTMapping::default() } }; - let time_delta = utils::get_schedule_time(mapping, merchant_id, retry_count + 1); + let time_delta = scheduler_utils::get_schedule_time(mapping, merchant_id, retry_count + 1); - Ok(utils::get_time_from_delta(time_delta)) + Ok(scheduler_utils::get_time_from_delta(time_delta)) } +/// Schedule the task for retry +/// +/// Returns bool which indicates whether this was the last retry or not pub async fn retry_sync_task( db: &dyn StorageInterface, connector: String, merchant_id: String, pt: storage::ProcessTracker, -) -> Result<(), sch_errors::ProcessTrackerError> { +) -> Result { let schedule_time = get_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?; match schedule_time { - Some(s_time) => pt.retry(db.as_scheduler(), s_time).await, + Some(s_time) => { + pt.retry(db.as_scheduler(), s_time).await?; + Ok(false) + } None => { pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) - .await + .await?; + Ok(true) } } } @@ -173,9 +260,11 @@ mod tests { #[test] fn test_get_default_schedule_time() { let schedule_time_delta = - utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0).unwrap(); + scheduler_utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0) + .unwrap(); let first_retry_time_delta = - utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1).unwrap(); + scheduler_utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1) + .unwrap(); let cpt_default = process_data::ConnectorPTMapping::default().default_mapping; assert_eq!( vec![schedule_time_delta, first_retry_time_delta], diff --git a/crates/scheduler/src/utils.rs b/crates/scheduler/src/utils.rs index 676ef330f9d1..53f14bd1fb9c 100644 --- a/crates/scheduler/src/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -298,6 +298,7 @@ pub fn get_schedule_time( None => mapping.default_mapping, }; + // For first try, get the `start_after` time if retry_count == 0 { Some(mapping.start_after) } else { @@ -328,6 +329,7 @@ pub fn get_pm_schedule_time( } } +/// Get the delay based on the retry count fn get_delay<'a>( retry_count: i32, mut array: impl Iterator,