feat(router): dynamically toggle KV for merchant and refactoring around it (#79)

This commit is contained in:
ItsMeShashank
2022-12-09 13:10:44 +05:30
committed by GitHub
parent 5b470bf8f5
commit f76f3e2f54
38 changed files with 980 additions and 482 deletions

View File

@ -274,7 +274,7 @@ pub async fn create_payment_connector(
req: api::PaymentConnectorCreate,
merchant_id: &String,
) -> RouterResponse<api::PaymentConnectorCreate> {
store
let _merchant_account = store
.find_merchant_account_by_merchant_id(merchant_id)
.await
.map_err(|error| {
@ -332,6 +332,13 @@ pub async fn retrieve_payment_connector(
merchant_id: String,
merchant_connector_id: i32,
) -> RouterResponse<api::PaymentConnectorCreate> {
let _merchant_account = store
.find_merchant_account_by_merchant_id(&merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
})?;
let mca = store
.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&merchant_id,
@ -366,11 +373,13 @@ pub async fn update_payment_connector(
merchant_connector_id: i32,
req: api::PaymentConnectorCreate,
) -> RouterResponse<api::PaymentConnectorCreate> {
db.find_merchant_account_by_merchant_id(merchant_id)
let _merchant_account = db
.find_merchant_account_by_merchant_id(merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
})?;
let mca = db
.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
merchant_id,
@ -430,6 +439,13 @@ pub async fn delete_payment_connector(
merchant_id: String,
merchant_connector_id: i32,
) -> RouterResponse<api::DeleteMcaResponse> {
let _merchant_account = db
.find_merchant_account_by_merchant_id(&merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
})?;
let is_deleted = db
.delete_merchant_connector_account_by_merchant_id_merchant_connector_id(
&merchant_id,

View File

@ -65,21 +65,22 @@ where
let operation: BoxedOperation<F, Req> = Box::new(operation);
let (operation, merchant_id, payment_id, mandate_type) = operation
let (operation, validate_result) = operation
.to_validate_request()?
.validate_request(&req, &merchant_account)?;
tracing::Span::current().record("payment_id", &format!("{:?}", payment_id));
tracing::Span::current().record("payment_id", &format!("{:?}", validate_result.payment_id));
let (operation, mut payment_data, customer_details) = operation
.to_get_tracker()?
.get_trackers(
state,
&payment_id,
merchant_id,
&validate_result.payment_id,
validate_result.merchant_id,
connector.connector_name,
&req,
mandate_type,
validate_result.mandate_type,
validate_result.storage_scheme,
)
.await?;
@ -89,7 +90,7 @@ where
&*state.store,
&mut payment_data,
customer_details,
merchant_id,
validate_result.merchant_id,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
@ -103,13 +104,20 @@ where
&payment_data.payment_attempt,
&payment_data.payment_method_data,
&payment_data.token,
validate_result.storage_scheme,
)
.await?;
payment_data.payment_method_data = payment_method_data;
let (operation, mut payment_data) = operation
.to_update_tracker()?
.update_trackers(&*state.store, &payment_id, payment_data, customer.clone())
.update_trackers(
&*state.store,
&validate_result.payment_id,
payment_data,
customer.clone(),
validate_result.storage_scheme,
)
.await?;
operation
@ -121,7 +129,7 @@ where
payment_data = call_connector_service(
state,
&merchant_account,
&payment_id,
&validate_result.payment_id,
connector,
&operation,
payment_data,
@ -295,13 +303,20 @@ where
customer,
payment_data,
call_connector_action,
merchant_account.storage_scheme,
)
.await;
let response = helpers::amap(res, |response| async {
let operation = helpers::response_operation::<F, Req>();
let payment_data = operation
.to_post_update_tracker()?
.update_tracker(db, payment_id, payment_data, Some(response))
.update_tracker(
db,
payment_id,
payment_data,
Some(response),
merchant_account.storage_scheme,
)
.await?;
Ok(payment_data)
})
@ -451,9 +466,10 @@ pub async fn list_payments(
) -> RouterResponse<api::PaymentListResponse> {
validate_payment_list_request(&constraints)?;
let merchant_id = &merchant.merchant_id;
let payment_intent = filter_by_constraints(db, &constraints, merchant_id)
.await
.map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
let payment_intent =
filter_by_constraints(db, &constraints, merchant_id, merchant.storage_scheme)
.await
.map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
let data: Vec<api::PaymentsResponse> = payment_intent.into_iter().map(From::from).collect();
utils::when(

View File

@ -11,7 +11,10 @@ use crate::{
core::{errors::RouterResult, payments},
routes::AppState,
services,
types::{self, api, storage},
types::{
self, api,
storage::{self, enums},
},
};
#[async_trait]
@ -33,6 +36,7 @@ pub trait Feature<F, T> {
maybe_customer: &Option<storage::Customer>,
payment_data: PaymentData<F>,
call_connector_action: payments::CallConnectorAction,
storage_scheme: enums::MerchantStorageScheme,
) -> (RouterResult<Self>, PaymentData<F>)
where
Self: std::marker::Sized,

View File

@ -60,6 +60,7 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData>
customer: &Option<storage::Customer>,
payment_data: PaymentData<api::Authorize>,
call_connector_action: payments::CallConnectorAction,
storage_scheme: enums::MerchantStorageScheme,
) -> (RouterResult<Self>, PaymentData<api::Authorize>)
where
dyn api::Connector: services::ConnectorIntegration<
@ -75,6 +76,7 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData>
customer,
Some(true),
call_connector_action,
storage_scheme,
)
.await;
@ -92,6 +94,7 @@ impl PaymentsAuthorizeRouterData {
maybe_customer: &Option<storage::Customer>,
confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentsAuthorizeRouterData>
where
dyn api::Connector + Sync: services::ConnectorIntegration<

View File

@ -8,7 +8,11 @@ use crate::{
},
routes::AppState,
services,
types::{self, api, storage, PaymentsCancelRouterData, PaymentsResponseData},
types::{
self, api,
storage::{self, enums},
PaymentsCancelRouterData, PaymentsResponseData,
},
};
#[async_trait]
@ -41,6 +45,7 @@ impl Feature<api::Void, types::PaymentsCancelData>
customer: &Option<storage::Customer>,
payment_data: PaymentData<api::Void>,
call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme,
) -> (RouterResult<Self>, PaymentData<api::Void>)
where
dyn api::Connector: services::ConnectorIntegration<

View File

@ -9,7 +9,9 @@ use crate::{
routes::AppState,
services,
types::{
self, api, storage, PaymentsCaptureData, PaymentsCaptureRouterData, PaymentsResponseData,
self, api,
storage::{self, enums},
PaymentsCaptureData, PaymentsCaptureRouterData, PaymentsResponseData,
},
};
@ -44,6 +46,7 @@ impl Feature<api::Capture, types::PaymentsCaptureData>
customer: &Option<storage::Customer>,
payment_data: PaymentData<api::Capture>,
call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme,
) -> (RouterResult<Self>, PaymentData<api::Capture>)
where
dyn api::Connector: services::ConnectorIntegration<

View File

@ -8,7 +8,11 @@ use crate::{
},
routes::AppState,
services,
types::{self, api, storage, PaymentsResponseData, PaymentsSyncData, PaymentsSyncRouterData},
types::{
self, api,
storage::{self, enums},
PaymentsResponseData, PaymentsSyncData, PaymentsSyncRouterData,
},
};
#[async_trait]
@ -43,6 +47,7 @@ impl Feature<api::PSync, types::PaymentsSyncData>
customer: &Option<storage::Customer>,
payment_data: PaymentData<api::PSync>,
call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme,
) -> (RouterResult<Self>, PaymentData<api::PSync>)
where
dyn api::Connector: services::ConnectorIntegration<

View File

