mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-30 09:38:33 +08:00
refactor(router): add psync task to process tracker after building connector request in payments flow (#1603)
This commit is contained in:
committed by
GitHub
parent
e713b62ae3
commit
e978e9d66b
@ -35,7 +35,7 @@ use crate::{
|
|||||||
db::StorageInterface,
|
db::StorageInterface,
|
||||||
logger,
|
logger,
|
||||||
routes::{metrics, AppState},
|
routes::{metrics, AppState},
|
||||||
scheduler::utils as pt_utils,
|
scheduler::{utils as pt_utils, workflows::payment_sync},
|
||||||
services::{self, api::Authenticate},
|
services::{self, api::Authenticate},
|
||||||
types::{
|
types::{
|
||||||
self, api, domain,
|
self, api, domain,
|
||||||
@ -119,6 +119,26 @@ where
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
let schedule_time = match &connector {
|
||||||
|
Some(api::ConnectorCallType::Single(connector_data)) => {
|
||||||
|
if should_add_task_to_process_tracker(&payment_data) {
|
||||||
|
payment_sync::get_sync_process_schedule_time(
|
||||||
|
&*state.store,
|
||||||
|
connector_data.connector.id(),
|
||||||
|
&merchant_account.merchant_id,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.into_report()
|
||||||
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed while getting process schedule time")?
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
let (mut payment_data, tokenization_action) =
|
let (mut payment_data, tokenization_action) =
|
||||||
get_connector_tokenization_action(state, &operation, payment_data, &validate_result)
|
get_connector_tokenization_action(state, &operation, payment_data, &validate_result)
|
||||||
.await?;
|
.await?;
|
||||||
@ -133,19 +153,6 @@ where
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(connector_details) = connector {
|
if let Some(connector_details) = connector {
|
||||||
if should_add_task_to_process_tracker(&payment_data) {
|
|
||||||
operation
|
|
||||||
.to_domain()?
|
|
||||||
.add_task_to_process_tracker(
|
|
||||||
state,
|
|
||||||
&payment_data.payment_attempt,
|
|
||||||
validate_result.requeue,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|error| logger::error!(process_tracker_error=?error))
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
payment_data = match connector_details {
|
payment_data = match connector_details {
|
||||||
api::ConnectorCallType::Single(connector) => {
|
api::ConnectorCallType::Single(connector) => {
|
||||||
let router_data = call_connector_service(
|
let router_data = call_connector_service(
|
||||||
@ -159,6 +166,8 @@ where
|
|||||||
call_connector_action,
|
call_connector_action,
|
||||||
tokenization_action,
|
tokenization_action,
|
||||||
updated_customer,
|
updated_customer,
|
||||||
|
validate_result.requeue,
|
||||||
|
schedule_time,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@ -521,6 +530,8 @@ pub async fn call_connector_service<F, RouterDReq, ApiRequest>(
|
|||||||
call_connector_action: CallConnectorAction,
|
call_connector_action: CallConnectorAction,
|
||||||
tokenization_action: TokenizationAction,
|
tokenization_action: TokenizationAction,
|
||||||
updated_customer: Option<storage::CustomerUpdate>,
|
updated_customer: Option<storage::CustomerUpdate>,
|
||||||
|
requeue: bool,
|
||||||
|
schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> RouterResult<types::RouterData<F, RouterDReq, types::PaymentsResponseData>>
|
) -> RouterResult<types::RouterData<F, RouterDReq, types::PaymentsResponseData>>
|
||||||
where
|
where
|
||||||
F: Send + Clone + Sync,
|
F: Send + Clone + Sync,
|
||||||
@ -592,6 +603,20 @@ where
|
|||||||
(None, false)
|
(None, false)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if should_add_task_to_process_tracker(payment_data) {
|
||||||
|
operation
|
||||||
|
.to_domain()?
|
||||||
|
.add_task_to_process_tracker(
|
||||||
|
state,
|
||||||
|
&payment_data.payment_attempt,
|
||||||
|
requeue,
|
||||||
|
schedule_time,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|error| logger::error!(process_tracker_error=?error))
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
|
||||||
// Update the payment trackers just before calling the connector
|
// Update the payment trackers just before calling the connector
|
||||||
// Since the request is already built in the previous step,
|
// Since the request is already built in the previous step,
|
||||||
// there should be no error in request construction from hyperswitch end
|
// there should be no error in request construction from hyperswitch end
|
||||||
|
|||||||
@ -28,7 +28,7 @@ use crate::{
|
|||||||
},
|
},
|
||||||
db::StorageInterface,
|
db::StorageInterface,
|
||||||
routes::{metrics, AppState},
|
routes::{metrics, AppState},
|
||||||
scheduler::{metrics as scheduler_metrics, workflows::payment_sync},
|
scheduler::metrics as scheduler_metrics,
|
||||||
services,
|
services,
|
||||||
types::{
|
types::{
|
||||||
api::{self, admin, enums as api_enums, CustomerAcceptanceExt, MandateValidationFieldsExt},
|
api::{self, admin, enums as api_enums, CustomerAcceptanceExt, MandateValidationFieldsExt},
|
||||||
@ -764,27 +764,12 @@ pub async fn add_domain_task_to_pt<Op>(
|
|||||||
state: &AppState,
|
state: &AppState,
|
||||||
payment_attempt: &storage::PaymentAttempt,
|
payment_attempt: &storage::PaymentAttempt,
|
||||||
requeue: bool,
|
requeue: bool,
|
||||||
|
schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse>
|
) -> CustomResult<(), errors::ApiErrorResponse>
|
||||||
where
|
where
|
||||||
Op: std::fmt::Debug,
|
Op: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
if check_if_operation_confirm(operation) {
|
if check_if_operation_confirm(operation) {
|
||||||
let connector_name = payment_attempt
|
|
||||||
.connector
|
|
||||||
.clone()
|
|
||||||
.ok_or(errors::ApiErrorResponse::InternalServerError)?;
|
|
||||||
|
|
||||||
let schedule_time = payment_sync::get_sync_process_schedule_time(
|
|
||||||
&*state.store,
|
|
||||||
&connector_name,
|
|
||||||
&payment_attempt.merchant_id,
|
|
||||||
0,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.into_report()
|
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
|
||||||
.attach_printable("Failed while getting process schedule time")?;
|
|
||||||
|
|
||||||
match schedule_time {
|
match schedule_time {
|
||||||
Some(stime) => {
|
Some(stime) => {
|
||||||
if !requeue {
|
if !requeue {
|
||||||
|
|||||||
@ -121,6 +121,7 @@ pub trait Domain<F: Clone, R>: Send + Sync {
|
|||||||
_db: &'a AppState,
|
_db: &'a AppState,
|
||||||
_payment_attempt: &storage::PaymentAttempt,
|
_payment_attempt: &storage::PaymentAttempt,
|
||||||
_requeue: bool,
|
_requeue: bool,
|
||||||
|
_schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse> {
|
) -> CustomResult<(), errors::ApiErrorResponse> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -292,6 +292,7 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for CompleteAuthorize {
|
|||||||
_state: &'a AppState,
|
_state: &'a AppState,
|
||||||
_payment_attempt: &storage::PaymentAttempt,
|
_payment_attempt: &storage::PaymentAttempt,
|
||||||
_requeue: bool,
|
_requeue: bool,
|
||||||
|
_schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse> {
|
) -> CustomResult<(), errors::ApiErrorResponse> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -344,8 +344,9 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentConfirm {
|
|||||||
state: &'a AppState,
|
state: &'a AppState,
|
||||||
payment_attempt: &storage::PaymentAttempt,
|
payment_attempt: &storage::PaymentAttempt,
|
||||||
requeue: bool,
|
requeue: bool,
|
||||||
|
schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse> {
|
) -> CustomResult<(), errors::ApiErrorResponse> {
|
||||||
helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue).await
|
helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue, schedule_time).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_connector<'a>(
|
async fn get_connector<'a>(
|
||||||
|
|||||||
@ -311,6 +311,7 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentCreate {
|
|||||||
_state: &'a AppState,
|
_state: &'a AppState,
|
||||||
_payment_attempt: &storage::PaymentAttempt,
|
_payment_attempt: &storage::PaymentAttempt,
|
||||||
_requeue: bool,
|
_requeue: bool,
|
||||||
|
_schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse> {
|
) -> CustomResult<(), errors::ApiErrorResponse> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -94,8 +94,9 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentStatus {
|
|||||||
state: &'a AppState,
|
state: &'a AppState,
|
||||||
payment_attempt: &storage::PaymentAttempt,
|
payment_attempt: &storage::PaymentAttempt,
|
||||||
requeue: bool,
|
requeue: bool,
|
||||||
|
schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse> {
|
) -> CustomResult<(), errors::ApiErrorResponse> {
|
||||||
helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue).await
|
helpers::add_domain_task_to_pt(self, state, payment_attempt, requeue, schedule_time).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_connector<'a>(
|
async fn get_connector<'a>(
|
||||||
|
|||||||
@ -379,6 +379,7 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentUpdate {
|
|||||||
_state: &'a AppState,
|
_state: &'a AppState,
|
||||||
_payment_attempt: &storage::PaymentAttempt,
|
_payment_attempt: &storage::PaymentAttempt,
|
||||||
_requeue: bool,
|
_requeue: bool,
|
||||||
|
_schedule_time: Option<time::PrimitiveDateTime>,
|
||||||
) -> CustomResult<(), errors::ApiErrorResponse> {
|
) -> CustomResult<(), errors::ApiErrorResponse> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user