feat(payouts): implement KVRouterStore (#3889)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Kashif <mohammed.kashif@juspay.in>
This commit is contained in:
Kashif
2024-03-19 15:33:37 +05:30
committed by GitHub
parent 3eb464250e
commit 944089d691
59 changed files with 2176 additions and 782 deletions

View File

@ -14,17 +14,23 @@ mod lookup;
pub mod metrics;
pub mod mock_db;
pub mod payments;
#[cfg(feature = "payouts")]
pub mod payouts;
pub mod redis;
pub mod refund;
mod reverse_lookup;
mod utils;
use common_utils::errors::CustomResult;
#[cfg(not(feature = "payouts"))]
use data_models::{PayoutAttemptInterface, PayoutsInterface};
use database::store::PgPool;
pub use mock_db::MockDb;
use redis_interface::{errors::RedisError, SaddReply};
pub use crate::database::store::DatabaseStore;
#[cfg(not(feature = "payouts"))]
pub use crate::database::store::Store;
#[derive(Debug, Clone)]
pub struct RouterStore<T: DatabaseStore> {
@ -331,3 +337,35 @@ impl UniqueConstraints for diesel_models::ReverseLookup {
"ReverseLookup"
}
}
#[cfg(feature = "payouts")]
impl UniqueConstraints for diesel_models::Payouts {
fn unique_constraints(&self) -> Vec<String> {
vec![format!("po_{}_{}", self.merchant_id, self.payout_id)]
}
fn table_name(&self) -> &str {
"Payouts"
}
}
#[cfg(feature = "payouts")]
impl UniqueConstraints for diesel_models::PayoutAttempt {
fn unique_constraints(&self) -> Vec<String> {
vec![format!(
"poa_{}_{}",
self.merchant_id, self.payout_attempt_id
)]
}
fn table_name(&self) -> &str {
"PayoutAttempt"
}
}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutAttemptInterface for RouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {}
#[cfg(not(feature = "payouts"))]
impl<T: DatabaseStore> PayoutsInterface for RouterStore<T> {}

View File

@ -13,7 +13,13 @@ use crate::redis::RedisStore;
pub mod payment_attempt;
pub mod payment_intent;
#[cfg(feature = "payouts")]
pub mod payout_attempt;
#[cfg(feature = "payouts")]
pub mod payouts;
pub mod redis_conn;
#[cfg(not(feature = "payouts"))]
use data_models::{PayoutAttemptInterface, PayoutsInterface};
#[derive(Clone)]
pub struct MockDb {
@ -45,6 +51,10 @@ pub struct MockDb {
pub user_roles: Arc<Mutex<Vec<store::user_role::UserRole>>>,
pub authorizations: Arc<Mutex<Vec<store::authorization::Authorization>>>,
pub dashboard_metadata: Arc<Mutex<Vec<store::user::dashboard_metadata::DashboardMetadata>>>,
#[cfg(feature = "payouts")]
pub payout_attempt: Arc<Mutex<Vec<store::payout_attempt::PayoutAttempt>>>,
#[cfg(feature = "payouts")]
pub payouts: Arc<Mutex<Vec<store::payouts::Payouts>>>,
pub authentications: Arc<Mutex<Vec<store::authentication::Authentication>>>,
pub roles: Arc<Mutex<Vec<store::role::Role>>>,
}
@ -84,8 +94,18 @@ impl MockDb {
user_roles: Default::default(),
authorizations: Default::default(),
dashboard_metadata: Default::default(),
#[cfg(feature = "payouts")]
payout_attempt: Default::default(),
#[cfg(feature = "payouts")]
payouts: Default::default(),
authentications: Default::default(),
roles: Default::default(),
})
}
}
#[cfg(not(feature = "payouts"))]
impl PayoutsInterface for MockDb {}
#[cfg(not(feature = "payouts"))]
impl PayoutAttemptInterface for MockDb {}

View File

