feat(payments): add api locking for payments core (#1898)

This commit is contained in:
Abhishek Marrivagu
2023-09-25 15:55:39 +05:30
committed by GitHub
parent a48f9865bc
commit 5d66156132
44 changed files with 802 additions and 158 deletions

View File

@ -15,11 +15,13 @@ use crate::compatibility::stripe::webhooks as stripe_webhooks;
use crate::{
consts,
core::{
api_locking,
errors::{self, ConnectorErrorExt, CustomResult, RouterResponse},
payments, refunds,
},
db::StorageInterface,
logger,
routes::{metrics::request::add_attributes, AppState},
routes::{lock_utils, metrics::request::add_attributes, AppState},
services,
types::{
self as router_types, api, domain,
@ -32,7 +34,6 @@ use crate::{
const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5;
const MERCHANT_ID: &str = "merchant_id";
#[instrument(skip_all)]
pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
state: AppState,
merchant_account: domain::MerchantAccount,
@ -47,6 +48,27 @@ pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
};
let payments_response = match webhook_details.object_reference_id {
api_models::webhooks::ObjectReferenceId::PaymentId(id) => {
let payment_id = get_payment_id(
state.store.as_ref(),
&id,
merchant_account.merchant_id.as_str(),
merchant_account.storage_scheme,
)
.await?;
let lock_action = api_locking::LockAction::Hold {
input: super::api_locking::LockingInput {
unique_locking_key: payment_id,
api_identifier: lock_utils::ApiIdentifier::Payments,
override_lock_retries: None,
},
};
lock_action
.clone()
.perform_locking_action(&state, merchant_account.merchant_id.to_string())
.await?;
let response = payments::payments_core::<api::PSync, api::PaymentsResponse, _, _, _>(
state.clone(),
merchant_account.clone(),
@ -69,6 +91,10 @@ pub async fn payments_incoming_webhook_flow<W: types::OutgoingWebhookType>(
)
.await;
lock_action
.free_lock_action(&state, merchant_account.merchant_id.to_owned())
.await?;
match response {
Ok(value) => value,
Err(err)
@ -957,6 +983,44 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
Ok(response)
}
#[inline]
pub async fn get_payment_id(
db: &dyn StorageInterface,
payment_id: &api::PaymentIdType,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> errors::RouterResult<String> {
let pay_id = || async {
match payment_id {
api_models::payments::PaymentIdType::PaymentIntentId(ref id) => Ok(id.to_string()),
api_models::payments::PaymentIdType::ConnectorTransactionId(ref id) => db
.find_payment_attempt_by_merchant_id_connector_txn_id(
merchant_id,
id,
storage_scheme,
)
.await
.map(|p| p.payment_id),
api_models::payments::PaymentIdType::PaymentAttemptId(ref id) => db
.find_payment_attempt_by_attempt_id_merchant_id(id, merchant_id, storage_scheme)
.await
.map(|p| p.payment_id),
api_models::payments::PaymentIdType::PreprocessingId(ref id) => db
.find_payment_attempt_by_preprocessing_id_merchant_id(
id,
merchant_id,
storage_scheme,
)
.await
.map(|p| p.payment_id),
}
};
pay_id()
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
}
async fn fetch_mca_and_connector(
state: AppState,
merchant_account: &domain::MerchantAccount,