feat(payouts): add domain type for PayoutId (#8395)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Kashif
2025-07-01 19:12:01 +05:30
committed by GitHub
parent 18a779f94d
commit a6e3d2c71e
58 changed files with 506 additions and 269 deletions

View File

@ -427,7 +427,7 @@ impl UniqueConstraints for diesel_models::Payouts {
vec![format!(
"po_{}_{}",
self.merchant_id.get_string_repr(),
self.payout_id
self.payout_id.get_string_repr()
)]
}
fn table_name(&self) -> &str {

View File

@ -65,4 +65,13 @@ impl PayoutAttemptInterface for MockDb {
> {
Err(StorageError::MockDbError)?
}
async fn find_payout_attempt_by_merchant_id_merchant_order_reference_id(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_merchant_order_reference_id: &str,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<PayoutAttempt, StorageError> {
Err(StorageError::MockDbError)?
}
}

View File

@ -13,7 +13,7 @@ impl PayoutsInterface for MockDb {
async fn find_payout_by_merchant_id_payout_id(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_payout_id: &str,
_payout_id: &common_utils::id_type::PayoutId,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Payouts, StorageError> {
// TODO: Implement function for `MockDb`
@ -43,7 +43,7 @@ impl PayoutsInterface for MockDb {
async fn find_optional_payout_by_merchant_id_payout_id(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_payout_id: &str,
_payout_id: &common_utils::id_type::PayoutId,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Option<Payouts>, StorageError> {
// TODO: Implement function for `MockDb`
@ -95,7 +95,7 @@ impl PayoutsInterface for MockDb {
async fn get_total_count_of_filtered_payouts(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_active_payout_ids: &[String],
_active_payout_ids: &[common_utils::id_type::PayoutId],
_connector: Option<Vec<api_models::enums::PayoutConnectors>>,
_currency: Option<Vec<storage_enums::Currency>>,
_status: Option<Vec<storage_enums::PayoutStatus>>,
@ -110,7 +110,7 @@ impl PayoutsInterface for MockDb {
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_constraints: &hyperswitch_domain_models::payouts::PayoutFetchConstraints,
) -> CustomResult<Vec<String>, StorageError> {
) -> CustomResult<Vec<common_utils::id_type::PayoutId>, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}

View File

@ -60,7 +60,7 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
let payout_attempt_id = new_payout_attempt.payout_id.clone();
let key = PartitionKey::MerchantIdPayoutAttemptId {
merchant_id: &merchant_id,
payout_attempt_id: &payout_attempt_id,
payout_attempt_id: payout_attempt_id.get_string_repr(),
};
let key_str = key.to_string();
let created_attempt = PayoutAttempt {
@ -88,6 +88,9 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
routing_info: new_payout_attempt.routing_info.clone(),
unified_code: new_payout_attempt.unified_code.clone(),
unified_message: new_payout_attempt.unified_message.clone(),
merchant_order_reference_id: new_payout_attempt
.merchant_order_reference_id
.clone(),
};
let redis_entry = kv::TypedSql {
@ -149,7 +152,7 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let key = PartitionKey::MerchantIdPayoutAttemptId {
merchant_id: &this.merchant_id,
payout_attempt_id: &this.payout_id,
payout_attempt_id: this.payout_id.get_string_repr(),
};
let field = format!("poa_{}", this.payout_attempt_id);
let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPayoutAttempt>(
@ -378,6 +381,22 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
.get_filters_for_payouts(payouts, merchant_id, storage_scheme)
.await
}
#[instrument(skip_all)]
async fn find_payout_attempt_by_merchant_id_merchant_order_reference_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
merchant_order_reference_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
self.router_store
.find_payout_attempt_by_merchant_id_merchant_order_reference_id(
merchant_id,
merchant_order_reference_id,
storage_scheme,
)
.await
}
}
#[async_trait::async_trait]
@ -504,6 +523,27 @@ impl<T: DatabaseStore> PayoutAttemptInterface for crate::RouterStore<T> {
},
)
}
#[instrument(skip_all)]
async fn find_payout_attempt_by_merchant_id_merchant_order_reference_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
merchant_order_reference_id: &str,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let conn = pg_connection_read(self).await?;
DieselPayoutAttempt::find_by_merchant_id_merchant_order_reference_id(
&conn,
merchant_id,
merchant_order_reference_id,
)
.await
.map(PayoutAttempt::from_storage_model)
.map_err(|er| {
let new_err = diesel_error_to_data_error(*er.current_context());
er.change_context(new_err)
})
}
}
impl DataModelExt for PayoutAttempt {
@ -533,6 +573,7 @@ impl DataModelExt for PayoutAttempt {
unified_code: self.unified_code,
unified_message: self.unified_message,
additional_payout_method_data: self.additional_payout_method_data,
merchant_order_reference_id: self.merchant_order_reference_id,
}
}
@ -560,6 +601,7 @@ impl DataModelExt for PayoutAttempt {
unified_code: storage_model.unified_code,
unified_message: storage_model.unified_message,
additional_payout_method_data: storage_model.additional_payout_method_data,
merchant_order_reference_id: storage_model.merchant_order_reference_id,
}
}
}
@ -590,6 +632,7 @@ impl DataModelExt for PayoutAttemptNew {
unified_code: self.unified_code,
unified_message: self.unified_message,
additional_payout_method_data: self.additional_payout_method_data,
merchant_order_reference_id: self.merchant_order_reference_id,
}
}
@ -617,6 +660,7 @@ impl DataModelExt for PayoutAttemptNew {
unified_code: storage_model.unified_code,
unified_message: storage_model.unified_message,
additional_payout_method_data: storage_model.additional_payout_method_data,
merchant_order_reference_id: storage_model.merchant_order_reference_id,
}
}
}