@ -0,0 +1,42 @@
use common_utils::errors::CustomResult;
use data_models::{
errors::StorageError,
payouts::payout_attempt::{
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
},
};
use diesel_models::enums as storage_enums;
use super::MockDb;
#[async_trait::async_trait]
impl PayoutAttemptInterface for MockDb {
async fn update_payout_attempt(
&self,
_this: &PayoutAttempt,
_payout_attempt_update: PayoutAttemptUpdate,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<PayoutAttempt, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn insert_payout_attempt(
&self,
_payout: PayoutAttemptNew,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<PayoutAttempt, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
_merchant_id: &str,
_payout_attempt_id: &str,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<PayoutAttempt, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
}

View File

@ -0,0 +1,50 @@
use common_utils::errors::CustomResult;
use data_models::{
errors::StorageError,
payouts::payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
};
use diesel_models::enums as storage_enums;
use super::MockDb;
#[async_trait::async_trait]
impl PayoutsInterface for MockDb {
async fn find_payout_by_merchant_id_payout_id(
&self,
_merchant_id: &str,
_payout_id: &str,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Payouts, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn update_payout(
&self,
_this: &Payouts,
_payout_update: PayoutsUpdate,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Payouts, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn insert_payout(
&self,
_payout: PayoutsNew,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Payouts, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
async fn find_optional_payout_by_merchant_id_payout_id(
&self,
_merchant_id: &str,
_payout_id: &str,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Option<Payouts>, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
}

View File

@ -0,0 +1,10 @@
pub mod payout_attempt;
#[allow(clippy::module_inception)]
pub mod payouts;
use diesel_models::{payout_attempt::PayoutAttempt, payouts::Payouts};
use crate::redis::kv_store::KvStorePartition;
impl KvStorePartition for Payouts {}
impl KvStorePartition for PayoutAttempt {}

View File

@ -0,0 +1,435 @@
use common_utils::{ext_traits::Encode, fallback_reverse_lookup_not_found};
use data_models::{
errors,
payouts::payout_attempt::{
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
},
};
use diesel_models::{
enums::MerchantStorageScheme,
kv,
payout_attempt::{
PayoutAttempt as DieselPayoutAttempt, PayoutAttemptNew as DieselPayoutAttemptNew,
PayoutAttemptUpdate as DieselPayoutAttemptUpdate,
},
ReverseLookupNew,
};
use error_stack::{IntoReport, ResultExt};
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
lookup::ReverseLookupInterface,
redis::kv_store::{kv_wrapper, KvOperation},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
};
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
#[instrument(skip_all)]
async fn insert_payout_attempt(
&self,
new_payout_attempt: PayoutAttemptNew,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
.insert_payout_attempt(new_payout_attempt, storage_scheme)
.await
}
MerchantStorageScheme::RedisKv => {
let key = format!(
"mid_{}_poa_{}",
new_payout_attempt.merchant_id, new_payout_attempt.payout_id
);
let now = common_utils::date_time::now();
let created_attempt = PayoutAttempt {
payout_attempt_id: new_payout_attempt.payout_attempt_id.clone(),
payout_id: new_payout_attempt.payout_id.clone(),
customer_id: new_payout_attempt.customer_id.clone(),
merchant_id: new_payout_attempt.merchant_id.clone(),
address_id: new_payout_attempt.address_id.clone(),
connector: new_payout_attempt.connector.clone(),
connector_payout_id: new_payout_attempt.connector_payout_id.clone(),
payout_token: new_payout_attempt.payout_token.clone(),
status: new_payout_attempt.status,
is_eligible: new_payout_attempt.is_eligible,
error_message: new_payout_attempt.error_message.clone(),
error_code: new_payout_attempt.error_code.clone(),
business_country: new_payout_attempt.business_country,
business_label: new_payout_attempt.business_label.clone(),
created_at: new_payout_attempt.created_at.unwrap_or(now),
last_modified_at: new_payout_attempt.last_modified_at.unwrap_or(now),
profile_id: new_payout_attempt.profile_id.clone(),
merchant_connector_id: new_payout_attempt.merchant_connector_id.clone(),
routing_info: new_payout_attempt.routing_info.clone(),
};
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::PayoutAttempt(
new_payout_attempt.to_storage_model(),
),
},
};
// Reverse lookup for payout_attempt_id
let field = format!("poa_{}", created_attempt.payout_attempt_id);
let reverse_lookup = ReverseLookupNew {
lookup_id: format!(
"poa_{}_{}",
&created_attempt.merchant_id, &created_attempt.payout_attempt_id,
),
pk_id: key.clone(),
sk_id: field.clone(),
source: "payout_attempt".to_string(),
updated_by: storage_scheme.to_string(),
};
self.insert_reverse_lookup(reverse_lookup, storage_scheme)
.await?;
match kv_wrapper::<DieselPayoutAttempt, _, _>(
self,
KvOperation::<DieselPayoutAttempt>::HSetNx(
&field,
&created_attempt.clone().to_storage_model(),
redis_entry,
),
&key,
)
.await
.map_err(|err| err.to_redis_failed_response(&key))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "payout attempt",
key: Some(key),
})
.into_report(),
Ok(HsetnxReply::KeySet) => Ok(created_attempt),
Err(error) => Err(error.change_context(errors::StorageError::KVError)),
}
}
}
}
#[instrument(skip_all)]
async fn update_payout_attempt(
&self,
this: &PayoutAttempt,
payout_update: PayoutAttemptUpdate,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
.update_payout_attempt(this, payout_update, storage_scheme)
.await
}
MerchantStorageScheme::RedisKv => {
let key = format!("mid_{}_poa_{}", this.merchant_id, this.payout_id);
let field = format!("poa_{}", this.payout_attempt_id);
let diesel_payout_update = payout_update.to_storage_model();
let origin_diesel_payout = this.clone().to_storage_model();
let diesel_payout = diesel_payout_update
.clone()
.apply_changeset(origin_diesel_payout.clone());
// Check for database presence as well Maybe use a read replica here ?
let redis_value = diesel_payout
.encode_to_string_of_json()
.change_context(errors::StorageError::SerializationFailed)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::PayoutAttemptUpdate(
kv::PayoutAttemptUpdateMems {
orig: origin_diesel_payout,
update_data: diesel_payout_update,
},
),
},
};
kv_wrapper::<(), _, _>(
self,
KvOperation::<DieselPayoutAttempt>::Hset((&field, redis_value), redis_entry),
&key,
)
.await
.map_err(|err| err.to_redis_failed_response(&key))?
.try_into_hset()
.change_context(errors::StorageError::KVError)?;
Ok(PayoutAttempt::from_storage_model(diesel_payout))
}
}
}
#[instrument(skip_all)]
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
merchant_id: &str,
payout_attempt_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
.find_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_attempt_id,
storage_scheme,
)
.await
}
MerchantStorageScheme::RedisKv => {
let lookup_id = format!("poa_{merchant_id}_{payout_attempt_id}");
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await,
self.router_store
.find_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_attempt_id,
storage_scheme
)
.await
);
let key = &lookup.pk_id;
Box::pin(utils::try_redis_get_else_try_database_get(
async {
kv_wrapper(
self,
KvOperation::<DieselPayoutAttempt>::HGet(&lookup.sk_id),
key,
)
.await?
.try_into_hget()
},
|| async {
self.router_store
.find_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_attempt_id,
storage_scheme,
)
.await
},
))
.await
}
}
}
}
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutAttemptInterface for crate::RouterStore<T> {
#[instrument(skip_all)]
async fn insert_payout_attempt(
&self,
new: PayoutAttemptNew,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let conn = pg_connection_write(self).await?;
new.to_storage_model()
.insert(&conn)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
.map(PayoutAttempt::from_storage_model)
}
#[instrument(skip_all)]
async fn update_payout_attempt(
&self,
this: &PayoutAttempt,
payout: PayoutAttemptUpdate,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let conn = pg_connection_write(self).await?;
this.clone()
.to_storage_model()
.update_with_attempt_id(&conn, payout.to_storage_model())
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
.map(PayoutAttempt::from_storage_model)
}
#[instrument(skip_all)]
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
merchant_id: &str,
payout_attempt_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let conn = pg_connection_read(self).await?;
DieselPayoutAttempt::find_by_merchant_id_payout_attempt_id(
&conn,
merchant_id,
payout_attempt_id,
)
.await
.map(PayoutAttempt::from_storage_model)
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
}
}
impl DataModelExt for PayoutAttempt {
type StorageModel = DieselPayoutAttempt;
fn to_storage_model(self) -> Self::StorageModel {
DieselPayoutAttempt {
payout_attempt_id: self.payout_attempt_id,
payout_id: self.payout_id,
customer_id: self.customer_id,
merchant_id: self.merchant_id,
address_id: self.address_id,
connector: self.connector,
connector_payout_id: self.connector_payout_id,
payout_token: self.payout_token,
status: self.status,
is_eligible: self.is_eligible,
error_message: self.error_message,
error_code: self.error_code,
business_country: self.business_country,
business_label: self.business_label,
created_at: self.created_at,
last_modified_at: self.last_modified_at,
profile_id: self.profile_id,
merchant_connector_id: self.merchant_connector_id,
routing_info: self.routing_info,
}
}
fn from_storage_model(storage_model: Self::StorageModel) -> Self {
Self {
payout_attempt_id: storage_model.payout_attempt_id,
payout_id: storage_model.payout_id,
customer_id: storage_model.customer_id,
merchant_id: storage_model.merchant_id,
address_id: storage_model.address_id,
connector: storage_model.connector,
connector_payout_id: storage_model.connector_payout_id,
payout_token: storage_model.payout_token,
status: storage_model.status,
is_eligible: storage_model.is_eligible,
error_message: storage_model.error_message,
error_code: storage_model.error_code,
business_country: storage_model.business_country,
business_label: storage_model.business_label,
created_at: storage_model.created_at,
last_modified_at: storage_model.last_modified_at,
profile_id: storage_model.profile_id,
merchant_connector_id: storage_model.merchant_connector_id,
routing_info: storage_model.routing_info,
}
}
}
impl DataModelExt for PayoutAttemptNew {
type StorageModel = DieselPayoutAttemptNew;
fn to_storage_model(self) -> Self::StorageModel {
DieselPayoutAttemptNew {
payout_attempt_id: self.payout_attempt_id,
payout_id: self.payout_id,
customer_id: self.customer_id,
merchant_id: self.merchant_id,
address_id: self.address_id,
connector: self.connector,
connector_payout_id: self.connector_payout_id,
payout_token: self.payout_token,
status: self.status,
is_eligible: self.is_eligible,
error_message: self.error_message,
error_code: self.error_code,
business_country: self.business_country,
business_label: self.business_label,
created_at: self.created_at,
last_modified_at: self.last_modified_at,
profile_id: self.profile_id,
merchant_connector_id: self.merchant_connector_id,
routing_info: self.routing_info,
}
}
fn from_storage_model(storage_model: Self::StorageModel) -> Self {
Self {
payout_attempt_id: storage_model.payout_attempt_id,
payout_id: storage_model.payout_id,
customer_id: storage_model.customer_id,
merchant_id: storage_model.merchant_id,
address_id: storage_model.address_id,
connector: storage_model.connector,
connector_payout_id: storage_model.connector_payout_id,
payout_token: storage_model.payout_token,
status: storage_model.status,
is_eligible: storage_model.is_eligible,
error_message: storage_model.error_message,
error_code: storage_model.error_code,
business_country: storage_model.business_country,
business_label: storage_model.business_label,
created_at: storage_model.created_at,
last_modified_at: storage_model.last_modified_at,
profile_id: storage_model.profile_id,
merchant_connector_id: storage_model.merchant_connector_id,
routing_info: storage_model.routing_info,
}
}
}
impl DataModelExt for PayoutAttemptUpdate {
type StorageModel = DieselPayoutAttemptUpdate;
fn to_storage_model(self) -> Self::StorageModel {
match self {
Self::StatusUpdate {
connector_payout_id,
status,
error_message,
error_code,
is_eligible,
} => DieselPayoutAttemptUpdate::StatusUpdate {
connector_payout_id,
status,
error_message,
error_code,
is_eligible,
},
Self::PayoutTokenUpdate { payout_token } => {
DieselPayoutAttemptUpdate::PayoutTokenUpdate { payout_token }
}
Self::BusinessUpdate {
business_country,
business_label,
} => DieselPayoutAttemptUpdate::BusinessUpdate {
business_country,
business_label,
},
Self::UpdateRouting {
connector,
routing_info,
} => DieselPayoutAttemptUpdate::UpdateRouting {
connector,
routing_info,
},
}
}
#[allow(clippy::todo)]
fn from_storage_model(_storage_model: Self::StorageModel) -> Self {
todo!("Reverse map should no longer be needed")
}
}

