mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
feat(payments): make database calls parallel for payments_confirm operation (#2098)
This commit is contained in:
@ -4,6 +4,7 @@ use api_models::enums::FrmSuggestion;
|
||||
use async_trait::async_trait;
|
||||
use common_utils::ext_traits::{AsyncExt, Encode};
|
||||
use error_stack::ResultExt;
|
||||
use futures::FutureExt;
|
||||
use router_derive::PaymentOperation;
|
||||
use router_env::{instrument, tracing};
|
||||
|
||||
@ -50,16 +51,27 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
let db = &*state.store;
|
||||
let merchant_id = &merchant_account.merchant_id;
|
||||
let storage_scheme = merchant_account.storage_scheme;
|
||||
let (mut payment_intent, mut payment_attempt, currency, amount, connector_response);
|
||||
let (currency, amount);
|
||||
|
||||
let payment_id = payment_id
|
||||
.get_payment_intent_id()
|
||||
.change_context(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
|
||||
payment_intent = db
|
||||
// Stage 1
|
||||
|
||||
let payment_intent_fut = db
|
||||
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
.map(|x| x.change_context(errors::ApiErrorResponse::PaymentNotFound));
|
||||
|
||||
let mandate_details_fut = helpers::get_token_pm_type_mandate_details(
|
||||
state,
|
||||
request,
|
||||
mandate_type.clone(),
|
||||
merchant_account,
|
||||
);
|
||||
|
||||
let (mut payment_intent, mandate_details) =
|
||||
futures::try_join!(payment_intent_fut, mandate_details_fut)?;
|
||||
|
||||
helpers::validate_customer_access(&payment_intent, auth_flow, request)?;
|
||||
|
||||
@ -80,15 +92,49 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
&payment_intent,
|
||||
merchant_account.intent_fulfillment_time,
|
||||
)?;
|
||||
payment_attempt = db
|
||||
|
||||
let customer_details = helpers::get_customer_details_from_request(request);
|
||||
|
||||
// Stage 2
|
||||
|
||||
let payment_attempt_fut = db
|
||||
.find_payment_attempt_by_payment_id_merchant_id_attempt_id(
|
||||
payment_intent.payment_id.as_str(),
|
||||
merchant_id,
|
||||
payment_intent.active_attempt_id.as_str(),
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
.map(|x| x.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound));
|
||||
|
||||
let shipping_address_fut = helpers::get_address_for_payment_request(
|
||||
db,
|
||||
request.shipping.as_ref(),
|
||||
payment_intent.shipping_address_id.as_deref(),
|
||||
merchant_id,
|
||||
payment_intent
|
||||
.customer_id
|
||||
.as_ref()
|
||||
.or(customer_details.customer_id.as_ref()),
|
||||
key_store,
|
||||
);
|
||||
|
||||
let billing_address_fut = helpers::get_address_for_payment_request(
|
||||
db,
|
||||
request.billing.as_ref(),
|
||||
payment_intent.billing_address_id.as_deref(),
|
||||
merchant_id,
|
||||
payment_intent
|
||||
.customer_id
|
||||
.as_ref()
|
||||
.or(customer_details.customer_id.as_ref()),
|
||||
key_store,
|
||||
);
|
||||
|
||||
let (mut payment_attempt, shipping_address, billing_address) = futures::try_join!(
|
||||
payment_attempt_fut,
|
||||
shipping_address_fut,
|
||||
billing_address_fut
|
||||
)?;
|
||||
|
||||
payment_intent.order_details = request
|
||||
.get_order_details_as_value()
|
||||
@ -101,6 +147,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
|
||||
(payment_intent, payment_attempt) = attempt_type
|
||||
.modify_payment_intent_and_payment_attempt(
|
||||
// 3
|
||||
request,
|
||||
payment_intent,
|
||||
payment_attempt,
|
||||
@ -120,13 +167,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
setup_mandate,
|
||||
recurring_mandate_payment_data,
|
||||
mandate_connector,
|
||||
) = helpers::get_token_pm_type_mandate_details(
|
||||
state,
|
||||
request,
|
||||
mandate_type.clone(),
|
||||
merchant_account,
|
||||
)
|
||||
.await?;
|
||||
) = mandate_details;
|
||||
|
||||
let browser_info = request
|
||||
.browser_info
|
||||
@ -140,8 +181,6 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
|
||||
helpers::validate_card_data(request.payment_method_data.clone())?;
|
||||
|
||||
let customer_details = helpers::get_customer_details_from_request(request);
|
||||
|
||||
let token = token.or_else(|| payment_attempt.payment_token.clone());
|
||||
|
||||
helpers::validate_pm_or_token_given(
|
||||
@ -176,34 +215,29 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
.or_else(|| customer_details.customer_id.clone()),
|
||||
)?;
|
||||
|
||||
let shipping_address = helpers::get_address_for_payment_request(
|
||||
db,
|
||||
request.shipping.as_ref(),
|
||||
payment_intent.shipping_address_id.as_deref(),
|
||||
merchant_id,
|
||||
payment_intent
|
||||
.customer_id
|
||||
.as_ref()
|
||||
.or(customer_details.customer_id.as_ref()),
|
||||
key_store,
|
||||
)
|
||||
.await?;
|
||||
let billing_address = helpers::get_address_for_payment_request(
|
||||
db,
|
||||
request.billing.as_ref(),
|
||||
payment_intent.billing_address_id.as_deref(),
|
||||
merchant_id,
|
||||
payment_intent
|
||||
.customer_id
|
||||
.as_ref()
|
||||
.or(customer_details.customer_id.as_ref()),
|
||||
key_store,
|
||||
)
|
||||
.await?;
|
||||
let creds_identifier = request
|
||||
.merchant_connector_details
|
||||
.as_ref()
|
||||
.map(|mcd| mcd.creds_identifier.to_owned());
|
||||
|
||||
connector_response = attempt_type
|
||||
.get_connector_response(&payment_attempt, db, storage_scheme)
|
||||
.await?;
|
||||
let config_update_fut = request
|
||||
.merchant_connector_details
|
||||
.to_owned()
|
||||
.async_map(|mcd| async {
|
||||
helpers::insert_merchant_connector_creds_to_config(
|
||||
db,
|
||||
merchant_account.merchant_id.as_str(),
|
||||
mcd,
|
||||
)
|
||||
.await
|
||||
})
|
||||
.map(|x| x.transpose());
|
||||
|
||||
let connector_response_fut =
|
||||
attempt_type.get_connector_response(&payment_attempt, db, storage_scheme);
|
||||
|
||||
let (connector_response, _) =
|
||||
futures::try_join!(connector_response_fut, config_update_fut)?;
|
||||
|
||||
payment_intent.shipping_address_id = shipping_address.clone().map(|i| i.address_id);
|
||||
payment_intent.billing_address_id = billing_address.clone().map(|i| i.address_id);
|
||||
@ -236,24 +270,6 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
|
||||
.clone()
|
||||
.or(payment_attempt.business_sub_label);
|
||||
|
||||
let creds_identifier = request
|
||||
.merchant_connector_details
|
||||
.as_ref()
|
||||
.map(|mcd| mcd.creds_identifier.to_owned());
|
||||
request
|
||||
.merchant_connector_details
|
||||
.to_owned()
|
||||
.async_map(|mcd| async {
|
||||
helpers::insert_merchant_connector_creds_to_config(
|
||||
db,
|
||||
merchant_account.merchant_id.as_str(),
|
||||
mcd,
|
||||
)
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.transpose()?;
|
||||
|
||||
// The operation merges mandate data from both request and payment_attempt
|
||||
let setup_mandate = setup_mandate.map(|mandate_data| api_models::payments::MandateData {
|
||||
customer_acceptance: mandate_data.customer_acceptance,
|
||||
@ -378,7 +394,6 @@ impl<F: Clone + Send> Domain<F, api::PaymentsRequest> for PaymentConfirm {
|
||||
|
||||
#[async_trait]
|
||||
impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for PaymentConfirm {
|
||||
#[instrument(skip_all)]
|
||||
async fn update_trackers<'b>(
|
||||
&'b self,
|
||||
db: &dyn StorageInterface,
|
||||
@ -442,30 +457,6 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
|
||||
|
||||
let business_sub_label = payment_data.payment_attempt.business_sub_label.clone();
|
||||
let authentication_type = payment_data.payment_attempt.authentication_type;
|
||||
payment_data.payment_attempt = db
|
||||
.update_payment_attempt_with_attempt_id(
|
||||
payment_data.payment_attempt,
|
||||
storage::PaymentAttemptUpdate::ConfirmUpdate {
|
||||
amount: payment_data.amount.into(),
|
||||
currency: payment_data.currency,
|
||||
status: attempt_status,
|
||||
payment_method,
|
||||
authentication_type,
|
||||
browser_info,
|
||||
connector,
|
||||
payment_token,
|
||||
payment_method_data: additional_pm_data,
|
||||
payment_method_type,
|
||||
payment_experience,
|
||||
business_sub_label,
|
||||
straight_through_algorithm,
|
||||
error_code,
|
||||
error_message,
|
||||
},
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
|
||||
let (shipping_address, billing_address) = (
|
||||
payment_data.payment_intent.shipping_address_id.clone(),
|
||||
@ -487,7 +478,31 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
|
||||
let order_details = payment_data.payment_intent.order_details.clone();
|
||||
let metadata = payment_data.payment_intent.metadata.clone();
|
||||
|
||||
payment_data.payment_intent = db
|
||||
let payment_attempt_fut = db
|
||||
.update_payment_attempt_with_attempt_id(
|
||||
payment_data.payment_attempt,
|
||||
storage::PaymentAttemptUpdate::ConfirmUpdate {
|
||||
amount: payment_data.amount.into(),
|
||||
currency: payment_data.currency,
|
||||
status: attempt_status,
|
||||
payment_method,
|
||||
authentication_type,
|
||||
browser_info,
|
||||
connector,
|
||||
payment_token,
|
||||
payment_method_data: additional_pm_data,
|
||||
payment_method_type,
|
||||
payment_experience,
|
||||
business_sub_label,
|
||||
straight_through_algorithm,
|
||||
error_code,
|
||||
error_message,
|
||||
},
|
||||
storage_scheme,
|
||||
)
|
||||
.map(|x| x.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound));
|
||||
|
||||
let payment_intent_fut = db
|
||||
.update_payment_intent(
|
||||
payment_data.payment_intent,
|
||||
storage::PaymentIntentUpdate::Update {
|
||||
@ -509,20 +524,27 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
|
||||
},
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
.map(|x| x.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound));
|
||||
|
||||
if let Some((updated_customer, customer)) = updated_customer.zip(customer) {
|
||||
db.update_customer_by_customer_id_merchant_id(
|
||||
customer.customer_id.to_owned(),
|
||||
customer.merchant_id.to_owned(),
|
||||
updated_customer,
|
||||
key_store,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to update CustomerConnector in customer")?;
|
||||
};
|
||||
let customer_fut = Box::pin(async {
|
||||
if let Some((updated_customer, customer)) = updated_customer.zip(customer) {
|
||||
db.update_customer_by_customer_id_merchant_id(
|
||||
customer.customer_id.to_owned(),
|
||||
customer.merchant_id.to_owned(),
|
||||
updated_customer,
|
||||
key_store,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to update CustomerConnector in customer")?;
|
||||
};
|
||||
Ok::<_, error_stack::Report<errors::ApiErrorResponse>>(())
|
||||
});
|
||||
|
||||
let (payment_intent, payment_attempt, _) =
|
||||
futures::try_join!(payment_intent_fut, payment_attempt_fut, customer_fut)?;
|
||||
payment_data.payment_intent = payment_intent;
|
||||
payment_data.payment_attempt = payment_attempt;
|
||||
|
||||
Ok((Box::new(self), payment_data))
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use error_stack::ResultExt;
|
||||
use futures::FutureExt;
|
||||
use router_derive;
|
||||
|
||||
use super::{Operation, PostUpdateTracker};
|
||||
@ -506,29 +507,44 @@ async fn payment_response_update_tracker<F: Clone, T: types::Capturable>(
|
||||
None => None,
|
||||
};
|
||||
|
||||
payment_data.payment_attempt = match payment_attempt_update {
|
||||
Some(payment_attempt_update) => db
|
||||
.update_payment_attempt_with_attempt_id(
|
||||
payment_data.payment_attempt,
|
||||
payment_attempt_update,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?,
|
||||
None => payment_data.payment_attempt,
|
||||
};
|
||||
// Stage 1
|
||||
|
||||
let payment_attempt = payment_data.payment_attempt.clone();
|
||||
let connector_response = payment_data.connector_response.clone();
|
||||
|
||||
let payment_attempt_fut = Box::pin(async move {
|
||||
Ok::<_, error_stack::Report<errors::ApiErrorResponse>>(match payment_attempt_update {
|
||||
Some(payment_attempt_update) => db
|
||||
.update_payment_attempt_with_attempt_id(
|
||||
payment_attempt,
|
||||
payment_attempt_update,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?,
|
||||
None => payment_attempt,
|
||||
})
|
||||
});
|
||||
|
||||
let connector_response_fut = Box::pin(async move {
|
||||
Ok::<_, error_stack::Report<errors::ApiErrorResponse>>(match connector_response_update {
|
||||
Some(connector_response_update) => db
|
||||
.update_connector_response(
|
||||
connector_response,
|
||||
connector_response_update,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?,
|
||||
None => connector_response,
|
||||
})
|
||||
});
|
||||
|
||||
let (payment_attempt, connector_response) =
|
||||
futures::try_join!(payment_attempt_fut, connector_response_fut)?;
|
||||
payment_data.payment_attempt = payment_attempt;
|
||||
payment_data.connector_response = connector_response;
|
||||
|
||||
payment_data.connector_response = match connector_response_update {
|
||||
Some(connector_response_update) => db
|
||||
.update_connector_response(
|
||||
payment_data.connector_response,
|
||||
connector_response_update,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?,
|
||||
None => payment_data.connector_response,
|
||||
};
|
||||
let amount_captured = get_total_amount_captured(
|
||||
router_data.request,
|
||||
router_data.amount_captured,
|
||||
@ -546,23 +562,24 @@ async fn payment_response_update_tracker<F: Clone, T: types::Capturable>(
|
||||
},
|
||||
};
|
||||
|
||||
payment_data.payment_intent = db
|
||||
let payment_intent_fut = db
|
||||
.update_payment_intent(
|
||||
payment_data.payment_intent,
|
||||
payment_data.payment_intent.clone(),
|
||||
payment_intent_update,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
|
||||
.map(|x| x.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound));
|
||||
|
||||
// When connector requires redirection for mandate creation it can update the connector mandate_id during Psync
|
||||
mandate::update_connector_mandate_id(
|
||||
let mandate_update_fut = mandate::update_connector_mandate_id(
|
||||
db,
|
||||
router_data.merchant_id,
|
||||
payment_data.mandate_id.clone(),
|
||||
router_data.response.clone(),
|
||||
)
|
||||
.await?;
|
||||
);
|
||||
|
||||
let (payment_intent, _) = futures::try_join!(payment_intent_fut, mandate_update_fut)?;
|
||||
payment_data.payment_intent = payment_intent;
|
||||
|
||||
Ok(payment_data)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user