View File

@ -80,7 +80,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
payout_id: &payout_id,
};
let key_str = key.to_string();
let field = format!("po_{}", new.payout_id);
let field = format!("po_{}", new.payout_id.get_string_repr());
let created_payout = Payouts {
payout_id: new.payout_id.clone(),
merchant_id: new.merchant_id.clone(),
@ -151,7 +151,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
merchant_id: &this.merchant_id,
payout_id: &this.payout_id,
};
let field = format!("po_{}", this.payout_id);
let field = format!("po_{}", this.payout_id.get_string_repr());
let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPayouts>(
self,
storage_scheme,
@ -207,7 +207,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
async fn find_payout_by_merchant_id_payout_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
payout_id: &str,
payout_id: &common_utils::id_type::PayoutId,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let database_call = || async {
@ -232,7 +232,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
merchant_id,
payout_id,
};
let field = format!("po_{payout_id}");
let field = format!("po_{}", payout_id.get_string_repr());
Box::pin(utils::try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper::<DieselPayouts, _, _>(
@ -255,7 +255,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
async fn find_optional_payout_by_merchant_id_payout_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
payout_id: &str,
payout_id: &common_utils::id_type::PayoutId,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Option<Payouts>, StorageError> {
let database_call = || async {
@ -276,20 +276,14 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
let maybe_payouts = database_call().await?;
Ok(maybe_payouts.and_then(|payout| {
if payout.payout_id == payout_id {
Some(payout)
} else {
None
}
}))
Ok(maybe_payouts.filter(|payout| &payout.payout_id == payout_id))
}
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::MerchantIdPayoutId {
merchant_id,
payout_id,
};
let field = format!("po_{payout_id}");
let field = format!("po_{}", payout_id.get_string_repr());
Box::pin(utils::try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper::<DieselPayouts, _, _>(
@ -360,7 +354,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
async fn get_total_count_of_filtered_payouts(
&self,
merchant_id: &common_utils::id_type::MerchantId,
active_payout_ids: &[String],
active_payout_ids: &[common_utils::id_type::PayoutId],
connector: Option<Vec<PayoutConnectors>>,
currency: Option<Vec<storage_enums::Currency>>,
status: Option<Vec<storage_enums::PayoutStatus>>,
@ -383,7 +377,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
&self,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, StorageError> {
) -> error_stack::Result<Vec<common_utils::id_type::PayoutId>, StorageError> {
self.router_store
.filter_active_payout_ids_by_constraints(merchant_id, constraints)
.await
@ -434,7 +428,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
async fn find_payout_by_merchant_id_payout_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
payout_id: &str,
payout_id: &common_utils::id_type::PayoutId,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let conn = pg_connection_read(self).await?;
@ -451,7 +445,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
async fn find_optional_payout_by_merchant_id_payout_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
payout_id: &str,
payout_id: &common_utils::id_type::PayoutId,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Option<Payouts>, StorageError> {
let conn = pg_connection_read(self).await?;
@ -498,7 +492,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
query = query.filter(po_dsl::profile_id.eq(profile_id.clone()));
}
query = match (params.starting_at, &params.starting_after_id) {
query = match (params.starting_at, params.starting_after_id.as_ref()) {
(Some(starting_at), _) => query.filter(po_dsl::created_at.ge(starting_at)),
(None, Some(starting_after_id)) => {
// TODO: Fetch partial columns for this query since we only need some columns
@ -515,7 +509,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
(None, None) => query,
};
query = match (params.ending_at, &params.ending_before_id) {
query = match (params.ending_at, params.ending_before_id.as_ref()) {
(Some(ending_at), _) => query.filter(po_dsl::created_at.le(ending_at)),
(None, Some(ending_before_id)) => {
// TODO: Fetch partial columns for this query since we only need some columns
@ -619,10 +613,18 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
query = query.filter(po_dsl::profile_id.eq(profile_id.clone()));
}
query = match (params.starting_at, &params.starting_after_id) {
if let Some(merchant_order_reference_id_filter) =
&params.merchant_order_reference_id
{
query = query.filter(
poa_dsl::merchant_order_reference_id
.eq(merchant_order_reference_id_filter.clone()),
);
}
query = match (params.starting_at, params.starting_after_id.as_ref()) {
(Some(starting_at), _) => query.filter(po_dsl::created_at.ge(starting_at)),
(None, Some(starting_after_id)) => {
// TODO: Fetch partial columns for this query since we only need some columns
let starting_at = self
.find_payout_by_merchant_id_payout_id(
merchant_id,
@ -636,10 +638,9 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
(None, None) => query,
};
query = match (params.ending_at, &params.ending_before_id) {
query = match (params.ending_at, params.ending_before_id.as_ref()) {
(Some(ending_at), _) => query.filter(po_dsl::created_at.le(ending_at)),
(None, Some(ending_before_id)) => {
// TODO: Fetch partial columns for this query since we only need some columns
let ending_at = self
.find_payout_by_merchant_id_payout_id(
merchant_id,
@ -665,20 +666,24 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
.map(|c| c.iter().map(|c| c.to_string()).collect::<Vec<String>>());
query = match connectors {
Some(connectors) => query.filter(poa_dsl::connector.eq_any(connectors)),
None => query,
Some(conn_filters) if !conn_filters.is_empty() => {
query.filter(poa_dsl::connector.eq_any(conn_filters))
}
_ => query,
};
query = match &params.status {
Some(status) => query.filter(po_dsl::status.eq_any(status.clone())),
None => query,
Some(status_filters) if !status_filters.is_empty() => {
query.filter(po_dsl::status.eq_any(status_filters.clone()))
}
_ => query,
};
query = match &params.payout_method {
Some(payout_method) => {
Some(payout_method) if !payout_method.is_empty() => {
query.filter(po_dsl::payout_type.eq_any(payout_method.clone()))
}
None => query,
_ => query,
};
query
@ -760,7 +765,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
async fn get_total_count_of_filtered_payouts(
&self,
merchant_id: &common_utils::id_type::MerchantId,
active_payout_ids: &[String],
active_payout_ids: &[common_utils::id_type::PayoutId],
connector: Option<Vec<PayoutConnectors>>,
currency: Option<Vec<storage_enums::Currency>>,
status: Option<Vec<storage_enums::PayoutStatus>>,
@ -800,7 +805,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
&self,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, StorageError> {
) -> error_stack::Result<Vec<common_utils::id_type::PayoutId>, StorageError> {
let conn = connection::pg_connection_read(self).await?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPayouts::table()
@ -868,8 +873,14 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
.attach_printable("Error filtering payout records"),
)
.into()
})?
.into_iter()
.map(|s| {
common_utils::id_type::PayoutId::try_from(std::borrow::Cow::Owned(s))
.change_context(StorageError::DeserializationFailed)
.attach_printable("Failed to deserialize PayoutId from database string")
})
.collect::<error_stack::Result<Vec<_>, _>>()
}
#[cfg(all(feature = "olap", feature = "v2"))]
@ -878,7 +889,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, StorageError> {
) -> error_stack::Result<Vec<common_utils::id_type::PayoutId>, StorageError> {
todo!()
}
}

View File

@ -41,7 +41,7 @@ pub enum PartitionKey<'a> {
},
MerchantIdPayoutId {
merchant_id: &'a common_utils::id_type::MerchantId,
payout_id: &'a str,
payout_id: &'a common_utils::id_type::PayoutId,
},
MerchantIdPayoutAttemptId {
merchant_id: &'a common_utils::id_type::MerchantId,
@ -93,8 +93,9 @@ impl std::fmt::Display for PartitionKey<'_> {
merchant_id,
payout_id,
} => f.write_str(&format!(
"mid_{}_po_{payout_id}",
merchant_id.get_string_repr()
"mid_{}_po_{}",
merchant_id.get_string_repr(),
payout_id.get_string_repr()
)),
PartitionKey::MerchantIdPayoutAttemptId {
merchant_id,