View File

@ -0,0 +1,462 @@
use common_utils::ext_traits::Encode;
use data_models::{
errors::StorageError,
payouts::payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
};
use diesel_models::{
enums::MerchantStorageScheme,
kv,
payouts::{
Payouts as DieselPayouts, PayoutsNew as DieselPayoutsNew,
PayoutsUpdate as DieselPayoutsUpdate,
},
};
use error_stack::{IntoReport, ResultExt};
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
redis::kv_store::{kv_wrapper, KvOperation},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
};
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
#[instrument(skip_all)]
async fn insert_payout(
&self,
new: PayoutsNew,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store.insert_payout(new, storage_scheme).await
}
MerchantStorageScheme::RedisKv => {
let key = format!("mid_{}_po_{}", new.merchant_id, new.payout_id);
let field = format!("po_{}", new.payout_id);
let now = common_utils::date_time::now();
let created_payout = Payouts {
payout_id: new.payout_id.clone(),
merchant_id: new.merchant_id.clone(),
customer_id: new.customer_id.clone(),
address_id: new.address_id.clone(),
payout_type: new.payout_type,
payout_method_id: new.payout_method_id.clone(),
amount: new.amount,
destination_currency: new.destination_currency,
source_currency: new.source_currency,
description: new.description.clone(),
recurring: new.recurring,
auto_fulfill: new.auto_fulfill,
return_url: new.return_url.clone(),
entity_type: new.entity_type,
metadata: new.metadata.clone(),
created_at: new.created_at.unwrap_or(now),
last_modified_at: new.last_modified_at.unwrap_or(now),
profile_id: new.profile_id.clone(),
status: new.status,
attempt_count: new.attempt_count,
};
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::Payouts(new.to_storage_model()),
},
};
match kv_wrapper::<DieselPayouts, _, _>(
self,
KvOperation::<DieselPayouts>::HSetNx(
&field,
&created_payout.clone().to_storage_model(),
redis_entry,
),
&key,
)
.await
.map_err(|err| err.to_redis_failed_response(&key))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue {
entity: "payouts",
key: Some(key),
})
.into_report(),
Ok(HsetnxReply::KeySet) => Ok(created_payout),
Err(error) => Err(error.change_context(StorageError::KVError)),
}
}
}
}
#[instrument(skip_all)]
async fn update_payout(
&self,
this: &Payouts,
payout_update: PayoutsUpdate,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
.update_payout(this, payout_update, storage_scheme)
.await
}
MerchantStorageScheme::RedisKv => {
let key = format!("mid_{}_po_{}", this.merchant_id, this.payout_id);
let field = format!("po_{}", this.payout_id);
let diesel_payout_update = payout_update.to_storage_model();
let origin_diesel_payout = this.clone().to_storage_model();
let diesel_payout = diesel_payout_update
.clone()
.apply_changeset(origin_diesel_payout.clone());
// Check for database presence as well Maybe use a read replica here ?
let redis_value = diesel_payout
.encode_to_string_of_json()
.change_context(StorageError::SerializationFailed)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::PayoutsUpdate(kv::PayoutsUpdateMems {
orig: origin_diesel_payout,
update_data: diesel_payout_update,
}),
},
};
kv_wrapper::<(), _, _>(
self,
KvOperation::<DieselPayouts>::Hset((&field, redis_value), redis_entry),
&key,
)
.await
.map_err(|err| err.to_redis_failed_response(&key))?
.try_into_hset()
.change_context(StorageError::KVError)?;
Ok(Payouts::from_storage_model(diesel_payout))
}
}
}
#[instrument(skip_all)]
async fn find_payout_by_merchant_id_payout_id(
&self,
merchant_id: &str,
payout_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let database_call = || async {
let conn = pg_connection_read(self).await?;
DieselPayouts::find_by_merchant_id_payout_id(&conn, merchant_id, payout_id)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
};
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let key = format!("mid_{merchant_id}_po_{payout_id}");
let field = format!("po_{payout_id}");
Box::pin(utils::try_redis_get_else_try_database_get(
async {
kv_wrapper::<DieselPayouts, _, _>(
self,
KvOperation::<DieselPayouts>::HGet(&field),
&key,
)
.await?
.try_into_hget()
},
database_call,
))
.await
}
}
.map(Payouts::from_storage_model)
}
#[instrument(skip_all)]
async fn find_optional_payout_by_merchant_id_payout_id(
&self,
merchant_id: &str,
payout_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Option<Payouts>, StorageError> {
let database_call = || async {
let conn = pg_connection_read(self).await?;
DieselPayouts::find_optional_by_merchant_id_payout_id(&conn, merchant_id, payout_id)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
};
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
let maybe_payouts = database_call().await?;
Ok(maybe_payouts.and_then(|payout| {
if payout.payout_id == payout_id {
Some(payout)
} else {
None
}
}))
}
MerchantStorageScheme::RedisKv => {
let key = format!("mid_{merchant_id}_po_{payout_id}");
let field = format!("po_{payout_id}");
Box::pin(utils::try_redis_get_else_try_database_get(
async {
kv_wrapper::<DieselPayouts, _, _>(
self,
KvOperation::<DieselPayouts>::HGet(&field),
&key,
)
.await?
.try_into_hget()
.map(Some)
},
database_call,
))
.await
}
}
.map(|payout| payout.map(Payouts::from_storage_model))
}
}
#[async_trait::async_trait]
impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
#[instrument(skip_all)]
async fn insert_payout(
&self,
new: PayoutsNew,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let conn = pg_connection_write(self).await?;
new.to_storage_model()
.insert(&conn)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
.map(Payouts::from_storage_model)
}
#[instrument(skip_all)]
async fn update_payout(
&self,
this: &Payouts,
payout: PayoutsUpdate,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let conn = pg_connection_write(self).await?;
this.clone()
.to_storage_model()
.update(&conn, payout.to_storage_model())
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
.map(Payouts::from_storage_model)
}
#[instrument(skip_all)]
async fn find_payout_by_merchant_id_payout_id(
&self,
merchant_id: &str,
payout_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let conn = pg_connection_read(self).await?;
DieselPayouts::find_by_merchant_id_payout_id(&conn, merchant_id, payout_id)
.await
.map(Payouts::from_storage_model)
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
}
#[instrument(skip_all)]
async fn find_optional_payout_by_merchant_id_payout_id(
&self,
merchant_id: &str,
payout_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Option<Payouts>, StorageError> {
let conn = pg_connection_read(self).await?;
DieselPayouts::find_optional_by_merchant_id_payout_id(&conn, merchant_id, payout_id)
.await
.map(|x| x.map(Payouts::from_storage_model))
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
}
}
impl DataModelExt for Payouts {
type StorageModel = DieselPayouts;
fn to_storage_model(self) -> Self::StorageModel {
DieselPayouts {
payout_id: self.payout_id,
merchant_id: self.merchant_id,
customer_id: self.customer_id,
address_id: self.address_id,
payout_type: self.payout_type,
payout_method_id: self.payout_method_id,
amount: self.amount,
destination_currency: self.destination_currency,
source_currency: self.source_currency,
description: self.description,
recurring: self.recurring,
auto_fulfill: self.auto_fulfill,
return_url: self.return_url,
entity_type: self.entity_type,
metadata: self.metadata,
created_at: self.created_at,
last_modified_at: self.last_modified_at,
profile_id: self.profile_id,
status: self.status,
attempt_count: self.attempt_count,
}
}
fn from_storage_model(storage_model: Self::StorageModel) -> Self {
Self {
payout_id: storage_model.payout_id,
merchant_id: storage_model.merchant_id,
customer_id: storage_model.customer_id,
address_id: storage_model.address_id,
payout_type: storage_model.payout_type,
payout_method_id: storage_model.payout_method_id,
amount: storage_model.amount,
destination_currency: storage_model.destination_currency,
source_currency: storage_model.source_currency,
description: storage_model.description,
recurring: storage_model.recurring,
auto_fulfill: storage_model.auto_fulfill,
return_url: storage_model.return_url,
entity_type: storage_model.entity_type,
metadata: storage_model.metadata,
created_at: storage_model.created_at,
last_modified_at: storage_model.last_modified_at,
profile_id: storage_model.profile_id,
status: storage_model.status,
attempt_count: storage_model.attempt_count,
}
}
}
impl DataModelExt for PayoutsNew {
type StorageModel = DieselPayoutsNew;
fn to_storage_model(self) -> Self::StorageModel {
DieselPayoutsNew {
payout_id: self.payout_id,
merchant_id: self.merchant_id,
customer_id: self.customer_id,
address_id: self.address_id,
payout_type: self.payout_type,
payout_method_id: self.payout_method_id,
amount: self.amount,
destination_currency: self.destination_currency,
source_currency: self.source_currency,
description: self.description,
recurring: self.recurring,
auto_fulfill: self.auto_fulfill,
return_url: self.return_url,
entity_type: self.entity_type,
metadata: self.metadata,
created_at: self.created_at,
last_modified_at: self.last_modified_at,
profile_id: self.profile_id,
status: self.status,
attempt_count: self.attempt_count,
}
}
fn from_storage_model(storage_model: Self::StorageModel) -> Self {
Self {
payout_id: storage_model.payout_id,
merchant_id: storage_model.merchant_id,
customer_id: storage_model.customer_id,
address_id: storage_model.address_id,
payout_type: storage_model.payout_type,
payout_method_id: storage_model.payout_method_id,
amount: storage_model.amount,
destination_currency: storage_model.destination_currency,
source_currency: storage_model.source_currency,
description: storage_model.description,
recurring: storage_model.recurring,
auto_fulfill: storage_model.auto_fulfill,
return_url: storage_model.return_url,
entity_type: storage_model.entity_type,
metadata: storage_model.metadata,
created_at: storage_model.created_at,
last_modified_at: storage_model.last_modified_at,
profile_id: storage_model.profile_id,
status: storage_model.status,
attempt_count: storage_model.attempt_count,
}
}
}
impl DataModelExt for PayoutsUpdate {
type StorageModel = DieselPayoutsUpdate;
fn to_storage_model(self) -> Self::StorageModel {
match self {
Self::Update {
amount,
destination_currency,
source_currency,
description,
recurring,
auto_fulfill,
return_url,
entity_type,
metadata,
profile_id,
status,
} => DieselPayoutsUpdate::Update {
amount,
destination_currency,
source_currency,
description,
recurring,
auto_fulfill,
return_url,
entity_type,
metadata,
profile_id,
status,
},
Self::PayoutMethodIdUpdate { payout_method_id } => {
DieselPayoutsUpdate::PayoutMethodIdUpdate { payout_method_id }
}
Self::RecurringUpdate { recurring } => {
DieselPayoutsUpdate::RecurringUpdate { recurring }
}
Self::AttemptCountUpdate { attempt_count } => {
DieselPayoutsUpdate::AttemptCountUpdate { attempt_count }
}
}
}
#[allow(clippy::todo)]
fn from_storage_model(_storage_model: Self::StorageModel) -> Self {
todo!("Reverse map should no longer be needed")
}
}