refactor(customer): refactor customer db with storage utils and move trait to domain_models and impl to storage_model (#7538)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Jagan
2025-04-15 12:53:33 +05:30
committed by GitHub
parent 92f6821316
commit e8e0b5df0e
8 changed files with 1114 additions and 1580 deletions

View File

@ -1,3 +1,4 @@
use common_enums::enums::MerchantStorageScheme;
#[cfg(all(feature = "v2", feature = "customer_v2"))]
use common_enums::DeleteStatus;
use common_utils::{
@ -11,13 +12,15 @@ use common_utils::{
Description,
},
};
use diesel_models::customers::CustomerUpdateInternal;
use diesel_models::{
customers as storage_types, customers::CustomerUpdateInternal, query::customers as query,
};
use error_stack::ResultExt;
use masking::{PeekInterface, Secret, SwitchStrategy};
use rustc_hash::FxHashMap;
use time::PrimitiveDateTime;
use crate::type_encryption as types;
use crate::{behaviour, merchant_key_store::MerchantKeyStore, type_encryption as types};
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[derive(Clone, Debug, router_derive::ToEncryption)]
@ -109,7 +112,7 @@ impl Customer {
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[async_trait::async_trait]
impl super::behaviour::Conversion for Customer {
impl behaviour::Conversion for Customer {
type DstType = diesel_models::customers::Customer;
type NewDstType = diesel_models::customers::CustomerNew;
async fn convert(self) -> CustomResult<Self::DstType, ValidationError> {
@ -213,7 +216,7 @@ impl super::behaviour::Conversion for Customer {
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[async_trait::async_trait]
impl super::behaviour::Conversion for Customer {
impl behaviour::Conversion for Customer {
type DstType = diesel_models::customers::Customer;
type NewDstType = diesel_models::customers::CustomerNew;
async fn convert(self) -> CustomResult<Self::DstType, ValidationError> {
@ -501,3 +504,136 @@ impl From<CustomerUpdate> for CustomerUpdateInternal {
}
}
}
pub struct CustomerListConstraints {
pub limit: u16,
pub offset: Option<u32>,
}
impl From<CustomerListConstraints> for query::CustomerListConstraints {
fn from(value: CustomerListConstraints) -> Self {
Self {
limit: i64::from(value.limit),
offset: value.offset.map(i64::from),
}
}
}
#[async_trait::async_trait]
pub trait CustomerInterface
where
Customer: behaviour::Conversion<
DstType = storage_types::Customer,
NewDstType = storage_types::CustomerNew,
>,
{
type Error;
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn delete_customer_by_customer_id_merchant_id(
&self,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
) -> CustomResult<bool, Self::Error>;
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<Customer>, Self::Error>;
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<Customer>, Self::Error>;
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_optional_by_merchant_id_merchant_reference_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<Customer>, Self::Error>;
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[allow(clippy::too_many_arguments)]
async fn update_customer_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: id_type::CustomerId,
merchant_id: id_type::MerchantId,
customer: Customer,
customer_update: CustomerUpdate,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Customer, Self::Error>;
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Customer, Self::Error>;
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_customer_by_merchant_reference_id_merchant_id(
&self,
state: &KeyManagerState,
merchant_reference_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Customer, Self::Error>;
async fn list_customers_by_merchant_id(
&self,
state: &KeyManagerState,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
constraints: CustomerListConstraints,
) -> CustomResult<Vec<Customer>, Self::Error>;
async fn insert_customer(
&self,
customer_data: Customer,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Customer, Self::Error>;
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[allow(clippy::too_many_arguments)]
async fn update_customer_by_global_id(
&self,
state: &KeyManagerState,
id: &id_type::GlobalCustomerId,
customer: Customer,
merchant_id: &id_type::MerchantId,
customer_update: CustomerUpdate,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Customer, Self::Error>;
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_customer_by_global_id(
&self,
state: &KeyManagerState,
id: &id_type::GlobalCustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Customer, Self::Error>;
}

View File

@ -96,7 +96,7 @@ pub trait StorageInterface:
+ blocklist_lookup::BlocklistLookupInterface
+ configs::ConfigInterface
+ capture::CaptureInterface
+ customers::CustomerInterface
+ customers::CustomerInterface<Error = StorageError>
+ dashboard_metadata::DashboardMetadataInterface
+ dispute::DisputeInterface
+ ephemeral_key::EphemeralKeyInterface

File diff suppressed because it is too large Load Diff

View File

@ -372,6 +372,7 @@ impl ConfigInterface for KafkaStore {
#[async_trait::async_trait]
impl CustomerInterface for KafkaStore {
type Error = errors::StorageError;
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn delete_customer_by_customer_id_merchant_id(
&self,

View File

@ -1,5 +1,901 @@
use diesel_models::customers::Customer;
use common_utils::{id_type, pii};
use diesel_models::{customers, kv};
use error_stack::ResultExt;
use futures::future::try_join_all;
use hyperswitch_domain_models::{
behaviour::{Conversion, ReverseConversion},
customer as domain,
merchant_key_store::MerchantKeyStore,
};
use masking::PeekInterface;
use router_env::{instrument, tracing};
use crate::redis::kv_store::KvStorePartition;
use crate::{
diesel_error_to_data_error,
errors::StorageError,
kv_router_store,
redis::kv_store::{decide_storage_scheme, KvStorePartition, Op, PartitionKey},
store::enums::MerchantStorageScheme,
utils::{pg_connection_read, pg_connection_write},
CustomResult, DatabaseStore, KeyManagerState, MockDb, RouterStore,
};
impl KvStorePartition for Customer {}
impl KvStorePartition for customers::Customer {}
#[async_trait::async_trait]
impl<T: DatabaseStore> domain::CustomerInterface for kv_router_store::KVRouterStore<T> {
type Error = StorageError;
#[instrument(skip_all)]
// check customer not found in kv and fallback to db
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
let maybe_result = self
.find_optional_resource_by_id(
state,
key_store,
storage_scheme,
customers::Customer::find_optional_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id,
),
kv_router_store::FindResourceBy::Id(
format!("cust_{}", customer_id.get_string_repr()),
PartitionKey::MerchantIdCustomerId {
merchant_id,
customer_id,
},
),
)
.await?;
maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(Some(customer)),
})
}
#[instrument(skip_all)]
// check customer not found in kv and fallback to db
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
self.find_optional_resource_by_id(
state,
key_store,
storage_scheme,
customers::Customer::find_optional_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id,
),
kv_router_store::FindResourceBy::Id(
format!("cust_{}", customer_id.get_string_repr()),
PartitionKey::MerchantIdCustomerId {
merchant_id,
customer_id,
},
),
)
.await
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_optional_by_merchant_id_merchant_reference_id(
&self,
state: &KeyManagerState,
merchant_reference_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
let maybe_result = self
.find_optional_resource_by_id(
state,
key_store,
storage_scheme,
customers::Customer::find_optional_by_merchant_id_merchant_reference_id(
&conn,
merchant_reference_id,
merchant_id,
),
kv_router_store::FindResourceBy::Id(
format!("cust_{}", merchant_reference_id.get_string_repr()),
PartitionKey::MerchantIdMerchantReferenceId {
merchant_id,
merchant_reference_id: merchant_reference_id.get_string_repr(),
},
),
)
.await?;
maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(Some(customer)),
})
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn update_customer_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: id_type::CustomerId,
merchant_id: id_type::MerchantId,
customer: domain::Customer,
customer_update: domain::CustomerUpdate,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
let customer = Conversion::convert(customer)
.await
.change_context(StorageError::EncryptionError)?;
let updated_customer = diesel_models::CustomerUpdateInternal::from(customer_update.clone())
.apply_changeset(customer.clone());
let key = PartitionKey::MerchantIdCustomerId {
merchant_id: &merchant_id,
customer_id: &customer_id,
};
let field = format!("cust_{}", customer_id.get_string_repr());
self.update_resource(
state,
key_store,
storage_scheme,
customers::Customer::update_by_customer_id_merchant_id(
&conn,
customer_id.clone(),
merchant_id.clone(),
customer_update.clone().into(),
),
updated_customer,
kv_router_store::UpdateResourceParams {
updateable: kv::Updateable::CustomerUpdate(kv::CustomerUpdateMems {
orig: customer.clone(),
update_data: customer_update.clone().into(),
}),
operation: Op::Update(key.clone(), &field, customer.updated_by.as_deref()),
},
)
.await
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[instrument(skip_all)]
async fn find_customer_by_merchant_reference_id_merchant_id(
&self,
state: &KeyManagerState,
merchant_reference_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_read(self).await?;
let result: domain::Customer = self
.find_resource_by_id(
state,
key_store,
storage_scheme,
customers::Customer::find_by_merchant_reference_id_merchant_id(
&conn,
merchant_reference_id,
merchant_id,
),
kv_router_store::FindResourceBy::Id(
format!("cust_{}", merchant_reference_id.get_string_repr()),
PartitionKey::MerchantIdMerchantReferenceId {
merchant_id,
merchant_reference_id: merchant_reference_id.get_string_repr(),
},
),
)
.await?;
match result.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(result),
}
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn find_customer_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_read(self).await?;
let result: domain::Customer = self
.find_resource_by_id(
state,
key_store,
storage_scheme,
customers::Customer::find_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id,
),
kv_router_store::FindResourceBy::Id(
format!("cust_{}", customer_id.get_string_repr()),
PartitionKey::MerchantIdCustomerId {
merchant_id,
customer_id,
},
),
)
.await?;
match result.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(result),
}
}
#[instrument(skip_all)]
async fn list_customers_by_merchant_id(
&self,
state: &KeyManagerState,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
constraints: domain::CustomerListConstraints,
) -> CustomResult<Vec<domain::Customer>, StorageError> {
self.router_store
.list_customers_by_merchant_id(state, merchant_id, key_store, constraints)
.await
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[instrument(skip_all)]
async fn insert_customer(
&self,
customer_data: domain::Customer,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
let id = customer_data.id.clone();
let key = PartitionKey::GlobalId {
id: id.get_string_repr(),
};
let identifier = format!("cust_{}", id.get_string_repr());
let mut new_customer = customer_data
.construct_new()
.await
.change_context(StorageError::EncryptionError)?;
let storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>(
self,
storage_scheme,
Op::Insert,
))
.await;
new_customer.update_storage_scheme(storage_scheme);
self.insert_resource(
state,
key_store,
storage_scheme,
new_customer.clone().insert(&conn),
new_customer.clone().into(),
kv_router_store::InsertResourceParams {
insertable: kv::Insertable::Customer(new_customer.clone()),
reverse_lookups: vec![],
identifier,
key,
resource_type: "customer",
},
)
.await
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn insert_customer(
&self,
customer_data: domain::Customer,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
let key = PartitionKey::MerchantIdCustomerId {
merchant_id: &customer_data.merchant_id.clone(),
customer_id: &customer_data.customer_id.clone(),
};
let identifier = format!("cust_{}", customer_data.customer_id.get_string_repr());
let mut new_customer = customer_data
.construct_new()
.await
.change_context(StorageError::EncryptionError)?;
let storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>(
self,
storage_scheme,
Op::Insert,
))
.await;
new_customer.update_storage_scheme(storage_scheme);
let customer = new_customer.clone().into();
self.insert_resource(
state,
key_store,
storage_scheme,
new_customer.clone().insert(&conn),
customer,
kv_router_store::InsertResourceParams {
insertable: kv::Insertable::Customer(new_customer.clone()),
reverse_lookups: vec![],
identifier,
key,
resource_type: "customer",
},
)
.await
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn delete_customer_by_customer_id_merchant_id(
&self,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
) -> CustomResult<bool, StorageError> {
self.router_store
.delete_customer_by_customer_id_merchant_id(customer_id, merchant_id)
.await
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[instrument(skip_all)]
async fn find_customer_by_global_id(
&self,
state: &KeyManagerState,
id: &id_type::GlobalCustomerId,
_merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_read(self).await?;
let result: domain::Customer = self
.find_resource_by_id(
state,
key_store,
storage_scheme,
customers::Customer::find_by_global_id(&conn, id),
kv_router_store::FindResourceBy::Id(
format!("cust_{}", id.get_string_repr()),
PartitionKey::GlobalId {
id: id.get_string_repr(),
},
),
)
.await?;
if result.status == common_enums::DeleteStatus::Redacted {
Err(StorageError::CustomerRedacted)?
} else {
Ok(result)
}
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[instrument(skip_all)]
async fn update_customer_by_global_id(
&self,
state: &KeyManagerState,
id: &id_type::GlobalCustomerId,
customer: domain::Customer,
_merchant_id: &id_type::MerchantId,
customer_update: domain::CustomerUpdate,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
let customer = Conversion::convert(customer)
.await
.change_context(StorageError::EncryptionError)?;
let database_call =
customers::Customer::update_by_id(&conn, id.clone(), customer_update.clone().into());
let key = PartitionKey::GlobalId {
id: id.get_string_repr(),
};
let field = format!("cust_{}", id.get_string_repr());
self.update_resource(
state,
key_store,
storage_scheme,
database_call,
diesel_models::CustomerUpdateInternal::from(customer_update.clone())
.apply_changeset(customer.clone()),
kv_router_store::UpdateResourceParams {
updateable: kv::Updateable::CustomerUpdate(kv::CustomerUpdateMems {
orig: customer.clone(),
update_data: customer_update.into(),
}),
operation: Op::Update(key.clone(), &field, customer.updated_by.as_deref()),
},
)
.await
}
}
#[async_trait::async_trait]
impl<T: DatabaseStore> domain::CustomerInterface for RouterStore<T> {
type Error = StorageError;
#[instrument(skip_all)]
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
let maybe_customer: Option<domain::Customer> = self
.find_optional_resource(
state,
key_store,
customers::Customer::find_optional_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id,
),
)
.await?;
maybe_customer.map_or(Ok(None), |customer| {
// in the future, once #![feature(is_some_and)] is stable, we can make this more concise:
// `if customer.name.is_some_and(|ref name| name == pii::REDACTED) ...`
match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => {
Err(StorageError::CustomerRedacted)?
}
_ => Ok(Some(customer)),
}
})
}
#[instrument(skip_all)]
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
self.find_optional_resource(
state,
key_store,
customers::Customer::find_optional_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id,
),
)
.await
}
#[instrument(skip_all)]
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_optional_by_merchant_id_merchant_reference_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
let maybe_customer: Option<domain::Customer> = self
.find_optional_resource(
state,
key_store,
customers::Customer::find_optional_by_merchant_id_merchant_reference_id(
&conn,
customer_id,
merchant_id,
),
)
.await?;
maybe_customer.map_or(Ok(None), |customer| {
// in the future, once #![feature(is_some_and)] is stable, we can make this more concise:
// `if customer.name.is_some_and(|ref name| name == pii::REDACTED) ...`
match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => {
Err(StorageError::CustomerRedacted)?
}
_ => Ok(Some(customer)),
}
})
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn update_customer_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: id_type::CustomerId,
merchant_id: id_type::MerchantId,
_customer: domain::Customer,
customer_update: domain::CustomerUpdate,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
self.call_database(
state,
key_store,
customers::Customer::update_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id.clone(),
customer_update.into(),
),
)
.await
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn find_customer_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_read(self).await?;
let customer: domain::Customer = self
.call_database(
state,
key_store,
customers::Customer::find_by_customer_id_merchant_id(
&conn,
customer_id,
merchant_id,
),
)
.await?;
match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(customer),
}
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[instrument(skip_all)]
async fn find_customer_by_merchant_reference_id_merchant_id(
&self,
state: &KeyManagerState,
merchant_reference_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_read(self).await?;
let customer: domain::Customer = self
.call_database(
state,
key_store,
customers::Customer::find_by_merchant_reference_id_merchant_id(
&conn,
merchant_reference_id,
merchant_id,
),
)
.await?;
match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(customer),
}
}
#[instrument(skip_all)]
async fn list_customers_by_merchant_id(
&self,
state: &KeyManagerState,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
constraints: domain::CustomerListConstraints,
) -> CustomResult<Vec<domain::Customer>, StorageError> {
let conn = pg_connection_read(self).await?;
let customer_list_constraints =
diesel_models::query::customers::CustomerListConstraints::from(constraints);
self.find_resources(
state,
key_store,
customers::Customer::list_by_merchant_id(&conn, merchant_id, customer_list_constraints),
)
.await
}
#[instrument(skip_all)]
async fn insert_customer(
&self,
customer_data: domain::Customer,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
let customer_new = customer_data
.construct_new()
.await
.change_context(StorageError::EncryptionError)?;
self.call_database(state, key_store, customer_new.insert(&conn))
.await
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn delete_customer_by_customer_id_merchant_id(
&self,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
) -> CustomResult<bool, StorageError> {
let conn = pg_connection_write(self).await?;
customers::Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[allow(clippy::too_many_arguments)]
async fn update_customer_by_global_id(
&self,
state: &KeyManagerState,
id: &id_type::GlobalCustomerId,
customer: domain::Customer,
merchant_id: &id_type::MerchantId,
customer_update: domain::CustomerUpdate,
key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_write(self).await?;
self.call_database(
state,
key_store,
customers::Customer::update_by_id(&conn, id.clone(), customer_update.into()),
)
.await
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[instrument(skip_all)]
async fn find_customer_by_global_id(
&self,
state: &KeyManagerState,
id: &id_type::GlobalCustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let conn = pg_connection_read(self).await?;
let customer: domain::Customer = self
.call_database(
state,
key_store,
customers::Customer::find_by_global_id(&conn, id),
)
.await?;
match customer.name {
Some(ref name) if name.peek() == pii::REDACTED => Err(StorageError::CustomerRedacted)?,
_ => Ok(customer),
}
}
}
#[async_trait::async_trait]
impl domain::CustomerInterface for MockDb {
type Error = StorageError;
#[allow(clippy::panic)]
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let customers = self.customers.lock().await;
self.find_resource(state, key_store, customers, |customer| {
customer.customer_id == *customer_id && &customer.merchant_id == merchant_id
})
.await
}
#[allow(clippy::panic)]
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_optional_with_redacted_customer_details_by_customer_id_merchant_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
let customers = self.customers.lock().await;
self.find_resource(state, key_store, customers, |customer| {
customer.customer_id == *customer_id && &customer.merchant_id == merchant_id
})
.await
}
#[allow(clippy::panic)]
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_optional_by_merchant_id_merchant_reference_id(
&self,
state: &KeyManagerState,
customer_id: &id_type::CustomerId,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Option<domain::Customer>, StorageError> {
todo!()
}
async fn list_customers_by_merchant_id(
&self,
state: &KeyManagerState,
merchant_id: &id_type::MerchantId,
key_store: &MerchantKeyStore,
constraints: domain::CustomerListConstraints,
) -> CustomResult<Vec<domain::Customer>, StorageError> {
let customers = self.customers.lock().await;
let customers = try_join_all(
customers
.iter()
.filter(|customer| customer.merchant_id == *merchant_id)
.take(usize::from(constraints.limit))
.skip(usize::try_from(constraints.offset.unwrap_or(0)).unwrap_or(0))
.map(|customer| async {
customer
.to_owned()
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)
}),
)
.await?;
Ok(customers)
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
#[instrument(skip_all)]
async fn update_customer_by_customer_id_merchant_id(
&self,
_state: &KeyManagerState,
_customer_id: id_type::CustomerId,
_merchant_id: id_type::MerchantId,
_customer: domain::Customer,
_customer_update: domain::CustomerUpdate,
_key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn find_customer_by_customer_id_merchant_id(
&self,
_state: &KeyManagerState,
_customer_id: &id_type::CustomerId,
_merchant_id: &id_type::MerchantId,
_key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_customer_by_merchant_reference_id_merchant_id(
&self,
_state: &KeyManagerState,
_merchant_reference_id: &id_type::CustomerId,
_merchant_id: &id_type::MerchantId,
_key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[allow(clippy::panic)]
async fn insert_customer(
&self,
customer_data: domain::Customer,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
let mut customers = self.customers.lock().await;
let customer = Conversion::convert(customer_data)
.await
.change_context(StorageError::EncryptionError)?;
customers.push(customer.clone());
customer
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)
}
#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))]
async fn delete_customer_by_customer_id_merchant_id(
&self,
_customer_id: &id_type::CustomerId,
_merchant_id: &id_type::MerchantId,
) -> CustomResult<bool, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
#[allow(clippy::too_many_arguments)]
async fn update_customer_by_global_id(
&self,
_state: &KeyManagerState,
_id: &id_type::GlobalCustomerId,
_customer: domain::Customer,
_merchant_id: &id_type::MerchantId,
_customer_update: domain::CustomerUpdate,
_key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v2", feature = "customer_v2"))]
async fn find_customer_by_global_id(
&self,
_state: &KeyManagerState,
_id: &id_type::GlobalCustomerId,
_merchant_id: &id_type::MerchantId,
_key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<domain::Customer, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
}

View File

@ -179,6 +179,37 @@ impl<T: DatabaseStore> RouterStore<T> {
.change_context(StorageError::DecryptionError)
}
pub async fn find_optional_resource<D, R, M>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
execute_query_fut: R,
) -> error_stack::Result<Option<D>, StorageError>
where
D: Debug + Sync + Conversion,
R: futures::Future<
Output = error_stack::Result<Option<M>, diesel_models::errors::DatabaseError>,
> + Send,
M: ReverseConversion<D>,
{
match execute_query_fut.await.map_err(|error| {
let new_err = diesel_error_to_data_error(*error.current_context());
error.change_context(new_err)
})? {
Some(resource) => Ok(Some(
resource
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)?,
)),
None => Ok(None),
}
}
pub async fn find_resources<D, R, M>(
&self,
state: &KeyManagerState,

View File

@ -112,33 +112,56 @@ impl MockDb {
})
}
/// Returns an option of the resource if it exists
pub async fn find_resource<D, R>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
resources: MutexGuard<'_, Vec<D>>,
filter_fn: impl Fn(&&D) -> bool,
) -> CustomResult<Option<R>, StorageError>
where
D: Sync + ReverseConversion<R> + Clone,
R: Conversion,
{
let resource = resources.iter().find(filter_fn).cloned();
match resource {
Some(res) => Ok(Some(
res.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)?,
)),
None => Ok(None),
}
}
/// Throws errors when the requested resource is not found
pub async fn get_resource<D, R>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,
resources: MutexGuard<'_, Vec<D>>,
filter_fn: impl Fn(&&D) -> bool,
error_message: String,
) -> CustomResult<R, StorageError>
where
D: Sync + ReverseConversion<R> + Clone,
R: Conversion,
{
let resource = resources.iter().find(filter_fn).cloned();
match resource {
Some(res) => Ok(res
.convert(
state,
key_store.key.get_inner(),
key_store.merchant_id.clone().into(),
)
.await
.change_context(StorageError::DecryptionError)?),
match self
.find_resource(state, key_store, resources, filter_fn)
.await?
{
Some(res) => Ok(res),
None => Err(StorageError::ValueNotFound(error_message).into()),
}
}
pub async fn find_resources<D, R>(
pub async fn get_resources<D, R>(
&self,
state: &KeyManagerState,
key_store: &MerchantKeyStore,

View File

@ -769,7 +769,7 @@ impl PaymentMethodInterface for MockDb {
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<DomainPaymentMethod, errors::StorageError> {
let payment_methods = self.payment_methods.lock().await;
self.find_resource::<PaymentMethod, _>(
self.get_resource::<PaymentMethod, _>(
state,
key_store,
payment_methods,
@ -788,7 +788,7 @@ impl PaymentMethodInterface for MockDb {
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<DomainPaymentMethod, errors::StorageError> {
let payment_methods = self.payment_methods.lock().await;
self.find_resource::<PaymentMethod, _>(
self.get_resource::<PaymentMethod, _>(
state,
key_store,
payment_methods,
@ -810,7 +810,7 @@ impl PaymentMethodInterface for MockDb {
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<DomainPaymentMethod, errors::StorageError> {
let payment_methods = self.payment_methods.lock().await;
self.find_resource::<PaymentMethod, _>(
self.get_resource::<PaymentMethod, _>(
state,
key_store,
payment_methods,
@ -885,7 +885,7 @@ impl PaymentMethodInterface for MockDb {
_limit: Option<i64>,
) -> CustomResult<Vec<DomainPaymentMethod>, errors::StorageError> {
let payment_methods = self.payment_methods.lock().await;
self.find_resources(
self.get_resources(
state,
key_store,
payment_methods,
@ -922,7 +922,7 @@ impl PaymentMethodInterface for MockDb {
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<DomainPaymentMethod>, errors::StorageError> {
let payment_methods = self.payment_methods.lock().await;
self.find_resources(
self.get_resources(
state,
key_store,
payment_methods,
@ -952,7 +952,7 @@ impl PaymentMethodInterface for MockDb {
pm.customer_id == *customer_id && pm.merchant_id == *merchant_id && pm.status == status
};
let error_message = "cannot find payment method".to_string();
self.find_resources(state, key_store, payment_methods, find_pm_by, error_message)
self.get_resources(state, key_store, payment_methods, find_pm_by, error_message)
.await
}
@ -1050,7 +1050,7 @@ impl PaymentMethodInterface for MockDb {
fingerprint_id: &str,
) -> CustomResult<DomainPaymentMethod, errors::StorageError> {
let payment_methods = self.payment_methods.lock().await;
self.find_resource::<PaymentMethod, _>(
self.get_resource::<PaymentMethod, _>(
state,
key_store,
payment_methods,