feat: change async-bb8 fork and tokio spawn for concurrent database calls (#2774)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: akshay-97 <adiosphobian@gmail.com>
Co-authored-by: akshay.s <akshay.s@juspay.in>
Co-authored-by: Kartikeya Hegde <karthikey.hegde@juspay.in>
This commit is contained in:
Arun Raj M
2023-11-16 10:27:34 +05:30
committed by GitHub
parent 496245d990
commit d634fdeac3
69 changed files with 1516 additions and 721 deletions

View File

@ -79,29 +79,35 @@ pub async fn payments_incoming_webhook_flow<
.perform_locking_action(&state, merchant_account.merchant_id.to_string())
.await?;
let response =
payments::payments_core::<api::PSync, api::PaymentsResponse, _, _, _, Ctx>(
state.clone(),
merchant_account.clone(),
key_store,
payments::operations::PaymentStatus,
api::PaymentsRetrieveRequest {
resource_id: id,
merchant_id: Some(merchant_account.merchant_id.clone()),
force_sync: true,
connector: None,
param: None,
merchant_connector_details: None,
client_secret: None,
expand_attempts: None,
expand_captures: None,
},
services::AuthFlow::Merchant,
consume_or_trigger_flow,
None,
HeaderPayload::default(),
)
.await;
let response = Box::pin(payments::payments_core::<
api::PSync,
api::PaymentsResponse,
_,
_,
_,
Ctx,
>(
state.clone(),
merchant_account.clone(),
key_store,
payments::operations::PaymentStatus,
api::PaymentsRetrieveRequest {
resource_id: id,
merchant_id: Some(merchant_account.merchant_id.clone()),
force_sync: true,
connector: None,
param: None,
merchant_connector_details: None,
client_secret: None,
expand_attempts: None,
expand_captures: None,
},
services::AuthFlow::Merchant,
consume_or_trigger_flow,
None,
HeaderPayload::default(),
))
.await;
lock_action
.free_lock_action(&state, merchant_account.merchant_id.to_owned())
@ -572,7 +578,14 @@ async fn bank_transfer_webhook_flow<W: types::OutgoingWebhookType, Ctx: PaymentM
payment_token: payment_attempt.payment_token,
..Default::default()
};
payments::payments_core::<api::Authorize, api::PaymentsResponse, _, _, _, Ctx>(
Box::pin(payments::payments_core::<
api::Authorize,
api::PaymentsResponse,
_,
_,
_,
Ctx,
>(
state.clone(),
merchant_account.to_owned(),
key_store,
@ -582,7 +595,7 @@ async fn bank_transfer_webhook_flow<W: types::OutgoingWebhookType, Ctx: PaymentM
payments::CallConnectorAction::Trigger,
None,
HeaderPayload::default(),
)
))
.await
} else {
Err(report!(
@ -854,14 +867,14 @@ pub async fn webhooks_wrapper<W: types::OutgoingWebhookType, Ctx: PaymentMethodR
connector_name_or_mca_id: &str,
body: actix_web::web::Bytes,
) -> RouterResponse<serde_json::Value> {
let (application_response, _webhooks_response_tracker) = webhooks_core::<W, Ctx>(
let (application_response, _webhooks_response_tracker) = Box::pin(webhooks_core::<W, Ctx>(
state,
req,
merchant_account,
key_store,
connector_name_or_mca_id,
body,
)
))
.await?;
Ok(application_response)
@ -1089,18 +1102,18 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
})?;
match flow_type {
api::WebhookFlow::Payment => payments_incoming_webhook_flow::<W, Ctx>(
api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow::<W, Ctx>(
state.clone(),
merchant_account,
business_profile,
key_store,
webhook_details,
source_verified,
)
))
.await
.attach_printable("Incoming webhook flow for payments failed")?,
api::WebhookFlow::Refund => refunds_incoming_webhook_flow::<W>(
api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow::<W>(
state.clone(),
merchant_account,
business_profile,
@ -1109,7 +1122,7 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
connector_name.as_str(),
source_verified,
event_type,
)
))
.await
.attach_printable("Incoming webhook flow for refunds failed")?,
@ -1126,14 +1139,14 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
.await
.attach_printable("Incoming webhook flow for disputes failed")?,
api::WebhookFlow::BankTransfer => bank_transfer_webhook_flow::<W, Ctx>(
api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow::<W, Ctx>(
state.clone(),
merchant_account,
business_profile,
key_store,
webhook_details,
source_verified,
)
))
.await
.attach_printable("Incoming bank-transfer webhook flow failed")?,