mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-31 01:57:45 +08:00
feat(process_tracker): make long standing payments failed (#2380)
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
This commit is contained in:
@ -13,6 +13,8 @@ pub(crate) const ALPHABETS: [char; 62] = [
|
||||
pub const REQUEST_TIME_OUT: u64 = 30;
|
||||
pub const REQUEST_TIMEOUT_ERROR_CODE: &str = "TIMEOUT";
|
||||
pub const REQUEST_TIMEOUT_ERROR_MESSAGE: &str = "Connector did not respond in specified time";
|
||||
pub const REQUEST_TIMEOUT_ERROR_MESSAGE_FROM_PSYNC: &str =
|
||||
"This Payment has been moved to failed as there is no response from the connector";
|
||||
|
||||
///Payment intent fulfillment default timeout (in seconds)
|
||||
pub const DEFAULT_FULFILLMENT_TIME: i64 = 15 * 60;
|
||||
|
||||
@ -2,12 +2,11 @@ use std::{fmt::Debug, marker::PhantomData, str::FromStr};
|
||||
|
||||
use api_models::payments::FrmMessage;
|
||||
use common_utils::fp_utils;
|
||||
use data_models::mandates::MandateData;
|
||||
use diesel_models::ephemeral_key;
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use router_env::{instrument, tracing};
|
||||
|
||||
use super::{flows::Feature, PaymentAddress, PaymentData};
|
||||
use super::{flows::Feature, PaymentData};
|
||||
use crate::{
|
||||
configs::settings::{ConnectorRequestReferenceIdConfig, Server},
|
||||
connector::Nexinets,
|
||||
@ -202,8 +201,10 @@ where
|
||||
connector_request_reference_id_config: &ConnectorRequestReferenceIdConfig,
|
||||
connector_http_status_code: Option<u16>,
|
||||
) -> RouterResponse<Self> {
|
||||
let captures = payment_data
|
||||
let captures =
|
||||
payment_data
|
||||
.multiple_capture_data
|
||||
.clone()
|
||||
.and_then(|multiple_capture_data| {
|
||||
multiple_capture_data
|
||||
.expand_captures
|
||||
@ -217,25 +218,15 @@ where
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
payments_to_payments_response(
|
||||
req,
|
||||
payment_data.payment_attempt,
|
||||
payment_data.payment_intent,
|
||||
payment_data.refunds,
|
||||
payment_data.disputes,
|
||||
payment_data.attempts,
|
||||
payment_data,
|
||||
captures,
|
||||
payment_data.payment_method_data,
|
||||
customer,
|
||||
auth_flow,
|
||||
payment_data.address,
|
||||
server,
|
||||
payment_data.connector_response.authentication_data,
|
||||
&operation,
|
||||
payment_data.ephemeral_key,
|
||||
payment_data.sessions_token,
|
||||
payment_data.frm_message,
|
||||
payment_data.setup_mandate,
|
||||
connector_request_reference_id_config,
|
||||
connector_http_status_code,
|
||||
)
|
||||
@ -333,31 +324,23 @@ where
|
||||
// try to use router data here so that already validated things , we don't want to repeat the validations.
|
||||
// Add internal value not found and external value not found so that we can give 500 / Internal server error for internal value not found
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn payments_to_payments_response<R, Op>(
|
||||
pub fn payments_to_payments_response<R, Op, F: Clone>(
|
||||
payment_request: Option<R>,
|
||||
payment_attempt: storage::PaymentAttempt,
|
||||
payment_intent: storage::PaymentIntent,
|
||||
refunds: Vec<storage::Refund>,
|
||||
disputes: Vec<storage::Dispute>,
|
||||
option_attempts: Option<Vec<storage::PaymentAttempt>>,
|
||||
payment_data: PaymentData<F>,
|
||||
captures: Option<Vec<storage::Capture>>,
|
||||
payment_method_data: Option<api::PaymentMethodData>,
|
||||
customer: Option<domain::Customer>,
|
||||
auth_flow: services::AuthFlow,
|
||||
address: PaymentAddress,
|
||||
server: &Server,
|
||||
redirection_data: Option<serde_json::Value>,
|
||||
operation: &Op,
|
||||
ephemeral_key_option: Option<ephemeral_key::EphemeralKey>,
|
||||
session_tokens: Vec<api::SessionToken>,
|
||||
fraud_check: Option<payments::FraudCheck>,
|
||||
mandate_data: Option<MandateData>,
|
||||
connector_request_reference_id_config: &ConnectorRequestReferenceIdConfig,
|
||||
connector_http_status_code: Option<u16>,
|
||||
) -> RouterResponse<api::PaymentsResponse>
|
||||
where
|
||||
Op: Debug,
|
||||
{
|
||||
let payment_attempt = payment_data.payment_attempt;
|
||||
let payment_intent = payment_data.payment_intent;
|
||||
|
||||
let currency = payment_attempt
|
||||
.currency
|
||||
.as_ref()
|
||||
@ -369,22 +352,31 @@ where
|
||||
field_name: "amount",
|
||||
})?;
|
||||
let mandate_id = payment_attempt.mandate_id.clone();
|
||||
let refunds_response = if refunds.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(refunds.into_iter().map(ForeignInto::foreign_into).collect())
|
||||
};
|
||||
let disputes_response = if disputes.is_empty() {
|
||||
let refunds_response = if payment_data.refunds.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
disputes
|
||||
payment_data
|
||||
.refunds
|
||||
.into_iter()
|
||||
.map(ForeignInto::foreign_into)
|
||||
.collect(),
|
||||
)
|
||||
};
|
||||
let attempts_response = option_attempts.map(|attempts| {
|
||||
|
||||
let disputes_response = if payment_data.disputes.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
payment_data
|
||||
.disputes
|
||||
.into_iter()
|
||||
.map(ForeignInto::foreign_into)
|
||||
.collect(),
|
||||
)
|
||||
};
|
||||
|
||||
let attempts_response = payment_data.attempts.map(|attempts| {
|
||||
attempts
|
||||
.into_iter()
|
||||
.map(ForeignInto::foreign_into)
|
||||
@ -419,7 +411,7 @@ where
|
||||
field_name: "payment_method_data",
|
||||
})?;
|
||||
let merchant_decision = payment_intent.merchant_decision.to_owned();
|
||||
let frm_message = fraud_check.map(FrmMessage::foreign_from);
|
||||
let frm_message = payment_data.frm_message.map(FrmMessage::foreign_from);
|
||||
|
||||
let payment_method_data_response =
|
||||
additional_payment_method_data.map(api::PaymentMethodDataResponse::from);
|
||||
@ -441,13 +433,23 @@ where
|
||||
|
||||
let output = Ok(match payment_request {
|
||||
Some(_request) => {
|
||||
if payments::is_start_pay(&operation) && redirection_data.is_some() {
|
||||
let redirection_data = redirection_data.get_required_value("redirection_data")?;
|
||||
if payments::is_start_pay(&operation)
|
||||
&& payment_data
|
||||
.connector_response
|
||||
.authentication_data
|
||||
.is_some()
|
||||
{
|
||||
let redirection_data = payment_data
|
||||
.connector_response
|
||||
.authentication_data
|
||||
.get_required_value("redirection_data")?;
|
||||
|
||||
let form: RedirectForm = serde_json::from_value(redirection_data)
|
||||
.map_err(|_| errors::ApiErrorResponse::InternalServerError)?;
|
||||
|
||||
services::ApplicationResponse::Form(Box::new(services::RedirectionFormData {
|
||||
redirect_form: form,
|
||||
payment_method_data,
|
||||
payment_method_data: payment_data.payment_method_data,
|
||||
amount,
|
||||
currency: currency.to_string(),
|
||||
}))
|
||||
@ -494,14 +496,15 @@ where
|
||||
display_to_timestamp: wait_screen_data.display_to_timestamp,
|
||||
}
|
||||
}))
|
||||
.or(redirection_data.map(|_| {
|
||||
api_models::payments::NextActionData::RedirectToUrl {
|
||||
.or(payment_data
|
||||
.connector_response
|
||||
.authentication_data
|
||||
.map(|_| api_models::payments::NextActionData::RedirectToUrl {
|
||||
redirect_to_url: helpers::create_startpay_url(
|
||||
server,
|
||||
&payment_attempt,
|
||||
&payment_intent,
|
||||
),
|
||||
}
|
||||
}));
|
||||
};
|
||||
|
||||
@ -509,7 +512,7 @@ where
|
||||
if third_party_sdk_session_next_action(&payment_attempt, operation) {
|
||||
next_action_response = Some(
|
||||
api_models::payments::NextActionData::ThirdPartySdkSessionToken {
|
||||
session_token: session_tokens.get(0).cloned(),
|
||||
session_token: payment_data.sessions_token.get(0).cloned(),
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -555,7 +558,7 @@ where
|
||||
)
|
||||
.set_mandate_id(mandate_id)
|
||||
.set_mandate_data(
|
||||
mandate_data.map(|d| api::MandateData {
|
||||
payment_data.setup_mandate.map(|d| api::MandateData {
|
||||
customer_acceptance: d.customer_acceptance.map(|d| {
|
||||
api::CustomerAcceptance {
|
||||
acceptance_type: match d.acceptance_type {
|
||||
@ -621,8 +624,8 @@ where
|
||||
.or(payment_attempt.error_message),
|
||||
)
|
||||
.set_error_code(payment_attempt.error_code)
|
||||
.set_shipping(address.shipping)
|
||||
.set_billing(address.billing)
|
||||
.set_shipping(payment_data.address.shipping)
|
||||
.set_billing(payment_data.address.billing)
|
||||
.set_next_action(next_action_response)
|
||||
.set_return_url(payment_intent.return_url)
|
||||
.set_cancellation_reason(payment_attempt.cancellation_reason)
|
||||
@ -642,7 +645,9 @@ where
|
||||
.set_allowed_payment_method_types(
|
||||
payment_intent.allowed_payment_method_types,
|
||||
)
|
||||
.set_ephemeral_key(ephemeral_key_option.map(ForeignFrom::foreign_from))
|
||||
.set_ephemeral_key(
|
||||
payment_data.ephemeral_key.map(ForeignFrom::foreign_from),
|
||||
)
|
||||
.set_frm_message(frm_message)
|
||||
.set_merchant_decision(merchant_decision)
|
||||
.set_manual_retry_allowed(helpers::is_manual_retry_allowed(
|
||||
@ -696,8 +701,8 @@ where
|
||||
.as_ref()
|
||||
.and_then(|cus| cus.phone.as_ref().map(|s| s.to_owned())),
|
||||
mandate_id,
|
||||
shipping: address.shipping,
|
||||
billing: address.billing,
|
||||
shipping: payment_data.address.shipping,
|
||||
billing: payment_data.address.billing,
|
||||
cancellation_reason: payment_attempt.cancellation_reason,
|
||||
payment_token: payment_attempt.payment_token,
|
||||
metadata: payment_intent.metadata,
|
||||
|
||||
@ -861,13 +861,13 @@ pub async fn sync_refund_with_gateway_workflow(
|
||||
.await?
|
||||
}
|
||||
_ => {
|
||||
payment_sync::retry_sync_task(
|
||||
_ = payment_sync::retry_sync_task(
|
||||
&*state.store,
|
||||
response.connector,
|
||||
response.merchant_id,
|
||||
refund_tracker.to_owned(),
|
||||
)
|
||||
.await?
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -695,11 +695,6 @@ pub async fn create_event_and_trigger_outgoing_webhook<W: types::OutgoingWebhook
|
||||
}?;
|
||||
|
||||
if state.conf.webhooks.outgoing_enabled {
|
||||
let arbiter = actix::Arbiter::try_current()
|
||||
.ok_or(errors::ApiErrorResponse::WebhookProcessingFailure)
|
||||
.into_report()
|
||||
.attach_printable("arbiter retrieval failure")?;
|
||||
|
||||
let outgoing_webhook = api::OutgoingWebhook {
|
||||
merchant_id: merchant_account.merchant_id.clone(),
|
||||
event_id: event.event_id,
|
||||
@ -708,7 +703,9 @@ pub async fn create_event_and_trigger_outgoing_webhook<W: types::OutgoingWebhook
|
||||
timestamp: event.created_at,
|
||||
};
|
||||
|
||||
arbiter.spawn(async move {
|
||||
// Using a tokio spawn here and not arbiter because not all caller of this function
|
||||
// may have an actix arbiter
|
||||
tokio::spawn(async move {
|
||||
let result =
|
||||
trigger_webhook_to_merchant::<W>(merchant_account, outgoing_webhook, &state).await;
|
||||
|
||||
|
||||
@ -5,6 +5,8 @@ pub mod ext_traits;
|
||||
#[cfg(feature = "kv_store")]
|
||||
pub mod storage_partitioning;
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
use api_models::{enums, payments, webhooks};
|
||||
use base64::Engine;
|
||||
pub use common_utils::{
|
||||
@ -27,11 +29,12 @@ use crate::{
|
||||
consts,
|
||||
core::{
|
||||
errors::{self, CustomResult, RouterResult, StorageErrorExt},
|
||||
utils,
|
||||
utils, webhooks as webhooks_core,
|
||||
},
|
||||
db::StorageInterface,
|
||||
logger,
|
||||
routes::metrics,
|
||||
services,
|
||||
types::{
|
||||
self,
|
||||
domain::{
|
||||
@ -39,6 +42,7 @@ use crate::{
|
||||
types::{encrypt_optional, AsyncLift},
|
||||
},
|
||||
storage,
|
||||
transformers::{ForeignTryFrom, ForeignTryInto},
|
||||
},
|
||||
};
|
||||
|
||||
@ -669,3 +673,89 @@ pub fn add_apple_pay_payment_status_metrics(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ForeignTryFrom<enums::IntentStatus> for enums::EventType {
|
||||
type Error = errors::ValidationError;
|
||||
|
||||
fn foreign_try_from(value: enums::IntentStatus) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
enums::IntentStatus::Succeeded => Ok(Self::PaymentSucceeded),
|
||||
enums::IntentStatus::Failed => Ok(Self::PaymentFailed),
|
||||
enums::IntentStatus::Processing => Ok(Self::PaymentProcessing),
|
||||
enums::IntentStatus::RequiresMerchantAction
|
||||
| enums::IntentStatus::RequiresCustomerAction => Ok(Self::ActionRequired),
|
||||
_ => Err(errors::ValidationError::IncorrectValueProvided {
|
||||
field_name: "intent_status",
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn trigger_payments_webhook<F, Req, Op>(
|
||||
merchant_account: domain::MerchantAccount,
|
||||
payment_data: crate::core::payments::PaymentData<F>,
|
||||
req: Option<Req>,
|
||||
customer: Option<domain::Customer>,
|
||||
state: &crate::routes::AppState,
|
||||
operation: Op,
|
||||
) -> RouterResult<()>
|
||||
where
|
||||
F: Send + Clone + Sync,
|
||||
Op: Debug,
|
||||
{
|
||||
let status = payment_data.payment_intent.status;
|
||||
let payment_id = payment_data.payment_intent.payment_id.clone();
|
||||
let captures = payment_data
|
||||
.multiple_capture_data
|
||||
.clone()
|
||||
.map(|multiple_capture_data| {
|
||||
multiple_capture_data
|
||||
.get_all_captures()
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.collect()
|
||||
});
|
||||
|
||||
if matches!(
|
||||
status,
|
||||
enums::IntentStatus::Succeeded | enums::IntentStatus::Failed
|
||||
) {
|
||||
let payments_response = crate::core::payments::transformers::payments_to_payments_response(
|
||||
req,
|
||||
payment_data,
|
||||
captures,
|
||||
customer,
|
||||
services::AuthFlow::Merchant,
|
||||
&state.conf.server,
|
||||
&operation,
|
||||
&state.conf.connector_request_reference_id_config,
|
||||
None,
|
||||
)?;
|
||||
|
||||
let event_type: enums::EventType = status
|
||||
.foreign_try_into()
|
||||
.into_report()
|
||||
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
|
||||
.attach_printable("payment event type mapping failed")?;
|
||||
|
||||
if let services::ApplicationResponse::JsonWithHeaders((payments_response_json, _)) =
|
||||
payments_response
|
||||
{
|
||||
Box::pin(
|
||||
webhooks_core::create_event_and_trigger_appropriate_outgoing_webhook(
|
||||
state.clone(),
|
||||
merchant_account,
|
||||
event_type,
|
||||
diesel_models::enums::EventClass::Payments,
|
||||
None,
|
||||
payment_id,
|
||||
diesel_models::enums::EventObjectType::PaymentDetails,
|
||||
webhooks::OutgoingWebhookContent::PaymentDetails(payments_response_json),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -4,11 +4,13 @@ use router_env::logger;
|
||||
use scheduler::{
|
||||
consumer::{self, types::process_data, workflows::ProcessTrackerWorkflow},
|
||||
db::process_tracker::ProcessTrackerExt,
|
||||
errors as sch_errors, utils, SchedulerAppState,
|
||||
errors as sch_errors, utils as scheduler_utils, SchedulerAppState,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
consts,
|
||||
core::{
|
||||
errors::StorageErrorExt,
|
||||
payment_methods::Oss,
|
||||
payments::{self as payment_flows, operations},
|
||||
},
|
||||
@ -20,6 +22,7 @@ use crate::{
|
||||
api,
|
||||
storage::{self, enums},
|
||||
},
|
||||
utils,
|
||||
};
|
||||
|
||||
pub struct PaymentsSyncWorkflow;
|
||||
@ -57,7 +60,7 @@ impl ProcessTrackerWorkflow<AppState> for PaymentsSyncWorkflow {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (payment_data, _, _, _) =
|
||||
let (mut payment_data, _, customer, _) =
|
||||
payment_flows::payments_operation_core::<api::PSync, _, _, _, Oss>(
|
||||
state,
|
||||
merchant_account.clone(),
|
||||
@ -93,15 +96,72 @@ impl ProcessTrackerWorkflow<AppState> for PaymentsSyncWorkflow {
|
||||
let connector = payment_data
|
||||
.payment_attempt
|
||||
.connector
|
||||
.clone()
|
||||
.ok_or(sch_errors::ProcessTrackerError::MissingRequiredField)?;
|
||||
|
||||
retry_sync_task(
|
||||
let is_last_retry = retry_sync_task(
|
||||
db,
|
||||
connector,
|
||||
payment_data.payment_attempt.merchant_id,
|
||||
payment_data.payment_attempt.merchant_id.clone(),
|
||||
process,
|
||||
)
|
||||
.await?
|
||||
.await?;
|
||||
|
||||
// If the payment status is still processing and there is no connector transaction_id
|
||||
// then change the payment status to failed if all retries exceeded
|
||||
if is_last_retry
|
||||
&& payment_data.payment_attempt.status == enums::AttemptStatus::Pending
|
||||
&& payment_data
|
||||
.payment_attempt
|
||||
.connector_transaction_id
|
||||
.as_ref()
|
||||
.is_none()
|
||||
{
|
||||
let payment_intent_update = data_models::payments::payment_intent::PaymentIntentUpdate::PGStatusUpdate { status: api_models::enums::IntentStatus::Failed };
|
||||
let payment_attempt_update =
|
||||
data_models::payments::payment_attempt::PaymentAttemptUpdate::ErrorUpdate {
|
||||
connector: None,
|
||||
status: api_models::enums::AttemptStatus::AuthenticationFailed,
|
||||
error_code: None,
|
||||
error_message: None,
|
||||
error_reason: Some(Some(
|
||||
consts::REQUEST_TIMEOUT_ERROR_MESSAGE_FROM_PSYNC.to_string(),
|
||||
)),
|
||||
amount_capturable: Some(0),
|
||||
};
|
||||
|
||||
payment_data.payment_attempt = db
|
||||
.update_payment_attempt_with_attempt_id(
|
||||
payment_data.payment_attempt,
|
||||
payment_attempt_update,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
|
||||
payment_data.payment_intent = db
|
||||
.update_payment_intent(
|
||||
payment_data.payment_intent,
|
||||
payment_intent_update,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
|
||||
// Trigger the outgoing webhook to notify the merchant about failed payment
|
||||
let operation = operations::PaymentStatus;
|
||||
utils::trigger_payments_webhook::<_, api_models::payments::PaymentsRequest, _>(
|
||||
merchant_account,
|
||||
payment_data,
|
||||
None,
|
||||
customer,
|
||||
state,
|
||||
operation,
|
||||
)
|
||||
.await
|
||||
.map_err(|error| logger::warn!(payments_outgoing_webhook_error=?error))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
@ -117,6 +177,26 @@ impl ProcessTrackerWorkflow<AppState> for PaymentsSyncWorkflow {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the next schedule time
|
||||
///
|
||||
/// The schedule time can be configured in configs by this key `pt_mapping_trustpay`
|
||||
/// ```json
|
||||
/// {
|
||||
/// "default_mapping": {
|
||||
/// "start_after": 60,
|
||||
/// "frequency": [300],
|
||||
/// "count": [5]
|
||||
/// },
|
||||
/// "max_retries_count": 5
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// This config represents
|
||||
///
|
||||
/// `start_after`: The first psync should happen after 60 seconds
|
||||
///
|
||||
/// `frequency` and `count`: The next 5 retries should have an interval of 300 seconds between them
|
||||
///
|
||||
pub async fn get_sync_process_schedule_time(
|
||||
db: &dyn StorageInterface,
|
||||
connector: &str,
|
||||
@ -142,25 +222,32 @@ pub async fn get_sync_process_schedule_time(
|
||||
process_data::ConnectorPTMapping::default()
|
||||
}
|
||||
};
|
||||
let time_delta = utils::get_schedule_time(mapping, merchant_id, retry_count + 1);
|
||||
let time_delta = scheduler_utils::get_schedule_time(mapping, merchant_id, retry_count + 1);
|
||||
|
||||
Ok(utils::get_time_from_delta(time_delta))
|
||||
Ok(scheduler_utils::get_time_from_delta(time_delta))
|
||||
}
|
||||
|
||||
/// Schedule the task for retry
|
||||
///
|
||||
/// Returns bool which indicates whether this was the last retry or not
|
||||
pub async fn retry_sync_task(
|
||||
db: &dyn StorageInterface,
|
||||
connector: String,
|
||||
merchant_id: String,
|
||||
pt: storage::ProcessTracker,
|
||||
) -> Result<(), sch_errors::ProcessTrackerError> {
|
||||
) -> Result<bool, sch_errors::ProcessTrackerError> {
|
||||
let schedule_time =
|
||||
get_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?;
|
||||
|
||||
match schedule_time {
|
||||
Some(s_time) => pt.retry(db.as_scheduler(), s_time).await,
|
||||
Some(s_time) => {
|
||||
pt.retry(db.as_scheduler(), s_time).await?;
|
||||
Ok(false)
|
||||
}
|
||||
None => {
|
||||
pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string())
|
||||
.await
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -173,9 +260,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_get_default_schedule_time() {
|
||||
let schedule_time_delta =
|
||||
utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0).unwrap();
|
||||
scheduler_utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0)
|
||||
.unwrap();
|
||||
let first_retry_time_delta =
|
||||
utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1).unwrap();
|
||||
scheduler_utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1)
|
||||
.unwrap();
|
||||
let cpt_default = process_data::ConnectorPTMapping::default().default_mapping;
|
||||
assert_eq!(
|
||||
vec![schedule_time_delta, first_retry_time_delta],
|
||||
|
||||
@ -298,6 +298,7 @@ pub fn get_schedule_time(
|
||||
None => mapping.default_mapping,
|
||||
};
|
||||
|
||||
// For first try, get the `start_after` time
|
||||
if retry_count == 0 {
|
||||
Some(mapping.start_after)
|
||||
} else {
|
||||
@ -328,6 +329,7 @@ pub fn get_pm_schedule_time(
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the delay based on the retry count
|
||||
fn get_delay<'a>(
|
||||
retry_count: i32,
|
||||
mut array: impl Iterator<Item = (&'a i32, &'a i32)>,
|
||||
|
||||
Reference in New Issue
Block a user