Skip to content

Commit

Permalink
feat(process_tracker): mark long-standing payments as failed
Browse files Browse the repository at this point in the history
The criterion is that the process tracker has completed its last retry,
the payment status is still processing, and the connector transaction id is absent
  • Loading branch information
Narayanbhat166 committed Sep 27, 2023
1 parent f479d94 commit d055c58
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 10 deletions.
4 changes: 2 additions & 2 deletions crates/router/src/core/refunds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,13 +861,13 @@ pub async fn sync_refund_with_gateway_workflow(
.await?
}
_ => {
payment_sync::retry_sync_task(
_ = payment_sync::retry_sync_task(
&*state.store,
response.connector,
response.merchant_id,
refund_tracker.to_owned(),
)
.await?
.await?;
}
}

Expand Down
107 changes: 99 additions & 8 deletions crates/router/src/workflows/payment_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use scheduler::{
};

use crate::{
core::payments::{self as payment_flows, operations},
core::{
errors::StorageErrorExt,
payments::{self as payment_flows, operations},
},
db::StorageInterface,
errors,
routes::AppState,
Expand Down Expand Up @@ -54,7 +57,7 @@ impl ProcessTrackerWorkflow<AppState> for PaymentsSyncWorkflow {
)
.await?;

let (payment_data, _, _, _) =
let (mut payment_data, _, customer, _) =
payment_flows::payments_operation_core::<api::PSync, _, _, _>(
state,
merchant_account.clone(),
Expand Down Expand Up @@ -90,15 +93,76 @@ impl ProcessTrackerWorkflow<AppState> for PaymentsSyncWorkflow {
let connector = payment_data
.payment_attempt
.connector
.clone()
.ok_or(sch_errors::ProcessTrackerError::MissingRequiredField)?;

retry_sync_task(
let is_last_retry = retry_sync_task(
db,
connector,
payment_data.payment_attempt.merchant_id,
payment_data.payment_attempt.merchant_id.clone(),
process,
)
.await?
.await?;

// If the payment status is still processing and there is no connector transaction_id
// then change the payment status to failed if all retries exceeded
if is_last_retry
&& payment_data.payment_attempt.status == enums::AttemptStatus::Pending
&& payment_data
.payment_attempt
.connector_transaction_id
.as_ref()
.is_none()
{
let payment_intent_update = data_models::payments::payment_intent::PaymentIntentUpdate::PGStatusUpdate { status: api_models::enums::IntentStatus::Failed };
let payment_attempt_update =
data_models::payments::payment_attempt::PaymentAttemptUpdate::ErrorUpdate {
connector: None,
status: api_models::enums::AttemptStatus::AuthenticationFailed,
error_code: None,
error_message: Some(Some(
"Failed due to no response from connector".to_string(),
)),
error_reason: None,
amount_capturable: Some(0),
};

payment_data.payment_attempt = db
.update_payment_attempt_with_attempt_id(
payment_data.payment_attempt,
payment_attempt_update,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;

payment_data.payment_intent = db
.update_payment_intent(
payment_data.payment_intent,
payment_intent_update,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;

// Trigger the outgoing webhook to notify the merchant about failed payment
let operation = operations::PaymentStatus;
crate::utils::trigger_payments_webhook::<
_,
api_models::payments::PaymentsRequest,
_,
>(
merchant_account,
payment_data,
None,
customer,
state,
operation,
)
.await
.map_err(|error| logger::warn!(payments_outgoing_webhook_error=?error))
.ok();
}
}
};
Ok(())
Expand All @@ -114,6 +178,26 @@ impl ProcessTrackerWorkflow<AppState> for PaymentsSyncWorkflow {
}
}

/// Get the next schedule time
///
/// The schedule time can be configured in configs by this key `pt_mapping_trustpay`
/// ```json
/// {
/// "default_mapping": {
/// "start_after": 60,
/// "frequency": [300],
/// "count": [5]
/// },
/// "max_retries_count": 5
/// }
/// ```
///
/// This config represents
///
/// `start_after`: The first psync should happen after 60 seconds
///
/// `frequency` and `count`: The next 5 retries should have an interval of 300 seconds between them
///
pub async fn get_sync_process_schedule_time(
db: &dyn StorageInterface,
connector: &str,
Expand Down Expand Up @@ -144,20 +228,27 @@ pub async fn get_sync_process_schedule_time(
Ok(utils::get_time_from_delta(time_delta))
}

/// Schedule the task for retry
///
/// Returns bool which indicates whether this was the last retry or not
pub async fn retry_sync_task(
db: &dyn StorageInterface,
connector: String,
merchant_id: String,
pt: storage::ProcessTracker,
) -> Result<(), sch_errors::ProcessTrackerError> {
) -> Result<bool, sch_errors::ProcessTrackerError> {
let schedule_time =
get_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?;

match schedule_time {
Some(s_time) => pt.retry(db.as_scheduler(), s_time).await,
Some(s_time) => {
pt.retry(db.as_scheduler(), s_time).await?;
Ok(false)
}
None => {
pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string())
.await
.await?;
Ok(true)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/scheduler/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ pub fn get_schedule_time(
None => mapping.default_mapping,
};

// For first try, get the `start_after` time
if retry_count == 0 {
Some(mapping.start_after)
} else {
Expand Down Expand Up @@ -328,6 +329,7 @@ pub fn get_pm_schedule_time(
}
}

/// Get the delay based on the retry count
fn get_delay<'a>(
retry_count: i32,
mut array: impl Iterator<Item = (&'a i32, &'a i32)>,
Expand Down

0 comments on commit d055c58

Please sign in to comment.