feat(mandate_kv): add kv support for mandate (#4275)

Co-authored-by: Akshay S <akshay.s@Akshay-Subramanian-D66TQ6D97K.local>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
akshay-97
2024-04-16 15:35:15 +05:30
committed by GitHub
parent c3361ef5eb
commit 00340a3369
17 changed files with 628 additions and 124 deletions

View File

@ -23,7 +23,8 @@ use crate::{
mandates::{self, MandateResponseExt},
ConnectorData, GetToken,
},
domain, storage,
domain,
storage::{self, enums::MerchantStorageScheme},
transformers::ForeignFrom,
},
utils::OptionExt,
@ -39,7 +40,11 @@ pub async fn get_mandate(
let mandate = state
.store
.as_ref()
.find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &req.mandate_id)
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&req.mandate_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
Ok(services::ApplicationResponse::Json(
@ -62,7 +67,11 @@ pub async fn revoke_mandate(
) -> RouterResponse<mandates::MandateRevokedResponse> {
let db = state.store.as_ref();
let mandate = db
.find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &req.mandate_id)
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&req.mandate_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
match mandate.mandate_status {
@ -123,6 +132,8 @@ pub async fn revoke_mandate(
storage::MandateUpdate::StatusUpdate {
mandate_status: storage::enums::MandateStatus::Revoked,
},
mandate,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
@ -162,6 +173,7 @@ pub async fn update_connector_mandate_id(
mandate_ids_opt: Option<String>,
payment_method_id: Option<String>,
resp: Result<types::PaymentsResponseData, types::ErrorResponse>,
storage_scheme: MerchantStorageScheme,
) -> RouterResponse<mandates::MandateResponse> {
let mandate_details = Option::foreign_from(resp);
let connector_mandate_id = mandate_details
@ -176,7 +188,7 @@ pub async fn update_connector_mandate_id(
//Ignore updation if the payment_attempt mandate_id or connector_mandate_id is not present
if let Some((mandate_id, connector_id)) = mandate_ids_opt.zip(connector_mandate_id) {
let mandate = db
.find_mandate_by_merchant_id_mandate_id(&merchant_account, &mandate_id)
.find_mandate_by_merchant_id_mandate_id(&merchant_account, &mandate_id, storage_scheme)
.await
.change_context(errors::ApiErrorResponse::MandateNotFound)?;
@ -199,6 +211,8 @@ pub async fn update_connector_mandate_id(
&merchant_account,
&mandate_id,
update_mandate_details,
mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed)?;
@ -262,6 +276,7 @@ pub async fn update_mandate_procedure<F, FData>(
mandate: Mandate,
merchant_id: &str,
pm_id: Option<String>,
storage_scheme: MerchantStorageScheme,
) -> errors::RouterResult<types::RouterData<F, FData, types::PaymentsResponseData>>
where
FData: MandateBehaviour,
@ -276,13 +291,14 @@ where
};
let old_record = payments::UpdateHistory {
connector_mandate_id: mandate.connector_mandate_id,
payment_method_id: mandate.payment_method_id,
original_payment_id: mandate.original_payment_id,
connector_mandate_id: mandate.connector_mandate_id.clone(),
payment_method_id: mandate.payment_method_id.clone(),
original_payment_id: mandate.original_payment_id.clone(),
};
let mandate_ref = mandate
.connector_mandate_ids
.clone()
.parse_value::<payments::ConnectorMandateReferenceId>("Connector Reference Id")
.change_context(errors::ApiErrorResponse::MandateDeserializationFailed)?;
@ -302,11 +318,12 @@ where
.change_context(errors::ApiErrorResponse::InternalServerError)
.map(masking::Secret::new)?;
let mandate_id = mandate.mandate_id.clone();
let _update_mandate_details = state
.store
.update_mandate_by_merchant_id_mandate_id(
merchant_id,
&mandate.mandate_id,
&mandate_id,
diesel_models::MandateUpdate::ConnectorMandateIdUpdate {
connector_mandate_id: mandate_details
.as_ref()
@ -316,6 +333,8 @@ where
.unwrap_or("Error retrieving the payment_method_id".to_string()),
original_payment_id: Some(resp.payment_id.clone()),
},
mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed)?;
@ -327,6 +346,7 @@ pub async fn mandate_procedure<F, FData>(
maybe_customer: &Option<domain::Customer>,
pm_id: Option<String>,
merchant_connector_id: Option<String>,
storage_scheme: MerchantStorageScheme,
) -> errors::RouterResult<types::RouterData<F, FData, types::PaymentsResponseData>>
where
FData: MandateBehaviour,
@ -336,15 +356,16 @@ where
Ok(_) => match resp.request.get_mandate_id() {
Some(mandate_id) => {
if let Some(ref mandate_id) = mandate_id.mandate_id {
let mandate = state
let orig_mandate = state
.store
.find_mandate_by_merchant_id_mandate_id(
resp.merchant_id.as_ref(),
mandate_id,
storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
let mandate = match mandate.mandate_type {
let mandate = match orig_mandate.mandate_type {
storage_enums::MandateType::SingleUse => state
.store
.update_mandate_by_merchant_id_mandate_id(
@ -353,6 +374,8 @@ where
storage::MandateUpdate::StatusUpdate {
mandate_status: storage_enums::MandateStatus::Revoked,
},
orig_mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed),
@ -363,10 +386,12 @@ where
mandate_id,
storage::MandateUpdate::CaptureAmountUpdate {
amount_captured: Some(
mandate.amount_captured.unwrap_or(0)
orig_mandate.amount_captured.unwrap_or(0)
+ resp.request.get_amount(),
),
},
orig_mandate,
storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::MandateUpdateFailed),
@ -451,7 +476,7 @@ where
}));
state
.store
.insert_mandate(new_mandate_data)
.insert_mandate(new_mandate_data, storage_scheme)
.await
.to_duplicate_response(errors::ApiErrorResponse::DuplicateMandate)?;
metrics::MANDATE_COUNT.add(

View File

@ -117,6 +117,7 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData> for types::PaymentsAu
maybe_customer,
payment_method_id,
connector.merchant_connector_id.clone(),
merchant_account.storage_scheme,
)
.await?)
} else {

View File

@ -123,6 +123,7 @@ impl Feature<api::SetupMandate, types::SetupMandateRequestData> for types::Setup
maybe_customer,
pm_id,
connector.merchant_connector_id.clone(),
merchant_account.storage_scheme,
)
.await
}
@ -265,6 +266,7 @@ impl types::SetupMandateRouterData {
maybe_customer,
pm_id,
connector.merchant_connector_id.clone(),
merchant_account.storage_scheme,
)
.await?)
}
@ -348,7 +350,11 @@ impl types::SetupMandateRouterData {
.0;
let mandate = state
.store
.find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &mandate_id)
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&mandate_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
@ -400,6 +406,7 @@ impl types::SetupMandateRouterData {
mandate,
&merchant_account.merchant_id,
pm_id,
merchant_account.storage_scheme,
)
.await
}

View File

@ -560,7 +560,11 @@ pub async fn get_token_for_recurring_mandate(
let db = &*state.store;
let mandate = db
.find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, mandate_id.as_str())
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
mandate_id.as_str(),
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;

View File

@ -320,7 +320,7 @@ impl<F: Send + Clone, Ctx: PaymentMethodRetrieve>
})
.async_and_then(|mandate_id| async {
let mandate = db
.find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id)
.find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, storage_scheme)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound);
Some(mandate.and_then(|mandate_obj| {

View File

@ -939,6 +939,7 @@ async fn payment_response_update_tracker<F: Clone, T: types::Capturable>(
m_payment_data_mandate_id,
m_payment_method_id,
m_router_data_response,
storage_scheme,
)
.await
}

View File

@ -290,7 +290,7 @@ impl<F: Send + Clone, Ctx: PaymentMethodRetrieve>
})
.async_and_then(|mandate_id| async {
let mandate = db
.find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id)
.find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, merchant_account.storage_scheme)
.await
.change_context(errors::ApiErrorResponse::MandateNotFound);
Some(mandate.and_then(|mandate_obj| {

View File

@ -434,6 +434,7 @@ pub async fn mandates_incoming_webhook_flow(
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
mandate_id.as_str(),
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
@ -443,6 +444,7 @@ pub async fn mandates_incoming_webhook_flow(
.find_mandate_by_merchant_id_connector_mandate_id(
&merchant_account.merchant_id,
connector_mandate_id.as_str(),
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
@ -452,11 +454,14 @@ pub async fn mandates_incoming_webhook_flow(
let mandate_status = common_enums::MandateStatus::foreign_try_from(event_type)
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("event type to mandate status mapping failed")?;
let mandate_id = mandate.mandate_id.clone();
let updated_mandate = db
.update_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&mandate.mandate_id,
&mandate_id,
storage::MandateUpdate::StatusUpdate { mandate_status },
mandate,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;

View File

@ -668,9 +668,10 @@ impl MandateInterface for KafkaStore {
&self,
merchant_id: &str,
mandate_id: &str,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage::Mandate, errors::StorageError> {
self.diesel_store
.find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id)
.find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, storage_scheme)
.await
}
@ -678,9 +679,14 @@ impl MandateInterface for KafkaStore {
&self,
merchant_id: &str,
connector_mandate_id: &str,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage::Mandate, errors::StorageError> {
self.diesel_store
.find_mandate_by_merchant_id_connector_mandate_id(merchant_id, connector_mandate_id)
.find_mandate_by_merchant_id_connector_mandate_id(
merchant_id,
connector_mandate_id,
storage_scheme,
)
.await
}
@ -698,10 +704,18 @@ impl MandateInterface for KafkaStore {
&self,
merchant_id: &str,
mandate_id: &str,
mandate: storage::MandateUpdate,
mandate_update: storage::MandateUpdate,
mandate: storage::Mandate,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage::Mandate, errors::StorageError> {
self.diesel_store
.update_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, mandate)
.update_mandate_by_merchant_id_mandate_id(
merchant_id,
mandate_id,
mandate_update,
mandate,
storage_scheme,
)
.await
}
@ -718,8 +732,11 @@ impl MandateInterface for KafkaStore {
async fn insert_mandate(
&self,
mandate: storage::MandateNew,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage::Mandate, errors::StorageError> {
self.diesel_store.insert_mandate(mandate).await
self.diesel_store
.insert_mandate(mandate, storage_scheme)
.await
}
}

View File

@ -1,11 +1,9 @@
use error_stack::{report, ResultExt};
use router_env::{instrument, tracing};
use error_stack::ResultExt;
use super::{MockDb, Store};
use super::MockDb;
use crate::{
connection,
core::{errors, errors::CustomResult},
types::storage::{self, MandateDbExt},
core::errors::{self, CustomResult},
types::storage::{self as storage_types, enums::MerchantStorageScheme},
};
#[async_trait::async_trait]
@ -14,116 +12,439 @@ pub trait MandateInterface {
&self,
merchant_id: &str,
mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError>;
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError>;
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError>;
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError>;
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError>;
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError>;
async fn update_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
mandate: storage::MandateUpdate,
) -> CustomResult<storage::Mandate, errors::StorageError>;
mandate_update: storage_types::MandateUpdate,
mandate: storage_types::Mandate,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError>;
async fn find_mandates_by_merchant_id(
&self,
merchant_id: &str,
mandate_constraints: api_models::mandates::MandateListConstraints,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError>;
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError>;
async fn insert_mandate(
&self,
mandate: storage::MandateNew,
) -> CustomResult<storage::Mandate, errors::StorageError>;
mandate: storage_types::MandateNew,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError>;
}
#[async_trait::async_trait]
impl MandateInterface for Store {
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id)
#[cfg(feature = "kv_store")]
mod storage {
use common_utils::fallback_reverse_lookup_not_found;
use diesel_models::kv;
use error_stack::{report, ResultExt};
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey};
use super::MandateInterface;
use crate::{
connection,
core::errors::{self, utils::RedisErrorExt, CustomResult},
db::reverse_lookup::ReverseLookupInterface,
services::Store,
types::storage::{self as storage_types, enums::MerchantStorageScheme, MandateDbExt},
utils::db_utils,
};
#[async_trait::async_trait]
impl MandateInterface for Store {
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
let database_call = || async {
storage_types::Mandate::find_by_merchant_id_mandate_id(
&conn,
merchant_id,
mandate_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
};
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::MerchantIdMandateId {
merchant_id,
mandate_id,
};
let field = format!("mandate_{}", mandate_id);
Box::pin(db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<diesel_models::Mandate>::HGet(&field),
key,
)
.await?
.try_into_hget()
},
database_call,
))
.await
}
}
}
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
let database_call = || async {
storage_types::Mandate::find_by_merchant_id_connector_mandate_id(
&conn,
merchant_id,
connector_mandate_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
};
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let lookup_id =
format!("mid_{}_conn_mandate_{}", merchant_id, connector_mandate_id);
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
database_call().await
);
let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id,
};
Box::pin(db_utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<diesel_models::Mandate>::HGet(&lookup.sk_id),
key,
)
.await?
.try_into_hget()
},
database_call,
))
.await
}
}
}
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_types::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn update_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
mandate_update: storage_types::MandateUpdate,
mandate: storage_types::Mandate,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
storage_types::Mandate::update_by_merchant_id_mandate_id(
&conn,
merchant_id,
mandate_id,
storage_types::MandateUpdateInternal::from(mandate_update),
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::MerchantIdMandateId {
merchant_id,
mandate_id,
};
let field = format!("mandate_{}", mandate_id);
let key_str = key.to_string();
if let diesel_models::MandateUpdate::ConnectorMandateIdUpdate {
connector_mandate_id: Some(val),
..
} = &mandate_update
{
let rev_lookup = diesel_models::ReverseLookupNew {
sk_id: field.clone(),
pk_id: key_str.clone(),
lookup_id: format!("mid_{}_conn_mandate_{}", merchant_id, val),
source: "mandate".to_string(),
updated_by: storage_scheme.to_string(),
};
self.insert_reverse_lookup(rev_lookup, storage_scheme)
.await?;
}
let m_update = diesel_models::MandateUpdateInternal::from(mandate_update);
let updated_mandate = m_update.clone().apply_changeset(mandate.clone());
let redis_value = serde_json::to_string(&updated_mandate)
.change_context(errors::StorageError::SerializationFailed)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::MandateUpdate(kv::MandateUpdateMems {
orig: mandate,
update_data: m_update,
}),
},
};
kv_wrapper::<(), _, _>(
self,
KvOperation::<diesel_models::Mandate>::Hset(
(&field, redis_value),
redis_entry,
),
key,
)
.await
.map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hset()
.change_context(errors::StorageError::KVError)?;
Ok(updated_mandate)
}
}
}
#[instrument(skip_all)]
async fn find_mandates_by_merchant_id(
&self,
merchant_id: &str,
mandate_constraints: api_models::mandates::MandateListConstraints,
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_types::Mandate::filter_by_constraints(&conn, merchant_id, mandate_constraints)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn insert_mandate(
&self,
mandate: storage_types::MandateNew,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => mandate
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error))),
MerchantStorageScheme::RedisKv => {
let mandate_id = mandate.mandate_id.clone();
let merchant_id = mandate.merchant_id.clone();
let connector_mandate_id = mandate.connector_mandate_id.clone();
let key = PartitionKey::MerchantIdMandateId {
merchant_id: merchant_id.as_str(),
mandate_id: mandate_id.as_str(),
};
let key_str = key.to_string();
let field = format!("mandate_{}", mandate_id);
let storage_mandate = storage_types::Mandate::from(&mandate);
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::Mandate(mandate),
},
};
if let Some(connector_val) = connector_mandate_id {
let lookup_id =
format!("mid_{}_conn_mandate_{}", merchant_id, connector_val);
let reverse_lookup_entry = diesel_models::ReverseLookupNew {
sk_id: field.clone(),
pk_id: key_str.clone(),
lookup_id,
source: "mandate".to_string(),
updated_by: storage_scheme.to_string(),
};
self.insert_reverse_lookup(reverse_lookup_entry, storage_scheme)
.await?;
}
match kv_wrapper::<diesel_models::Mandate, _, _>(
self,
KvOperation::<diesel_models::Mandate>::HSetNx(
&field,
&storage_mandate,
redis_entry,
),
key,
)
.await
.map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "mandate",
key: Some(storage_mandate.mandate_id),
}
.into()),
Ok(HsetnxReply::KeySet) => Ok(storage_mandate),
Err(er) => Err(er).change_context(errors::StorageError::KVError),
}
}
}
}
}
}
#[cfg(not(feature = "kv_store"))]
mod storage {
use error_stack::report;
use router_env::{instrument, tracing};
use super::MandateInterface;
use crate::{
connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::{self as storage_types, enums::MerchantStorageScheme, MandateDbExt},
};
#[async_trait::async_trait]
impl MandateInterface for Store {
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_types::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_types::Mandate::find_by_merchant_id_connector_mandate_id(
&conn,
merchant_id,
connector_mandate_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
}
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::find_by_merchant_id_connector_mandate_id(
&conn,
merchant_id,
connector_mandate_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_types::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id)
#[instrument(skip_all)]
async fn update_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
mandate_update: storage_types::MandateUpdate,
_mandate: storage_types::Mandate,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage_types::Mandate::update_by_merchant_id_mandate_id(
&conn,
merchant_id,
mandate_id,
storage_types::MandateUpdateInternal::from(mandate_update),
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
}
#[instrument(skip_all)]
async fn update_mandate_by_merchant_id_mandate_id(
&self,
merchant_id: &str,
mandate_id: &str,
mandate: storage::MandateUpdate,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_mandates_by_merchant_id(
&self,
merchant_id: &str,
mandate_constraints: api_models::mandates::MandateListConstraints,
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_types::Mandate::filter_by_constraints(&conn, merchant_id, mandate_constraints)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_mandates_by_merchant_id(
&self,
merchant_id: &str,
mandate_constraints: api_models::mandates::MandateListConstraints,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::filter_by_constraints(&conn, merchant_id, mandate_constraints)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn insert_mandate(
&self,
mandate: storage::MandateNew,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
mandate
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
#[instrument(skip_all)]
async fn insert_mandate(
&self,
mandate: storage_types::MandateNew,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
mandate
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
}
}
@ -133,7 +454,8 @@ impl MandateInterface for MockDb {
&self,
merchant_id: &str,
mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
self.mandates
.lock()
.await
@ -148,7 +470,8 @@ impl MandateInterface for MockDb {
&self,
merchant_id: &str,
connector_mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
self.mandates
.lock()
.await
@ -166,7 +489,7 @@ impl MandateInterface for MockDb {
&self,
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError> {
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError> {
return Ok(self
.mandates
.lock()
@ -183,8 +506,10 @@ impl MandateInterface for MockDb {
&self,
merchant_id: &str,
mandate_id: &str,
mandate_update: storage::MandateUpdate,
) -> CustomResult<storage::Mandate, errors::StorageError> {
mandate_update: storage_types::MandateUpdate,
_mandate: storage_types::Mandate,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let mut mandates = self.mandates.lock().await;
match mandates
.iter_mut()
@ -192,13 +517,13 @@ impl MandateInterface for MockDb {
{
Some(mandate) => {
match mandate_update {
storage::MandateUpdate::StatusUpdate { mandate_status } => {
storage_types::MandateUpdate::StatusUpdate { mandate_status } => {
mandate.mandate_status = mandate_status;
}
storage::MandateUpdate::CaptureAmountUpdate { amount_captured } => {
storage_types::MandateUpdate::CaptureAmountUpdate { amount_captured } => {
mandate.amount_captured = amount_captured;
}
storage::MandateUpdate::ConnectorReferenceUpdate {
storage_types::MandateUpdate::ConnectorReferenceUpdate {
connector_mandate_ids,
} => {
mandate.connector_mandate_ids = connector_mandate_ids;
@ -228,7 +553,7 @@ impl MandateInterface for MockDb {
&self,
merchant_id: &str,
mandate_constraints: api_models::mandates::MandateListConstraints,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError> {
) -> CustomResult<Vec<storage_types::Mandate>, errors::StorageError> {
let mandates = self.mandates.lock().await;
let mandates_iter = mandates.iter().filter(|mandate| {
let mut checker = mandate.merchant_id == merchant_id;
@ -263,7 +588,7 @@ impl MandateInterface for MockDb {
mandate_constraints.offset.unwrap_or(0)
}) as usize;
let mandates: Vec<storage::Mandate> = if let Some(limit) = mandate_constraints.limit {
let mandates: Vec<storage_types::Mandate> = if let Some(limit) = mandate_constraints.limit {
#[allow(clippy::as_conversions)]
mandates_iter
.skip(offset)
@ -278,10 +603,11 @@ impl MandateInterface for MockDb {
async fn insert_mandate(
&self,
mandate_new: storage::MandateNew,
) -> CustomResult<storage::Mandate, errors::StorageError> {
mandate_new: storage_types::MandateNew,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::Mandate, errors::StorageError> {
let mut mandates = self.mandates.lock().await;
let mandate = storage::Mandate {
let mandate = storage_types::Mandate {
id: i32::try_from(mandates.len()).change_context(errors::StorageError::MockDbError)?,
mandate_id: mandate_new.mandate_id.clone(),
customer_id: mandate_new.customer_id,

View File

@ -298,6 +298,7 @@ pub async fn find_payment_intent_from_mandate_id_type(
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
mandate_id.as_str(),
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
@ -305,6 +306,7 @@ pub async fn find_payment_intent_from_mandate_id_type(
.find_mandate_by_merchant_id_connector_mandate_id(
&merchant_account.merchant_id,
connector_mandate_id.as_str(),
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,