@ -47,6 +47,7 @@ impl Feature<api::Verify, types::VerifyRequestData> for types::VerifyRouterData
customer: &Option<storage::Customer>,
payment_data: PaymentData<api::Verify>,
call_connector_action: payments::CallConnectorAction,
storage_scheme: enums::MerchantStorageScheme,
) -> (RouterResult<Self>, PaymentData<api::Verify>)
where
dyn api::Connector: services::ConnectorIntegration<
@ -62,6 +63,7 @@ impl Feature<api::Verify, types::VerifyRequestData> for types::VerifyRouterData
customer,
Some(true),
call_connector_action,
storage_scheme,
)
.await;
@ -77,6 +79,7 @@ impl types::VerifyRouterData {
maybe_customer: &Option<storage::Customer>,
confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<Self>
where
dyn api::Connector + Sync: services::ConnectorIntegration<

View File

@ -807,9 +807,10 @@ pub(super) async fn filter_by_constraints(
db: &dyn StorageInterface,
constraints: &api::PaymentListConstraints,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::PaymentIntent>, errors::StorageError> {
let result = db
.filter_payment_intent_by_constraints(merchant_id, constraints)
.filter_payment_intent_by_constraints(merchant_id, constraints, storage_scheme)
.await?;
Ok(result)
}

View File

@ -68,18 +68,20 @@ pub trait Operation<F: Clone, T>: Send + std::fmt::Debug {
}
}
pub struct ValidateResult<'a> {
pub merchant_id: &'a str,
pub payment_id: api::PaymentIdType,
pub mandate_type: Option<api::MandateTxnType>,
pub storage_scheme: enums::MerchantStorageScheme,
}
#[allow(clippy::type_complexity)]
pub trait ValidateRequest<F, R> {
fn validate_request<'a, 'b>(
&'b self,
request: &R,
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, R>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
)>;
) -> RouterResult<(BoxedOperation<'b, F, R>, ValidateResult<'a>)>;
}
#[async_trait]
@ -93,6 +95,7 @@ pub trait GetTracker<F, D, R>: Send {
connector: types::Connector,
request: &R,
mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'a, F, R>, D, Option<CustomerDetails>)>;
}
@ -116,6 +119,7 @@ pub trait Domain<F: Clone, R>: Send + Sync {
payment_attempt: &storage::PaymentAttempt,
request: &Option<api::PaymentMethod>,
token: &Option<String>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'a, F, R>, Option<api::PaymentMethod>)>;
async fn add_task_to_process_tracker<'a>(
@ -135,6 +139,7 @@ pub trait UpdateTracker<F, D, R>: Send {
payment_id: &api::PaymentIdType,
payment_data: D,
customer: Option<Customer>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'b, F, R>, D)>
where
F: 'b + Send;
@ -148,6 +153,7 @@ pub trait PostUpdateTracker<F, D, R>: Send {
payment_id: &api::PaymentIdType,
payment_data: D,
response: Option<types::RouterData<F, R, PaymentsResponseData>>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<D>
where
F: 'b + Send;
@ -192,6 +198,7 @@ where
payment_attempt: &storage::PaymentAttempt,
request: &Option<api::PaymentMethod>,
token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRequest>,
Option<api::PaymentMethod>,
@ -280,6 +287,7 @@ where
payment_attempt: &storage::PaymentAttempt,
request: &Option<api::PaymentMethod>,
token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRetrieveRequest>,
Option<api::PaymentMethod>,
@ -336,6 +344,7 @@ where
_payment_attempt: &storage::PaymentAttempt,
_request: &Option<api::PaymentMethod>,
_token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsCaptureRequest>,
Option<api::PaymentMethod>,
@ -384,6 +393,7 @@ where
_payment_attempt: &storage::PaymentAttempt,
_request: &Option<api::PaymentMethod>,
_token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsCancelRequest>,
Option<api::PaymentMethod>,

View File

@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{CustomerDetails, PaymentAddress, PaymentData},
payments::{operations, CustomerDetails, PaymentAddress, PaymentData},
},
db::StorageInterface,
routes::AppState,
@ -36,6 +36,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsCancelRequest>
_connector: Connector,
request: &api::PaymentsCancelRequest,
_mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsCancelRequest>,
PaymentData<F>,
@ -47,14 +48,18 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsCancelRequest>
.change_context(errors::ApiErrorResponse::PaymentNotFound)?;
let payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?;
let mut payment_attempt = db
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -65,6 +70,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsCancelRequest>
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.map_err(|error| {
@ -116,6 +122,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsCancelRequest> for
_payment_id: &api::PaymentIdType,
mut payment_data: PaymentData<F>,
_customer: Option<Customer>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsCancelRequest>,
PaymentData<F>,
@ -131,6 +138,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsCancelRequest> for
status: enums::AttemptStatus::VoidInitiated,
cancellation_reason,
},
storage_scheme,
)
.await
.map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
@ -147,15 +155,16 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsCancelRequest> for Payment
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsCancelRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()),
None,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()),
mandate_type: None,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}

View File

@ -8,11 +8,16 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{self, helpers},
payments::{self, helpers, operations},
},
db::StorageInterface,
routes::AppState,
types::{api, api::PaymentsCaptureRequest, storage, Connector},
types::{
api,
api::PaymentsCaptureRequest,
storage::{self, enums},
Connector,
},
utils::OptionExt,
};
@ -33,6 +38,7 @@ impl<F: Send + Clone> GetTracker<F, payments::PaymentData<F>, api::PaymentsCaptu
_connector: Connector,
request: &PaymentsCaptureRequest,
_mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsCaptureRequest>,
payments::PaymentData<F>,
@ -46,7 +52,7 @@ impl<F: Send + Clone> GetTracker<F, payments::PaymentData<F>, api::PaymentsCaptu
.change_context(errors::ApiErrorResponse::PaymentNotFound)?;
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -57,7 +63,11 @@ impl<F: Send + Clone> GetTracker<F, payments::PaymentData<F>, api::PaymentsCaptu
helpers::validate_amount_to_capture(payment_intent.amount, request.amount_to_capture)?;
payment_attempt = db
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -82,6 +92,7 @@ impl<F: Send + Clone> GetTracker<F, payments::PaymentData<F>, api::PaymentsCaptu
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.map_err(|error| {
@ -140,6 +151,7 @@ impl<F: Clone> UpdateTracker<F, payments::PaymentData<F>, api::PaymentsCaptureRe
_payment_id: &api::PaymentIdType,
payment_data: payments::PaymentData<F>,
_customer: Option<storage::Customer>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsCaptureRequest>,
payments::PaymentData<F>,
@ -159,9 +171,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsCaptureRequest> for Paymen
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsCaptureRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let payment_id = request
.payment_id
@ -170,9 +180,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsCaptureRequest> for Paymen
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(payment_id.to_owned()),
None,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(payment_id.to_owned()),
mandate_type: None,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}

View File

@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{helpers, CustomerDetails, PaymentAddress, PaymentData},
payments::{helpers, operations, CustomerDetails, PaymentAddress, PaymentData},
utils as core_utils,
},
db::StorageInterface,
@ -37,6 +37,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
_connector: Connector,
request: &api::PaymentsRequest,
mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRequest>,
PaymentData<F>,
@ -54,7 +55,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
.await?;
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -78,7 +79,11 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
})?;
payment_attempt = db
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -95,6 +100,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.map_err(|error| {
@ -167,6 +173,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
_payment_id: &api::PaymentIdType,
mut payment_data: PaymentData<F>,
_customer: Option<storage::Customer>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData<F>)>
where
F: 'b + Send,
@ -194,6 +201,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
payment_method,
browser_info,
},
storage_scheme,
)
.await
.map_err(|error| {
@ -213,6 +221,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
shipping_address_id: shipping_address,
billing_address_id: billing_address,
},
storage_scheme,
)
.await
.map_err(|error| {
@ -231,9 +240,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentConfir
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let given_payment_id = match &request.payment_id {
Some(id_type) => Some(
@ -256,9 +263,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentConfir
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}

View File

@ -11,7 +11,7 @@ use crate::{
consts,
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{self, helpers, CustomerDetails, PaymentAddress, PaymentData},
payments::{self, helpers, operations, CustomerDetails, PaymentAddress, PaymentData},
utils as core_utils,
},
db::StorageInterface,
@ -40,6 +40,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
connector: types::Connector,
request: &api::PaymentsRequest,
mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRequest>,
PaymentData<F>,
@ -79,15 +80,18 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
})?;
payment_attempt = match db
.insert_payment_attempt(Self::make_payment_attempt(
&payment_id,
merchant_id,
connector,
money,
payment_method_type,
request,
browser_info,
))
.insert_payment_attempt(
Self::make_payment_attempt(
&payment_id,
merchant_id,
connector,
money,
payment_method_type,
request,
browser_info,
),
storage_scheme,
)
.await
{
Ok(payment_attempt) => Ok(payment_attempt),
@ -95,25 +99,32 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
Err(err) => match err.current_context() {
errors::StorageError::DatabaseError(errors::DatabaseError::UniqueViolation) => {
is_update = true;
db.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})
db.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})
}
_ => Err(err).change_context(errors::ApiErrorResponse::InternalServerError),
},
}?;
payment_intent = match db
.insert_payment_intent(Self::make_payment_intent(
&payment_id,
merchant_id,
&connector.to_string(),
money,
request,
shipping_address.clone().map(|x| x.address_id),
billing_address.clone().map(|x| x.address_id),
))
.insert_payment_intent(
Self::make_payment_intent(
&payment_id,
merchant_id,
&connector.to_string(),
money,
request,
shipping_address.clone().map(|x| x.address_id),
billing_address.clone().map(|x| x.address_id),
),
storage_scheme,
)
.await
{
Ok(payment_intent) => Ok(payment_intent),
@ -121,18 +132,25 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
Err(err) => match err.current_context() {
errors::StorageError::DatabaseError(errors::DatabaseError::UniqueViolation) => {
is_update = true;
db.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})
db.find_payment_intent_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})
}
_ => Err(err).change_context(errors::ApiErrorResponse::InternalServerError),
},
}?;
connector_response = match db
.insert_connector_response(Self::make_connector_response(&payment_attempt))
.insert_connector_response(
Self::make_connector_response(&payment_attempt),
storage_scheme,
)
.await
{
Ok(connector_resp) => Ok(connector_resp),
@ -195,6 +213,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
_payment_id: &api::PaymentIdType,
mut payment_data: PaymentData<F>,
_customer: Option<storage::Customer>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData<F>)>
where
F: 'b + Send,
@ -225,6 +244,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
shipping_address_id: None,
billing_address_id: None,
},
storage_scheme,
)
.await
.map_err(|error| {
@ -248,9 +268,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentCreate
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let given_payment_id = match &request.payment_id {
Some(id_type) => Some(
@ -281,9 +299,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentCreate
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}

View File

@ -12,7 +12,7 @@ use crate::{
consts,
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{self, helpers, Operation, PaymentData},
payments::{self, helpers, operations, Operation, PaymentData},
utils as core_utils,
},
db::StorageInterface,
@ -36,9 +36,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::VerifyRequest> for PaymentMethodVa
merchant_account: &'a types::storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::VerifyRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let request_merchant_id = request.merchant_id.as_deref();
helpers::validate_merchant_id(&merchant_account.merchant_id, request_merchant_id)
@ -49,9 +47,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::VerifyRequest> for PaymentMethodVa
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(validation_id),
mandate_type,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(validation_id),
mandate_type,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}
@ -67,6 +68,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::VerifyRequest> for Paym
connector: types::Connector,
request: &api::VerifyRequest,
_mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::VerifyRequest>,
PaymentData<F>,
@ -80,13 +82,16 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::VerifyRequest> for Paym
.change_context(errors::ApiErrorResponse::InternalServerError)?;
payment_attempt = match db
.insert_payment_attempt(Self::make_payment_attempt(
&payment_id,
merchant_id,
connector,
request.payment_method,
request,
))
.insert_payment_attempt(
Self::make_payment_attempt(
&payment_id,
merchant_id,
connector,
request.payment_method,
request,
),
storage_scheme,
)
.await
{
Ok(payment_attempt) => Ok(payment_attempt),
@ -96,12 +101,10 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::VerifyRequest> for Paym
}?;
payment_intent = match db
.insert_payment_intent(Self::make_payment_intent(
&payment_id,
merchant_id,
connector,
request,
))
.insert_payment_intent(
Self::make_payment_intent(&payment_id, merchant_id, connector, request),
storage_scheme,
)
.await
{
Ok(payment_intent) => Ok(payment_intent),
@ -111,7 +114,10 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::VerifyRequest> for Paym
}?;
connector_response = match db
.insert_connector_response(PaymentCreate::make_connector_response(&payment_attempt))
.insert_connector_response(
PaymentCreate::make_connector_response(&payment_attempt),
storage_scheme,
)
.await
{
Ok(connector_resp) => Ok(connector_resp),
@ -159,6 +165,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::VerifyRequest> for PaymentM
_payment_id: &api::PaymentIdType,
mut payment_data: PaymentData<F>,
_customer: Option<storage::Customer>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'b, F, api::VerifyRequest>, PaymentData<F>)>
where
F: 'b + Send,
@ -178,6 +185,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::VerifyRequest> for PaymentM
shipping_address_id: None,
billing_address_id: None,
},
storage_scheme,
)
.await
.map_err(|err| {
@ -230,6 +238,7 @@ where
payment_attempt: &storage::PaymentAttempt,
request: &Option<api::PaymentMethod>,
token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::VerifyRequest>,
Option<api::PaymentMethod>,

View File

@ -36,6 +36,7 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsAuthorizeData
response: Option<
types::RouterData<F, types::PaymentsAuthorizeData, types::PaymentsResponseData>,
>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentData<F>>
where
F: 'b + Send,
@ -44,7 +45,14 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsAuthorizeData
payment_data.mandate_id = payment_data
.mandate_id
.or_else(|| router_data.request.mandate_id.clone());
Ok(payment_response_ut(db, payment_id, payment_data, Some(router_data)).await?)
Ok(payment_response_ut(
db,
payment_id,
payment_data,
Some(router_data),
storage_scheme,
)
.await?)
}
}
@ -58,11 +66,12 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsSyncData> for
response: Option<
types::RouterData<F, types::PaymentsSyncData, types::PaymentsResponseData>,
>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentData<F>>
where
F: 'b + Send,
{
Ok(payment_response_ut(db, payment_id, payment_data, response).await?)
Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?)
}
}
@ -78,11 +87,12 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsCaptureData>
response: Option<
types::RouterData<F, types::PaymentsCaptureData, types::PaymentsResponseData>,
>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentData<F>>
where
F: 'b + Send,
{
Ok(payment_response_ut(db, payment_id, payment_data, response).await?)
Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?)
}
}
@ -96,11 +106,12 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsCancelData> f
response: Option<
types::RouterData<F, types::PaymentsCancelData, types::PaymentsResponseData>,
>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentData<F>>
where
F: 'b + Send,
{
Ok(payment_response_ut(db, payment_id, payment_data, response).await?)
Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?)
}
}
@ -114,11 +125,12 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::VerifyRequestData> fo
response: Option<
types::RouterData<F, types::VerifyRequestData, types::PaymentsResponseData>,
>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentData<F>>
where
F: 'b + Send,
{
Ok(payment_response_ut(db, payment_id, payment_data, response).await?)
Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?)
}
}
@ -127,6 +139,7 @@ async fn payment_response_ut<F: Clone, T>(
_payment_id: &api::PaymentIdType,
mut payment_data: PaymentData<F>,
response: Option<types::RouterData<F, T, types::PaymentsResponseData>>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<PaymentData<F>> {
let router_data = response.ok_or(report!(errors::ApiErrorResponse::InternalServerError))?;
let mut connector_response_data = None;
@ -156,7 +169,11 @@ async fn payment_response_ut<F: Clone, T>(
};
payment_data.payment_attempt = db
.update_payment_attempt(payment_data.payment_attempt, payment_attempt_update)
.update_payment_attempt(
payment_data.payment_attempt,
payment_attempt_update,
storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
@ -180,11 +197,15 @@ async fn payment_response_ut<F: Clone, T>(
encoded_data: payment_data.connector_response.encoded_data.clone(),
};
db.update_connector_response(payment_data.connector_response, connector_response_update)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?
db.update_connector_response(
payment_data.connector_response,
connector_response_update,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?
}
None => payment_data.connector_response,
};
@ -201,7 +222,11 @@ async fn payment_response_ut<F: Clone, T>(
};
payment_data.payment_intent = db
.update_payment_intent(payment_data.payment_intent, payment_intent_update)
.update_payment_intent(
payment_data.payment_intent,
payment_intent_update,
storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;

View File

@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{self, helpers, PaymentData},
payments::{self, helpers, operations, PaymentData},
},
db::StorageInterface,
routes::AppState,
@ -38,6 +38,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsSessionRequest>
_connector: Connector,
request: &api::PaymentsSessionRequest,
_mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsSessionRequest>,
PaymentData<F>,
@ -50,14 +51,18 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsSessionRequest>
let db = &*state.store;
let mut payment_attempt = db
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?;
let mut payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -81,6 +86,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsSessionRequest>
payment_intent.shipping_address_id.as_deref(),
)
.await?;
let billing_address = helpers::get_address_for_payment_request(
db,
None,
@ -97,6 +103,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsSessionRequest>
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.map_err(|error| {
@ -140,6 +147,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsSessionRequest> for
_payment_id: &api::PaymentIdType,
payment_data: PaymentData<F>,
_customer: Option<storage::Customer>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsSessionRequest>,
PaymentData<F>,
@ -159,9 +167,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsSessionRequest> for Paymen
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsSessionRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
//paymentid is already generated and should be sent in the request
let given_payment_id = request
@ -171,9 +177,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsSessionRequest> for Paymen
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(given_payment_id),
None,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(given_payment_id),
mandate_type: None,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}
@ -217,6 +226,7 @@ where
_payment_attempt: &storage::PaymentAttempt,
_request: &Option<api::PaymentMethod>,
_token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsSessionRequest>,
Option<api::PaymentMethod>,

View File

@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, CustomResult, RouterResult, StorageErrorExt},
payments::{helpers, CustomerDetails, PaymentAddress, PaymentData},
payments::{helpers, operations, CustomerDetails, PaymentAddress, PaymentData},
},
db::StorageInterface,
routes::AppState,
@ -36,6 +36,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsStartRequest> f
_connector: Connector,
_request: &api::PaymentsStartRequest,
_mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsStartRequest>,
PaymentData<F>,
@ -49,14 +50,18 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsStartRequest> f
.change_context(errors::ApiErrorResponse::PaymentNotFound)?;
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?;
payment_attempt = db
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -95,6 +100,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsStartRequest> f
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.map_err(|error| {
@ -147,6 +153,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsStartRequest> for P
_payment_id: &api::PaymentIdType,
payment_data: PaymentData<F>,
_customer: Option<Customer>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsStartRequest>,
PaymentData<F>,
@ -166,9 +173,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsStartRequest> for PaymentS
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsStartRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let request_merchant_id = Some(&request.merchant_id[..]);
helpers::validate_merchant_id(&merchant_account.merchant_id, request_merchant_id)
@ -181,9 +186,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsStartRequest> for PaymentS
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(payment_id),
None,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type: None,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}
@ -227,6 +235,7 @@ where
payment_attempt: &storage::PaymentAttempt,
request: &Option<api::PaymentMethod>,
token: &Option<String>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsStartRequest>,
Option<api::PaymentMethod>,

View File

@ -9,11 +9,15 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, ApiErrorResponse, RouterResult, StorageErrorExt},
payments::{helpers, CustomerDetails, PaymentAddress, PaymentData},
payments::{helpers, operations, CustomerDetails, PaymentAddress, PaymentData},
},
db::StorageInterface,
routes::AppState,
types::{api, storage, Connector},
types::{
api,
storage::{self, enums},
Connector,
},
utils::{self, OptionExt},
};
#[derive(Debug, Clone, Copy, PaymentOperation)]
@ -51,6 +55,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
_payment_id: &api::PaymentIdType,
payment_data: PaymentData<F>,
_customer: Option<storage::Customer>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData<F>)>
where
F: 'b + Send,
@ -67,6 +72,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRetrieveRequest> fo
_payment_id: &api::PaymentIdType,
payment_data: PaymentData<F>,
_customer: Option<storage::Customer>,
_storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsRetrieveRequest>,
PaymentData<F>,
@ -91,12 +97,21 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRetrieveRequest
_connector: Connector,
request: &api::PaymentsRetrieveRequest,
_mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRetrieveRequest>,
PaymentData<F>,
Option<CustomerDetails>,
)> {
get_tracker_for_sync(payment_id, merchant_id, &*state.store, request, self).await
get_tracker_for_sync(
payment_id,
merchant_id,
&*state.store,
request,
self,
storage_scheme,
)
.await
}
}
@ -110,6 +125,7 @@ async fn get_tracker_for_sync<
db: &dyn StorageInterface,
request: &api::PaymentsRetrieveRequest,
operation: Op,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRetrieveRequest>,
PaymentData<F>,
@ -119,13 +135,13 @@ async fn get_tracker_for_sync<
payment_attempt = match payment_id {
api::PaymentIdType::PaymentIntentId(ref id) => {
db.find_payment_attempt_by_payment_id_merchant_id(id, merchant_id)
db.find_payment_attempt_by_payment_id_merchant_id(id, merchant_id, storage_scheme)
}
api::PaymentIdType::ConnectorTransactionId(ref id) => {
db.find_payment_attempt_by_merchant_id_connector_txn_id(merchant_id, id)
db.find_payment_attempt_by_merchant_id_connector_txn_id(merchant_id, id, storage_scheme)
}
api::PaymentIdType::PaymentTxnId(ref id) => {
db.find_payment_attempt_by_merchant_id_txn_id(merchant_id, id)
db.find_payment_attempt_by_merchant_id_txn_id(merchant_id, id, storage_scheme)
}
}
.await
@ -134,7 +150,7 @@ async fn get_tracker_for_sync<
let payment_id_str = payment_attempt.payment_id.clone();
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id_str, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id_str, merchant_id, storage_scheme)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
@ -143,6 +159,7 @@ async fn get_tracker_for_sync<
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -168,7 +185,7 @@ async fn get_tracker_for_sync<
)?;
let refunds = db
.find_refund_by_payment_id_merchant_id(&payment_id_str, merchant_id)
.find_refund_by_payment_id_merchant_id(&payment_id_str, merchant_id, storage_scheme)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
@ -204,9 +221,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRetrieveRequest> for Payme
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsRetrieveRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let request_merchant_id = request.merchant_id.as_deref();
helpers::validate_merchant_id(&merchant_account.merchant_id, request_merchant_id)
@ -217,9 +232,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRetrieveRequest> for Payme
Ok((
Box::new(self),
&merchant_account.merchant_id,
request.resource_id.clone(),
None,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: request.resource_id.clone(),
mandate_type: None,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}

View File

@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida
use crate::{
core::{
errors::{self, RouterResult, StorageErrorExt},
payments::{self, helpers, CustomerDetails, PaymentAddress, PaymentData},
payments::{self, helpers, operations, CustomerDetails, PaymentAddress, PaymentData},
utils as core_utils,
},
db::StorageInterface,
@ -36,6 +36,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
_connector: Connector,
request: &api::PaymentsRequest,
mandate_type: Option<api::MandateTxnType>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(
BoxedOperation<'a, F, api::PaymentsRequest>,
PaymentData<F>,
@ -58,7 +59,11 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
.await?;
payment_attempt = db
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
merchant_id,
storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -78,7 +83,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
amount = request.amount.unwrap_or(payment_attempt.amount);
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
@ -114,6 +119,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
storage_scheme,
)
.await
.map_err(|error| {
@ -172,6 +178,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
_payment_id: &api::PaymentIdType,
mut payment_data: PaymentData<F>,
customer: Option<storage::Customer>,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData<F>)>
where
F: 'b + Send,
@ -200,6 +207,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
authentication_type: None,
payment_method,
},
storage_scheme,
)
.await
.map_err(|error| {
@ -235,6 +243,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
shipping_address_id: shipping_address,
billing_address_id: billing_address,
},
storage_scheme,
)
.await
.map_err(|error| {
@ -258,9 +267,7 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentUpdate
merchant_account: &'a storage::MerchantAccount,
) -> RouterResult<(
BoxedOperation<'b, F, api::PaymentsRequest>,
&'a str,
api::PaymentIdType,
Option<api::MandateTxnType>,
operations::ValidateResult<'a>,
)> {
let given_payment_id = match &request.payment_id {
Some(id_type) => Some(
@ -291,9 +298,12 @@ impl<F: Send + Clone> ValidateRequest<F, api::PaymentsRequest> for PaymentUpdate
Ok((
Box::new(self),
&merchant_account.merchant_id,
api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type,
operations::ValidateResult {
merchant_id: &merchant_account.merchant_id,
payment_id: api::PaymentIdType::PaymentIntentId(payment_id),
mandate_type,
storage_scheme: merchant_account.storage_scheme,
},
))
}
}

View File

@ -38,6 +38,7 @@ pub async fn refund_create_core(
.find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
&req.payment_id,
merchant_id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::SuccessfulPaymentNotFound)?;
@ -56,7 +57,11 @@ pub async fn refund_create_core(
)?;
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(&req.payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(
&req.payment_id,
merchant_id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::PaymentNotFound)?;
@ -142,7 +147,11 @@ pub async fn trigger_refund_to_gateway(
let response = state
.store
.update_refund(refund.to_owned(), refund_update)
.update_refund(
refund.to_owned(),
refund_update,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok(response)
@ -162,13 +171,21 @@ pub async fn refund_retrieve_core(
merchant_id = &merchant_account.merchant_id;
refund = db
.find_refund_by_merchant_id_refund_id(merchant_id, refund_id.as_str())
.find_refund_by_merchant_id_refund_id(
merchant_id,
refund_id.as_str(),
merchant_account.storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?;
let payment_id = refund.payment_id.as_str();
payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(payment_id, merchant_id)
.find_payment_intent_by_payment_id_merchant_id(
payment_id,
merchant_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
@ -177,6 +194,7 @@ pub async fn refund_retrieve_core(
&refund.transaction_id,
payment_id,
merchant_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
@ -251,7 +269,11 @@ pub async fn sync_refund_with_gateway(
let response = state
.store
.update_refund(refund.to_owned(), refund_update)
.update_refund(
refund.to_owned(),
refund_update,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok(response)
@ -266,7 +288,11 @@ pub async fn refund_update_core(
req: refunds::RefundRequest,
) -> RouterResponse<refunds::RefundResponse> {
let refund = db
.find_refund_by_merchant_id_refund_id(&merchant_account.merchant_id, refund_id)
.find_refund_by_merchant_id_refund_id(
&merchant_account.merchant_id,
refund_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?;
@ -276,6 +302,7 @@ pub async fn refund_update_core(
storage::RefundUpdate::MetadataUpdate {
metadata: req.metadata,
},
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
@ -323,6 +350,7 @@ pub async fn validate_and_create_refund(
&payment_intent.payment_id,
&merchant_account.merchant_id,
&refund_id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?
@ -338,6 +366,7 @@ pub async fn validate_and_create_refund(
.find_refund_by_merchant_id_transaction_id(
&merchant_account.merchant_id,
connecter_transaction_id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::RefundNotFound)
@ -371,9 +400,12 @@ pub async fn validate_and_create_refund(
refund_amount,
);
refund = db.insert_refund(refund_create_req).await.map_err(|error| {
error.to_duplicate_response(errors::ApiErrorResponse::DuplicateRefundRequest)
})?;
refund = db
.insert_refund(refund_create_req, merchant_account.storage_scheme)
.await
.map_err(|error| {
error.to_duplicate_response(errors::ApiErrorResponse::DuplicateRefundRequest)
})?;
schedule_refund_execution(
state,
refund,
@ -548,11 +580,20 @@ pub async fn sync_refund_with_gateway_workflow(
refund_tracker.tracking_data
)
})?;
let merchant_account = db
.find_merchant_account_by_merchant_id(&refund_core.merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
})?;
// FIXME we actually don't use this?
let _refund = db
.find_refund_by_internal_reference_id_merchant_id(
&refund_core.refund_internal_reference_id,
&refund_core.merchant_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?;
@ -592,41 +633,53 @@ pub async fn trigger_refund_execute_workflow(
refund_tracker.tracking_data
)
})?;
let merchant_account = db
.find_merchant_account_by_merchant_id(&refund_core.merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
})?;
let refund = db
.find_refund_by_internal_reference_id_merchant_id(
&refund_core.refund_internal_reference_id,
&refund_core.merchant_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?;
match (&refund.sent_to_gateway, &refund.refund_status) {
//FIXME: Conversion should come from trait
(false, enums::RefundStatus::Pending) => {
let merchant_account = db
.find_merchant_account_by_merchant_id(&refund.merchant_id)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
})?;
let payment_attempt = db
.find_payment_attempt_by_transaction_id_payment_id_merchant_id(
&refund.transaction_id,
&refund_core.payment_id,
&refund.merchant_id,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?;
let payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(
&payment_attempt.payment_id,
&refund.merchant_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?;
let merchant_account = db
.find_merchant_account_by_merchant_id(&refund.merchant_id)
let payment_intent = db
.find_payment_intent_by_payment_id_merchant_id(
&payment_attempt.payment_id,
&refund.merchant_id,
merchant_account.storage_scheme,
)
.await
.map_err(|error| {
error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)
error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
})?;
//trigger refund request to gateway

View File

@ -95,9 +95,10 @@ pub async fn validate_uniqueness_of_refund_id_against_merchant_id(
payment_id: &str,
merchant_id: &str,
refund_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> RouterResult<Option<storage::Refund>> {
let refund = db
.find_refund_by_merchant_id_refund_id(merchant_id, refund_id)
.find_refund_by_merchant_id_refund_id(merchant_id, refund_id, storage_scheme)
.await;
logger::debug!(?refund);
match refund {

View File

@ -16,6 +16,7 @@ pub trait AddressInterface {
&self,
address: AddressNew,
) -> CustomResult<Address, errors::StorageError>;
async fn find_address(&self, address_id: &str) -> CustomResult<Address, errors::StorageError>;
}

View File

@ -2,7 +2,7 @@ use super::MockDb;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult},
types::storage::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate},
types::storage::{enums, ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate},
};
#[async_trait::async_trait]
@ -10,17 +10,22 @@ pub trait ConnectorResponseInterface {
async fn insert_connector_response(
&self,
connector_response: ConnectorResponseNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError>;
async fn find_connector_response_by_payment_id_merchant_id_txn_id(
&self,
payment_id: &str,
merchant_id: &str,
txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError>;
async fn update_connector_response(
&self,
this: ConnectorResponse,
payment_attempt: ConnectorResponseUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError>;
}
@ -29,6 +34,7 @@ impl ConnectorResponseInterface for super::Store {
async fn insert_connector_response(
&self,
connector_response: ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
connector_response.insert(&conn).await
@ -39,6 +45,7 @@ impl ConnectorResponseInterface for super::Store {
payment_id: &str,
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
ConnectorResponse::find_by_payment_id_and_merchant_id_transaction_id(
@ -54,6 +61,7 @@ impl ConnectorResponseInterface for super::Store {
&self,
this: ConnectorResponse,
connector_response_update: ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, connector_response_update).await
@ -65,6 +73,7 @@ impl ConnectorResponseInterface for MockDb {
async fn insert_connector_response(
&self,
new: ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError> {
let mut connector_response = self.connector_response.lock().await;
let response = ConnectorResponse {
@ -88,6 +97,7 @@ impl ConnectorResponseInterface for MockDb {
_payment_id: &str,
_merchant_id: &str,
_txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError> {
todo!()
}
@ -96,6 +106,7 @@ impl ConnectorResponseInterface for MockDb {
&self,
this: ConnectorResponse,
connector_response_update: ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<ConnectorResponse, errors::StorageError> {
let mut connector_response = self.connector_response.lock().await;
let response = connector_response

View File

@ -5,7 +5,7 @@ use super::MockDb;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult, DatabaseError, StorageError},
types::storage::{MerchantAccount, MerchantAccountNew, MerchantAccountUpdate},
types::storage::{enums, MerchantAccount, MerchantAccountNew, MerchantAccountUpdate},
};
#[async_trait::async_trait]
@ -122,6 +122,7 @@ impl MerchantAccountInterface for MockDb {
sub_merchants_enabled: merchant_account.sub_merchants_enabled,
parent_merchant_id: merchant_account.parent_merchant_id,
publishable_key: merchant_account.publishable_key,
storage_scheme: enums::MerchantStorageScheme::PostgresOnly,
};
accounts.push(account.clone());
Ok(account)

View File

@ -1,7 +1,7 @@
use super::MockDb;
use crate::{
core::errors::{self, CustomResult},
types::storage::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate},
types::storage::{enums, PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate},
};
#[async_trait::async_trait]
@ -9,18 +9,21 @@ pub trait PaymentAttemptInterface {
async fn insert_payment_attempt(
&self,
payment_attempt: PaymentAttemptNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
async fn update_payment_attempt(
&self,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
async fn find_payment_attempt_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
async fn find_payment_attempt_by_transaction_id_payment_id_merchant_id(
@ -28,24 +31,28 @@ pub trait PaymentAttemptInterface {
transaction_id: &str,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
async fn find_payment_attempt_by_merchant_id_connector_txn_id(
&self,
merchant_id: &str,
connector_txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
async fn find_payment_attempt_by_merchant_id_txn_id(
&self,
merchant_id: &str,
txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError>;
}
@ -56,7 +63,7 @@ mod storage {
connection::pg_connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::payment_attempt::*,
types::storage::{enums, payment_attempt::*},
};
#[async_trait::async_trait]
@ -64,6 +71,7 @@ mod storage {
async fn insert_payment_attempt(
&self,
payment_attempt: PaymentAttemptNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
payment_attempt.insert_diesel(&conn).await
@ -73,6 +81,7 @@ mod storage {
&self,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, payment_attempt).await
@ -82,6 +91,7 @@ mod storage {
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await
@ -92,6 +102,7 @@ mod storage {
transaction_id: &str,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_transaction_id_payment_id_merchant_id(
@ -107,6 +118,7 @@ mod storage {
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id(
@ -121,6 +133,7 @@ mod storage {
&self,
merchant_id: &str,
connector_txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
// TODO: update logic to lookup all payment attempts for an intent
@ -137,6 +150,7 @@ mod storage {
&self,
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
@ -151,6 +165,7 @@ impl PaymentAttemptInterface for MockDb {
&self,
_merchant_id: &str,
_txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
todo!()
}
@ -159,6 +174,7 @@ impl PaymentAttemptInterface for MockDb {
&self,
_merchant_id: &str,
_connector_txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
todo!()
}
@ -167,6 +183,7 @@ impl PaymentAttemptInterface for MockDb {
async fn insert_payment_attempt(
&self,
payment_attempt: PaymentAttemptNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let mut payment_attempts = self.payment_attempts.lock().await;
let id = payment_attempts.len() as i32;
@ -211,6 +228,7 @@ impl PaymentAttemptInterface for MockDb {
&self,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let mut payment_attempts = self.payment_attempts.lock().await;
@ -228,6 +246,7 @@ impl PaymentAttemptInterface for MockDb {
&self,
_payment_id: &str,
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
todo!()
}
@ -237,6 +256,7 @@ impl PaymentAttemptInterface for MockDb {
_transaction_id: &str,
_payment_id: &str,
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
todo!()
}
@ -245,6 +265,7 @@ impl PaymentAttemptInterface for MockDb {
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let payment_attempts = self.payment_attempts.lock().await;
@ -280,69 +301,142 @@ mod storage {
async fn insert_payment_attempt(
&self,
payment_attempt: PaymentAttemptNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let key = format!(
"{}_{}",
payment_attempt.payment_id, payment_attempt.merchant_id
);
// TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id
// Check for database presence as well Maybe use a read replica here ?
let created_attempt = PaymentAttempt {
id: 0i32,
payment_id: payment_attempt.payment_id.clone(),
merchant_id: payment_attempt.merchant_id.clone(),
txn_id: payment_attempt.txn_id.clone(),
status: payment_attempt.status,
amount: payment_attempt.amount,
currency: payment_attempt.currency,
save_to_locker: payment_attempt.save_to_locker,
connector: payment_attempt.connector.clone(),
error_message: payment_attempt.error_message.clone(),
offer_amount: payment_attempt.offer_amount,
surcharge_amount: payment_attempt.surcharge_amount,
tax_amount: payment_attempt.tax_amount,
payment_method_id: payment_attempt.payment_method_id.clone(),
payment_method: payment_attempt.payment_method,
payment_flow: payment_attempt.payment_flow,
redirect: payment_attempt.redirect,
connector_transaction_id: payment_attempt.connector_transaction_id.clone(),
capture_method: payment_attempt.capture_method,
capture_on: payment_attempt.capture_on,
confirm: payment_attempt.confirm,
authentication_type: payment_attempt.authentication_type,
created_at: payment_attempt.created_at.unwrap_or_else(date_time::now),
modified_at: payment_attempt.created_at.unwrap_or_else(date_time::now),
last_synced: payment_attempt.last_synced,
amount_to_capture: payment_attempt.amount_to_capture,
cancellation_reason: payment_attempt.cancellation_reason.clone(),
mandate_id: payment_attempt.mandate_id.clone(),
browser_info: payment_attempt.browser_info.clone(),
};
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&created_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
match self
.redis_conn
.pool
.hsetnx::<u8, &str, &str, &str>(&key, "pa", &redis_value)
.await
{
Ok(0) => Err(errors::StorageError::DuplicateValue(format!(
"Payment Attempt already exists for payment_id: {}",
key
)))
.into_report(),
Ok(1) => {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let query = payment_attempt
.insert_diesel(&conn)
payment_attempt.insert_diesel(&conn).await
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!(
"{}_{}",
payment_attempt.payment_id, payment_attempt.merchant_id
);
// TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id
// Check for database presence as well Maybe use a read replica here ?
let created_attempt = PaymentAttempt {
id: 0i32,
payment_id: payment_attempt.payment_id.clone(),
merchant_id: payment_attempt.merchant_id.clone(),
txn_id: payment_attempt.txn_id.clone(),
status: payment_attempt.status,
amount: payment_attempt.amount,
currency: payment_attempt.currency,
save_to_locker: payment_attempt.save_to_locker,
connector: payment_attempt.connector.clone(),
error_message: payment_attempt.error_message.clone(),
offer_amount: payment_attempt.offer_amount,
surcharge_amount: payment_attempt.surcharge_amount,
tax_amount: payment_attempt.tax_amount,
payment_method_id: payment_attempt.payment_method_id.clone(),
payment_method: payment_attempt.payment_method,
payment_flow: payment_attempt.payment_flow,
redirect: payment_attempt.redirect,
connector_transaction_id: payment_attempt.connector_transaction_id.clone(),
capture_method: payment_attempt.capture_method,
capture_on: payment_attempt.capture_on,
confirm: payment_attempt.confirm,
authentication_type: payment_attempt.authentication_type,
created_at: payment_attempt.created_at.unwrap_or_else(date_time::now),
modified_at: payment_attempt.created_at.unwrap_or_else(date_time::now),
last_synced: payment_attempt.last_synced,
amount_to_capture: payment_attempt.amount_to_capture,
cancellation_reason: payment_attempt.cancellation_reason.clone(),
mandate_id: payment_attempt.mandate_id.clone(),
browser_info: payment_attempt.browser_info.clone(),
};
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&created_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
match self
.redis_conn
.pool
.hsetnx::<u8, &str, &str, &str>(&key, "pa", &redis_value)
.await
{
Ok(0) => Err(errors::StorageError::DuplicateValue(format!(
"Payment Attempt already exists for payment_id: {}",
key
)))
.into_report(),
Ok(1) => {
let conn = pg_connection(&self.master_pool).await;
let query = payment_attempt
.insert_diesel_query(&conn)
.await
.change_context(errors::StorageError::KVError)?;
let stream_name = self.drainer_stream(&PaymentAttempt::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_attempt.merchant_id,
payment_id: &created_attempt.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
query.to_field_value_pairs(),
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_attempt)
}
Ok(i) => Err(errors::StorageError::KVError)
.into_report()
.attach_printable_lazy(|| {
format!("Invalid response for HSETNX: {}", i)
}),
Err(er) => Err(er)
.into_report()
.change_context(errors::StorageError::KVError),
}
}
}
}
async fn update_payment_attempt(
&self,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, payment_attempt).await
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.payment_id, this.merchant_id);
let updated_attempt = payment_attempt.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&updated_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
let updated_attempt = self
.redis_conn
.pool
.hset::<u8, &str, (&str, String)>(&key, ("pa", redis_value))
.await
.map(|_| updated_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await;
let query = this
.update_query(&conn, payment_attempt)
.await
.change_context(errors::StorageError::KVError)?;
let stream_name = self.drainer_stream(&PaymentAttempt::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_attempt.merchant_id,
payment_id: &created_attempt.payment_id,
merchant_id: &updated_attempt.merchant_id,
payment_id: &updated_attempt.payment_id,
},
self.config.drainer_num_partitions,
));
@ -354,80 +448,40 @@ mod storage {
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_attempt)
Ok(updated_attempt)
}
Ok(i) => Err(errors::StorageError::KVError)
.into_report()
.attach_printable_lazy(|| format!("Invalid response for HSETNX: {}", i)),
Err(er) => Err(er)
.into_report()
.change_context(errors::StorageError::KVError),
}
}
async fn update_payment_attempt(
&self,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let key = format!("{}_{}", this.payment_id, this.merchant_id);
let updated_attempt = payment_attempt.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&updated_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
let updated_attempt = self
.redis_conn
.pool
.hset::<u8, &str, (&str, String)>(&key, ("pa", redis_value))
.await
.map(|_| updated_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await;
let query = this
.update(&conn, payment_attempt)
.await
.change_context(errors::StorageError::KVError)?;
let stream_name = self.drainer_stream(&PaymentAttempt::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_attempt.merchant_id,
payment_id: &updated_attempt.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
query.to_field_value_pairs(),
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(updated_attempt)
}
async fn find_payment_attempt_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let key = format!("{}_{}", payment_id, merchant_id);
self.redis_conn
.pool
.hget::<String, String, &str>(key, "pa")
.await
.into_report()
.change_context(errors::StorageError::KVError)
.and_then(|redis_resp| {
serde_json::from_str::<PaymentAttempt>(&redis_resp)
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", payment_id, merchant_id);
self.redis_conn
.pool
.hget::<String, String, &str>(key, "pa")
.await
.into_report()
.change_context(errors::StorageError::KVError)
})
// Check for database presence as well Maybe use a read replica here ?
.and_then(|redis_resp| {
serde_json::from_str::<PaymentAttempt>(&redis_resp)
.into_report()
.change_context(errors::StorageError::KVError)
})
// Check for database presence as well Maybe use a read replica here ?
}
}
}
async fn find_payment_attempt_by_transaction_id_payment_id_merchant_id(
@ -435,54 +489,92 @@ mod storage {
transaction_id: &str,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
// We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now
self.find_payment_attempt_by_payment_id_merchant_id(payment_id, merchant_id)
.await
.and_then(|attempt| {
if attempt.connector_transaction_id.as_deref() == Some(transaction_id) {
Ok(attempt)
} else {
Err(errors::StorageError::ValueNotFound(format!(
"Successful payment attempt does not exist for {}_{}",
payment_id, merchant_id
)))
.into_report()
}
})
self.find_payment_attempt_by_payment_id_merchant_id(
payment_id,
merchant_id,
storage_scheme,
)
.await
.and_then(|attempt| {
if attempt.connector_transaction_id.as_deref() == Some(transaction_id) {
Ok(attempt)
} else {
Err(errors::StorageError::ValueNotFound(format!(
"Successful payment attempt does not exist for {}_{}",
payment_id, merchant_id
)))
.into_report()
}
})
}
async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
self.find_payment_attempt_by_payment_id_merchant_id(payment_id, merchant_id)
.await
.and_then(|attempt| match attempt.status {
enums::AttemptStatus::Charged => Ok(attempt),
_ => Err(errors::StorageError::ValueNotFound(format!(
"Successful payment attempt does not exist for {}_{}",
payment_id, merchant_id
)))
.into_report(),
})
self.find_payment_attempt_by_payment_id_merchant_id(
payment_id,
merchant_id,
storage_scheme,
)
.await
.and_then(|attempt| match attempt.status {
enums::AttemptStatus::Charged => Ok(attempt),
_ => Err(errors::StorageError::ValueNotFound(format!(
"Successful payment attempt does not exist for {}_{}",
payment_id, merchant_id
)))
.into_report(),
})
}
async fn find_payment_attempt_by_merchant_id_connector_txn_id(
&self,
_merchant_id: &str,
_connector_txn_id: &str,
merchant_id: &str,
connector_txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
Err(errors::StorageError::KVError).into_report()
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
// TODO: update logic to lookup all payment attempts for an intent
// and apply filter logic on top of them to get the desired one.
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
connector_txn_id,
)
.await
}
enums::MerchantStorageScheme::RedisKv => {
Err(errors::StorageError::KVError).into_report()
}
}
}
async fn find_payment_attempt_by_merchant_id_txn_id(
&self,
_merchant_id: &str,
_txn_id: &str,
merchant_id: &str,
txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
Err(errors::StorageError::KVError).into_report()
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id)
.await
}
enums::MerchantStorageScheme::RedisKv => {
Err(errors::StorageError::KVError).into_report()
}
}
}
}
}

View File

@ -3,7 +3,7 @@ use crate::{
core::errors::{self, CustomResult},
types::{
api,
storage::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate},
storage::{enums, PaymentIntent, PaymentIntentNew, PaymentIntentUpdate},
},
};
@ -13,23 +13,27 @@ pub trait PaymentIntentInterface {
&self,
this: PaymentIntent,
payment_intent: PaymentIntentUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError>;
async fn insert_payment_intent(
&self,
new: PaymentIntentNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError>;
async fn find_payment_intent_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError>;
async fn filter_payment_intent_by_constraints(
&self,
merchant_id: &str,
pc: &api::PaymentListConstraints,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError>;
}
@ -45,7 +49,10 @@ mod storage {
connection::pg_connection,
core::errors::{self, CustomResult},
services::Store,
types::{api, storage::payment_intent::*},
types::{
api,
storage::{enums, payment_intent::*},
},
utils::storage_partitioning::KvStorePartition,
};
@ -54,56 +61,129 @@ mod storage {
async fn insert_payment_intent(
&self,
new: PaymentIntentNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let key = format!("{}_{}", new.payment_id, new.merchant_id);
let created_intent = PaymentIntent {
id: 0i32,
payment_id: new.payment_id.clone(),
merchant_id: new.merchant_id.clone(),
status: new.status,
amount: new.amount,
currency: new.currency,
amount_captured: new.amount_captured,
customer_id: new.customer_id.clone(),
description: new.description.clone(),
return_url: new.return_url.clone(),
metadata: new.metadata.clone(),
connector_id: new.connector_id.clone(),
shipping_address_id: new.shipping_address_id.clone(),
billing_address_id: new.billing_address_id.clone(),
statement_descriptor_name: new.statement_descriptor_name.clone(),
statement_descriptor_suffix: new.statement_descriptor_suffix.clone(),
created_at: new.created_at.unwrap_or_else(date_time::now),
modified_at: new.created_at.unwrap_or_else(date_time::now),
last_synced: new.last_synced,
setup_future_usage: new.setup_future_usage,
off_session: new.off_session,
client_secret: new.client_secret.clone(),
};
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&created_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
match self
.redis_conn
.pool
.hsetnx::<u8, &str, &str, &str>(&key, "pi", &redis_value)
.await
{
Ok(0) => Err(errors::StorageError::DuplicateValue(format!(
"Payment Intent already exists for payment_id: {key}"
)))
.into_report(),
Ok(1) => {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let query = new
.insert_diesel(&conn)
new.insert_diesel(&conn).await
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", new.payment_id, new.merchant_id);
let created_intent = PaymentIntent {
id: 0i32,
payment_id: new.payment_id.clone(),
merchant_id: new.merchant_id.clone(),
status: new.status,
amount: new.amount,
currency: new.currency,
amount_captured: new.amount_captured,
customer_id: new.customer_id.clone(),
description: new.description.clone(),
return_url: new.return_url.clone(),
metadata: new.metadata.clone(),
connector_id: new.connector_id.clone(),
shipping_address_id: new.shipping_address_id.clone(),
billing_address_id: new.billing_address_id.clone(),
statement_descriptor_name: new.statement_descriptor_name.clone(),
statement_descriptor_suffix: new.statement_descriptor_suffix.clone(),
created_at: new.created_at.unwrap_or_else(date_time::now),
modified_at: new.created_at.unwrap_or_else(date_time::now),
last_synced: new.last_synced,
setup_future_usage: new.setup_future_usage,
off_session: new.off_session,
client_secret: new.client_secret.clone(),
};
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&created_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
match self
.redis_conn
.pool
.hsetnx::<u8, &str, &str, &str>(&key, "pi", &redis_value)
.await
{
Ok(0) => Err(errors::StorageError::DuplicateValue(format!(
"Payment Intent already exists for payment_id: {key}"
)))
.into_report(),
Ok(1) => {
let conn = pg_connection(&self.master_pool).await;
let query = new
.insert_diesel_query(&conn)
.await
.change_context(errors::StorageError::KVError)?;
let stream_name = self.drainer_stream(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
query.to_field_value_pairs(),
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_intent)
}
Ok(i) => Err(errors::StorageError::KVError)
.into_report()
.attach_printable_lazy(|| {
format!("Invalid response for HSETNX: {}", i)
}),
Err(er) => Err(er)
.into_report()
.change_context(errors::StorageError::KVError),
}
}
}
}
async fn update_payment_intent(
&self,
this: PaymentIntent,
payment_intent: PaymentIntentUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, payment_intent).await
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.payment_id, this.merchant_id);
let updated_intent = payment_intent.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&updated_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
let updated_intent = self
.redis_conn
.pool
.hset::<u8, &str, (&str, String)>(&key, ("pi", redis_value))
.await
.map(|_| updated_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await;
let query = this
.update_query(&conn, payment_intent)
.await
.change_context(errors::StorageError::KVError)?;
let stream_name = self.drainer_stream(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
merchant_id: &updated_intent.merchant_id,
payment_id: &updated_intent.payment_id,
},
self.config.drainer_num_partitions,
));
@ -115,94 +195,64 @@ mod storage {
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_intent)
Ok(updated_intent)
}
Ok(i) => Err(errors::StorageError::KVError)
.into_report()
.attach_printable_lazy(|| format!("Invalid response for HSETNX: {}", i)),
Err(er) => Err(er)
.into_report()
.change_context(errors::StorageError::KVError),
}
}
async fn update_payment_intent(
&self,
this: PaymentIntent,
payment_intent: PaymentIntentUpdate,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let key = format!("{}_{}", this.payment_id, this.merchant_id);
let updated_intent = payment_intent.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
// TODO: Add a proper error for serialization failure
let redis_value = serde_json::to_string(&updated_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
let updated_intent = self
.redis_conn
.pool
.hset::<u8, &str, (&str, String)>(&key, ("pi", redis_value))
.await
.map(|_| updated_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await;
let query = this
.update(&conn, payment_intent)
.await
.change_context(errors::StorageError::KVError)?;
let stream_name = self.drainer_stream(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_intent.merchant_id,
payment_id: &updated_intent.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
query.to_field_value_pairs(),
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(updated_intent)
}
async fn find_payment_intent_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let key = format!("{}_{}", payment_id, merchant_id);
self.redis_conn
.pool
.hget::<String, &str, &str>(&key, "pi")
.await
.map_err(|err| match err.kind() {
RedisErrorKind::NotFound => errors::StorageError::ValueNotFound(format!(
"Payment Intent does not exist for {}",
key
)),
_ => errors::StorageError::KVError,
})
.into_report()
.and_then(|redis_resp| {
serde_json::from_str::<PaymentIntent>(&redis_resp)
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", payment_id, merchant_id);
self.redis_conn
.pool
.hget::<String, &str, &str>(&key, "pi")
.await
.map_err(|err| match err.kind() {
RedisErrorKind::NotFound => errors::StorageError::ValueNotFound(
format!("Payment Intent does not exist for {}", key),
),
_ => errors::StorageError::KVError,
})
.into_report()
.change_context(errors::StorageError::KVError)
})
// Check for database presence as well Maybe use a read replica here ?
.and_then(|redis_resp| {
serde_json::from_str::<PaymentIntent>(&redis_resp)
.into_report()
.change_context(errors::StorageError::KVError)
})
// Check for database presence as well Maybe use a read replica here ?
}
}
}
async fn filter_payment_intent_by_constraints(
&self,
_merchant_id: &str,
_pc: &api::PaymentListConstraints,
merchant_id: &str,
pc: &api::PaymentListConstraints,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
//TODO: Implement this
Err(errors::StorageError::KVError.into())
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc).await
}
enums::MerchantStorageScheme::RedisKv => {
//TODO: Implement this
Err(errors::StorageError::KVError.into())
}
}
}
}
}
@ -214,7 +264,10 @@ mod storage {
connection::pg_connection,
core::errors::{self, CustomResult},
services::Store,
types::{api, storage::payment_intent::*},
types::{
api,
storage::{enums, payment_intent::*},
},
};
#[async_trait::async_trait]
@ -222,6 +275,7 @@ mod storage {
async fn insert_payment_intent(
&self,
new: PaymentIntentNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
new.insert_diesel(&conn).await
@ -231,6 +285,7 @@ mod storage {
&self,
this: PaymentIntent,
payment_intent: PaymentIntentUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, payment_intent).await
@ -240,6 +295,7 @@ mod storage {
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await
@ -249,6 +305,7 @@ mod storage {
&self,
merchant_id: &str,
pc: &api::PaymentListConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc).await
@ -262,6 +319,7 @@ impl PaymentIntentInterface for MockDb {
&self,
_merchant_id: &str,
_pc: &api::PaymentListConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
todo!()
}
@ -270,6 +328,7 @@ impl PaymentIntentInterface for MockDb {
async fn insert_payment_intent(
&self,
new: PaymentIntentNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let mut payment_intents = self.payment_intents.lock().await;
let time = common_utils::date_time::now();
@ -305,6 +364,7 @@ impl PaymentIntentInterface for MockDb {
&self,
this: PaymentIntent,
update: PaymentIntentUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let mut payment_intents = self.payment_intents.lock().await;
let payment_intent = payment_intents
@ -319,6 +379,7 @@ impl PaymentIntentInterface for MockDb {
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let payment_intents = self.payment_intents.lock().await;

View File

@ -4,7 +4,7 @@ use super::MockDb;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult, DatabaseError, StorageError},
types::storage::{Refund, RefundNew, RefundUpdate},
types::storage::{enums, Refund, RefundNew, RefundUpdate},
};
#[async_trait::async_trait]
@ -13,12 +13,14 @@ pub trait RefundInterface {
&self,
internal_reference_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError>;
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<Refund>, errors::StorageError>;
// async fn find_refund_by_payment_id_merchant_id_refund_id(
@ -32,21 +34,28 @@ pub trait RefundInterface {
&self,
merchant_id: &str,
refund_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError>;
async fn update_refund(
&self,
this: Refund,
refund: RefundUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError>;
async fn find_refund_by_merchant_id_transaction_id(
&self,
merchant_id: &str,
txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<Refund>, errors::StorageError>;
async fn insert_refund(&self, new: RefundNew) -> CustomResult<Refund, errors::StorageError>;
async fn insert_refund(
&self,
new: RefundNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError>;
}
#[async_trait::async_trait]
@ -55,13 +64,18 @@ impl RefundInterface for super::Store {
&self,
internal_reference_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
Refund::find_by_internal_reference_id_merchant_id(&conn, internal_reference_id, merchant_id)
.await
}
async fn insert_refund(&self, new: RefundNew) -> CustomResult<Refund, errors::StorageError> {
async fn insert_refund(
&self,
new: RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
new.insert(&conn).await
}
@ -69,6 +83,7 @@ impl RefundInterface for super::Store {
&self,
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id).await
@ -78,6 +93,7 @@ impl RefundInterface for super::Store {
&self,
this: Refund,
refund: RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, refund).await
@ -87,6 +103,7 @@ impl RefundInterface for super::Store {
&self,
merchant_id: &str,
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id).await
@ -107,6 +124,7 @@ impl RefundInterface for super::Store {
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await
@ -119,11 +137,16 @@ impl RefundInterface for MockDb {
&self,
_internal_reference_id: &str,
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
todo!()
}
async fn insert_refund(&self, new: RefundNew) -> CustomResult<Refund, errors::StorageError> {
async fn insert_refund(
&self,
new: RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
let mut refunds = self.refunds.lock().await;
let current_time = common_utils::date_time::now();
@ -157,6 +180,7 @@ impl RefundInterface for MockDb {
&self,
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<Refund>, errors::StorageError> {
let refunds = self.refunds.lock().await;
@ -173,6 +197,7 @@ impl RefundInterface for MockDb {
&self,
_this: Refund,
_refund: RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
todo!()
}
@ -181,6 +206,7 @@ impl RefundInterface for MockDb {
&self,
merchant_id: &str,
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Refund, errors::StorageError> {
let refunds = self.refunds.lock().await;
@ -195,6 +221,7 @@ impl RefundInterface for MockDb {
&self,
_payment_id: &str,
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<Refund>, errors::StorageError> {
todo!()
}

View File

@ -121,7 +121,6 @@ pub fn mk_app(
/// Unwrap used because without the value we can't start the server
pub async fn start_server(conf: Settings) -> BachResult<(Server, AppState)> {
logger::debug!(startup_config=?conf);
let server = conf.server.clone();
let state = routes::AppState::new(conf).await;
// Cloning to close connections before shutdown

View File

@ -150,6 +150,7 @@ diesel::table! {
sub_merchants_enabled -> Nullable<Bool>,
parent_merchant_id -> Nullable<Varchar>,
publishable_key -> Nullable<Varchar>,
storage_scheme -> MerchantStorageScheme,
}
}

View File

@ -26,7 +26,11 @@ use crate::{
db::StorageInterface,
logger, routes,
routes::AppState,
types::{self, api, storage, ErrorResponse, Response},
types::{
self, api,
storage::{self, enums},
ErrorResponse, Response,
},
utils::OptionExt,
};
@ -556,6 +560,7 @@ pub async fn authenticate_merchant<'a>(
payment_response_hash_key: None,
redirect_to_merchant_with_http_post: false,
publishable_key: None,
storage_scheme: enums::MerchantStorageScheme::PostgresOnly,
})
}

View File

@ -6,7 +6,8 @@ pub mod diesel_exports {
DbEventClass as EventClass, DbEventObjectType as EventObjectType, DbEventType as EventType,
DbFutureUsage as FutureUsage, DbIntentStatus as IntentStatus,
DbMandateStatus as MandateStatus, DbMandateType as MandateType,
DbPaymentFlow as PaymentFlow, DbPaymentMethodIssuerCode as PaymentMethodIssuerCode,
DbMerchantStorageScheme as MerchantStorageScheme, DbPaymentFlow as PaymentFlow,
DbPaymentMethodIssuerCode as PaymentMethodIssuerCode,
DbPaymentMethodSubType as PaymentMethodSubType, DbPaymentMethodType as PaymentMethodType,
DbProcessTrackerStatus as ProcessTrackerStatus, DbRefundStatus as RefundStatus,
DbRefundType as RefundType, DbRoutingAlgorithm as RoutingAlgorithm,
@ -362,6 +363,28 @@ pub enum FutureUsage {
OnSession,
}
#[derive(
Clone,
Copy,
Debug,
Default,
Eq,
PartialEq,
serde::Deserialize,
serde::Serialize,
strum::Display,
strum::EnumString,
router_derive::DieselEnum,
)]
#[router_derive::diesel_enum]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MerchantStorageScheme {
#[default]
PostgresOnly,
RedisKv,
}
#[derive(
Clone,
Copy,

View File

@ -20,6 +20,7 @@ pub struct MerchantAccount {
pub sub_merchants_enabled: Option<bool>,
pub parent_merchant_id: Option<String>,
pub publishable_key: Option<String>,
pub storage_scheme: enums::MerchantStorageScheme,
}
#[derive(Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay)]

View File

@ -238,7 +238,12 @@ mod tests {
use uuid::Uuid;
use super::*;
use crate::{configs::settings::Settings, db::StorageImpl, routes, types};
use crate::{
configs::settings::Settings,
db::StorageImpl,
routes,
types::{self, storage::enums},
};
#[actix_rt::test]
#[ignore]
@ -259,7 +264,7 @@ mod tests {
let response = state
.store
.insert_payment_attempt(payment_attempt)
.insert_payment_attempt(payment_attempt, enums::MerchantStorageScheme::PostgresOnly)
.await
.unwrap();
eprintln!("{:?}", response);
@ -289,13 +294,17 @@ mod tests {
};
state
.store
.insert_payment_attempt(payment_attempt)
.insert_payment_attempt(payment_attempt, enums::MerchantStorageScheme::PostgresOnly)
.await
.unwrap();
let response = state
.store
.find_payment_attempt_by_payment_id_merchant_id(&payment_id, &merchant_id)
.find_payment_attempt_by_payment_id_merchant_id(
&payment_id,
&merchant_id,
enums::MerchantStorageScheme::PostgresOnly,
)
.await
.unwrap();
@ -326,13 +335,17 @@ mod tests {
};
state
.store
.insert_payment_attempt(payment_attempt)
.insert_payment_attempt(payment_attempt, enums::MerchantStorageScheme::PostgresOnly)
.await
.unwrap();
let response = state
.store
.find_payment_attempt_by_payment_id_merchant_id(&uuid, "1")
.find_payment_attempt_by_payment_id_merchant_id(
&uuid,
"1",
enums::MerchantStorageScheme::PostgresOnly,
)
.await
.unwrap();
// checking it after fetch

View File

@ -8,7 +8,7 @@ use router_env::tracing::{self, instrument};
#[cfg(not(feature = "kv_store"))]
use super::generics::{self, ExecuteQuery};
#[cfg(feature = "kv_store")]
use super::generics::{self, RawQuery, RawSqlQuery};
use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery};
use crate::{
connection::PgPooledConn,
core::errors::{self, CustomResult},
@ -21,7 +21,6 @@ use crate::{
};
impl PaymentAttemptNew {
#[cfg(not(feature = "kv_store"))]
#[instrument(skip(conn))]
pub async fn insert_diesel(
self,
@ -32,7 +31,7 @@ impl PaymentAttemptNew {
#[cfg(feature = "kv_store")]
#[instrument(skip(conn))]
pub async fn insert_diesel(
pub async fn insert_diesel_query(
self,
conn: &PgPooledConn,
) -> CustomResult<RawSqlQuery, errors::StorageError> {
@ -41,7 +40,6 @@ impl PaymentAttemptNew {
}
impl PaymentAttempt {
#[cfg(not(feature = "kv_store"))]
#[instrument(skip(conn))]
pub async fn update(
self,
@ -68,7 +66,7 @@ impl PaymentAttempt {
#[cfg(feature = "kv_store")]
#[instrument(skip(conn))]
pub async fn update(
pub async fn update_query(
self,
conn: &PgPooledConn,
payment_attempt: PaymentAttemptUpdate,

View File

@ -6,7 +6,7 @@ use router_env::tracing::{self, instrument};
#[cfg(not(feature = "kv_store"))]
use super::generics::{self, ExecuteQuery};
#[cfg(feature = "kv_store")]
use super::generics::{self, RawQuery, RawSqlQuery};
use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery};
use crate::{
connection::PgPooledConn,
core::errors::{self, CustomResult},
@ -20,7 +20,6 @@ use crate::{
};
impl PaymentIntentNew {
#[cfg(not(feature = "kv_store"))]
#[instrument(skip(conn))]
pub async fn insert_diesel(
self,
@ -31,7 +30,7 @@ impl PaymentIntentNew {
#[cfg(feature = "kv_store")]
#[instrument(skip(conn))]
pub async fn insert_diesel(
pub async fn insert_diesel_query(
self,
conn: &PgPooledConn,
) -> CustomResult<RawSqlQuery, errors::StorageError> {
@ -40,7 +39,6 @@ impl PaymentIntentNew {
}
impl PaymentIntent {
#[cfg(not(feature = "kv_store"))]
#[instrument(skip(conn))]
pub async fn update(
self,
@ -67,7 +65,7 @@ impl PaymentIntent {
#[cfg(feature = "kv_store")]
#[instrument(skip(conn))]
pub async fn update(
pub async fn update_query(
self,
conn: &PgPooledConn,
payment_intent: PaymentIntentUpdate,