feat(router): Add Payments - List endpoint for v2 (#7191)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Anurag Thakur
2025-02-20 16:39:38 +05:30
committed by GitHub
parent 74bbf4bf27
commit d1f537e229
20 changed files with 1814 additions and 89 deletions

View File

@ -59,6 +59,22 @@ impl PaymentAttemptInterface for MockDb {
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v2", feature = "olap"))]
async fn get_total_count_of_filtered_payment_attempts(
&self,
_merchant_id: &id_type::MerchantId,
_active_attempt_ids: &[String],
_connector: Option<api_models::enums::Connector>,
_payment_method_type: Option<common_enums::PaymentMethod>,
_payment_method_subtype: Option<common_enums::PaymentMethodType>,
_authentication_type: Option<common_enums::AuthenticationType>,
_merchanat_connector_id: Option<id_type::MerchantConnectorAccountId>,
_card_network: Option<storage_enums::CardNetwork>,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<i64, StorageError> {
Err(StorageError::MockDbError)?
}
#[cfg(feature = "v1")]
async fn find_payment_attempt_by_attempt_id_merchant_id(
&self,

View File

@ -28,6 +28,24 @@ impl PaymentIntentInterface for MockDb {
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v2", feature = "olap"))]
async fn get_filtered_payment_intents_attempt(
&self,
state: &KeyManagerState,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &hyperswitch_domain_models::payments::payment_intent::PaymentIntentFetchConstraints,
merchant_key_store: &MerchantKeyStore,
storage_scheme: storage_enums::MerchantStorageScheme,
) -> error_stack::Result<
Vec<(
PaymentIntent,
Option<hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt>,
)>,
StorageError,
> {
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v1", feature = "olap"))]
async fn filter_payment_intents_by_time_range_constraints(
&self,
@ -63,6 +81,17 @@ impl PaymentIntentInterface for MockDb {
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v2", feature = "olap"))]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_constraints: &hyperswitch_domain_models::payments::payment_intent::PaymentIntentFetchConstraints,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> error_stack::Result<Vec<Option<String>>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(all(feature = "v1", feature = "olap"))]
async fn get_filtered_payment_intents_attempt(
&self,

View File

@ -507,6 +507,44 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
er.change_context(new_err)
})
}
#[cfg(all(feature = "v2", feature = "olap"))]
#[instrument(skip_all)]
async fn get_total_count_of_filtered_payment_attempts(
&self,
merchant_id: &common_utils::id_type::MerchantId,
active_attempt_ids: &[String],
connector: Option<api_models::enums::Connector>,
payment_method_type: Option<common_enums::PaymentMethod>,
payment_method_subtype: Option<common_enums::PaymentMethodType>,
authentication_type: Option<common_enums::AuthenticationType>,
merchant_connector_id: Option<common_utils::id_type::MerchantConnectorAccountId>,
card_network: Option<common_enums::CardNetwork>,
_storage_scheme: MerchantStorageScheme,
) -> CustomResult<i64, errors::StorageError> {
let conn = self
.db_store
.get_replica_pool()
.get()
.await
.change_context(errors::StorageError::DatabaseConnectionError)?;
DieselPaymentAttempt::get_total_count_of_attempts(
&conn,
merchant_id,
active_attempt_ids,
connector.map(|val| val.to_string()),
payment_method_type,
payment_method_subtype,
authentication_type,
merchant_connector_id,
card_network,
)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(*er.current_context());
er.change_context(new_err)
})
}
}
#[async_trait::async_trait]
@ -1427,6 +1465,34 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
)
.await
}
#[cfg(all(feature = "v2", feature = "olap"))]
#[instrument(skip_all)]
async fn get_total_count_of_filtered_payment_attempts(
&self,
merchant_id: &common_utils::id_type::MerchantId,
active_attempt_ids: &[String],
connector: Option<api_models::enums::Connector>,
payment_method_type: Option<common_enums::PaymentMethod>,
payment_method_subtype: Option<common_enums::PaymentMethodType>,
authentication_type: Option<common_enums::AuthenticationType>,
merchant_connector_id: Option<common_utils::id_type::MerchantConnectorAccountId>,
card_network: Option<common_enums::CardNetwork>,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<i64, errors::StorageError> {
self.router_store
.get_total_count_of_filtered_payment_attempts(
merchant_id,
active_attempt_ids,
connector,
payment_method_type,
payment_method_subtype,
authentication_type,
merchant_connector_id,
card_network,
storage_scheme,
)
.await
}
}
impl DataModelExt for MandateAmountData {

View File

@ -164,6 +164,27 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
}
}
#[cfg(all(feature = "v2", feature = "olap"))]
#[instrument(skip_all)]
async fn get_filtered_payment_intents_attempt(
&self,
state: &KeyManagerState,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PaymentIntentFetchConstraints,
merchant_key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(PaymentIntent, Option<PaymentAttempt>)>, StorageError> {
self.router_store
.get_filtered_payment_intents_attempt(
state,
merchant_id,
constraints,
merchant_key_store,
storage_scheme,
)
.await
}
#[cfg(feature = "v1")]
#[instrument(skip_all)]
async fn update_payment_intent(
@ -459,6 +480,23 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
)
.await
}
#[cfg(all(feature = "v2", feature = "olap"))]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PaymentIntentFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<Option<String>>, StorageError> {
self.router_store
.get_filtered_active_attempt_ids_for_total_count(
merchant_id,
constraints,
storage_scheme,
)
.await
}
#[cfg(feature = "v2")]
async fn find_payment_intent_by_merchant_reference_id_profile_id(
&self,
@ -1086,6 +1124,303 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
.await
}
#[cfg(all(feature = "v2", feature = "olap"))]
#[instrument(skip_all)]
async fn get_filtered_payment_intents_attempt(
&self,
state: &KeyManagerState,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PaymentIntentFetchConstraints,
merchant_key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(PaymentIntent, Option<PaymentAttempt>)>, StorageError> {
use diesel::NullableExpressionMethods as _;
use futures::{future::try_join_all, FutureExt};
use crate::DataModelExt;
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPaymentIntent::table()
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
.left_join(
payment_attempt_schema::table
.on(pi_dsl::active_attempt_id.eq(pa_dsl::id.nullable())),
)
// Filtering on merchant_id for payment_attempt is not required for v2 as payment_attempt_ids are globally unique
.into_boxed();
query = match constraints {
PaymentIntentFetchConstraints::Single { payment_intent_id } => {
query.filter(pi_dsl::id.eq(payment_intent_id.to_owned()))
}
PaymentIntentFetchConstraints::List(params) => {
query = match params.order {
Order {
on: SortOn::Amount,
by: SortBy::Asc,
} => query.order(pi_dsl::amount.asc()),
Order {
on: SortOn::Amount,
by: SortBy::Desc,
} => query.order(pi_dsl::amount.desc()),
Order {
on: SortOn::Created,
by: SortBy::Asc,
} => query.order(pi_dsl::created_at.asc()),
Order {
on: SortOn::Created,
by: SortBy::Desc,
} => query.order(pi_dsl::created_at.desc()),
};
if let Some(limit) = params.limit {
query = query.limit(limit.into());
}
if let Some(customer_id) = &params.customer_id {
query = query.filter(pi_dsl::customer_id.eq(customer_id.clone()));
}
if let Some(merchant_order_reference_id) = &params.merchant_order_reference_id {
query = query.filter(
pi_dsl::merchant_reference_id.eq(merchant_order_reference_id.clone()),
)
}
if let Some(profile_id) = &params.profile_id {
query = query.filter(pi_dsl::profile_id.eq(profile_id.clone()));
}
query = match (params.starting_at, &params.starting_after_id) {
(Some(starting_at), _) => query.filter(pi_dsl::created_at.ge(starting_at)),
(None, Some(starting_after_id)) => {
// TODO: Fetch partial columns for this query since we only need some columns
let starting_at = self
.find_payment_intent_by_id(
state,
starting_after_id,
merchant_key_store,
storage_scheme,
)
.await?
.created_at;
query.filter(pi_dsl::created_at.ge(starting_at))
}
(None, None) => query,
};
query = match (params.ending_at, &params.ending_before_id) {
(Some(ending_at), _) => query.filter(pi_dsl::created_at.le(ending_at)),
(None, Some(ending_before_id)) => {
// TODO: Fetch partial columns for this query since we only need some columns
let ending_at = self
.find_payment_intent_by_id(
state,
ending_before_id,
merchant_key_store,
storage_scheme,
)
.await?
.created_at;
query.filter(pi_dsl::created_at.le(ending_at))
}
(None, None) => query,
};
query = query.offset(params.offset.into());
query = match params.amount_filter {
Some(AmountFilter {
start_amount: Some(start),
end_amount: Some(end),
}) => query.filter(pi_dsl::amount.between(start, end)),
Some(AmountFilter {
start_amount: Some(start),
end_amount: None,
}) => query.filter(pi_dsl::amount.ge(start)),
Some(AmountFilter {
start_amount: None,
end_amount: Some(end),
}) => query.filter(pi_dsl::amount.le(end)),
_ => query,
};
query = match &params.currency {
Some(currency) => query.filter(pi_dsl::currency.eq(*currency)),
None => query,
};
query = match &params.connector {
Some(connector) => query.filter(pa_dsl::connector.eq(*connector)),
None => query,
};
query = match &params.status {
Some(status) => query.filter(pi_dsl::status.eq(*status)),
None => query,
};
query = match &params.payment_method_type {
Some(payment_method_type) => {
query.filter(pa_dsl::payment_method_type_v2.eq(*payment_method_type))
}
None => query,
};
query = match &params.payment_method_subtype {
Some(payment_method_subtype) => {
query.filter(pa_dsl::payment_method_subtype.eq(*payment_method_subtype))
}
None => query,
};
query = match &params.authentication_type {
Some(authentication_type) => {
query.filter(pa_dsl::authentication_type.eq(*authentication_type))
}
None => query,
};
query = match &params.merchant_connector_id {
Some(merchant_connector_id) => query
.filter(pa_dsl::merchant_connector_id.eq(merchant_connector_id.clone())),
None => query,
};
if let Some(card_network) = &params.card_network {
query = query.filter(pa_dsl::card_network.eq(card_network.clone()));
}
query
}
};
logger::debug!(filter = %diesel::debug_query::<diesel::pg::Pg,_>(&query).to_string());
query
.get_results_async::<(
DieselPaymentIntent,
Option<diesel_models::payment_attempt::PaymentAttempt>,
)>(conn)
.await
.change_context(StorageError::DecryptionError)
.async_and_then(|output| async {
try_join_all(output.into_iter().map(
|(pi, pa): (_, Option<diesel_models::payment_attempt::PaymentAttempt>)| async {
let payment_intent = PaymentIntent::convert_back(
state,
pi,
merchant_key_store.key.get_inner(),
merchant_id.to_owned().into(),
);
let payment_attempt = pa
.async_map(|val| {
PaymentAttempt::convert_back(
state,
val,
merchant_key_store.key.get_inner(),
merchant_id.to_owned().into(),
)
})
.map(|val| val.transpose());
let output = futures::try_join!(payment_intent, payment_attempt);
output.change_context(StorageError::DecryptionError)
},
))
.await
})
.await
.change_context(StorageError::DecryptionError)
}
#[cfg(all(feature = "v2", feature = "olap"))]
#[instrument(skip_all)]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<Option<String>>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPaymentIntent::table()
.select(pi_dsl::active_attempt_id)
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
.order(pi_dsl::created_at.desc())
.into_boxed();
query = match constraints {
PaymentIntentFetchConstraints::Single { payment_intent_id } => {
query.filter(pi_dsl::id.eq(payment_intent_id.to_owned()))
}
PaymentIntentFetchConstraints::List(params) => {
if let Some(customer_id) = &params.customer_id {
query = query.filter(pi_dsl::customer_id.eq(customer_id.clone()));
}
if let Some(merchant_order_reference_id) = &params.merchant_order_reference_id {
query = query.filter(
pi_dsl::merchant_reference_id.eq(merchant_order_reference_id.clone()),
)
}
if let Some(profile_id) = &params.profile_id {
query = query.filter(pi_dsl::profile_id.eq(profile_id.clone()));
}
query = match params.starting_at {
Some(starting_at) => query.filter(pi_dsl::created_at.ge(starting_at)),
None => query,
};
query = match params.ending_at {
Some(ending_at) => query.filter(pi_dsl::created_at.le(ending_at)),
None => query,
};
query = match params.amount_filter {
Some(AmountFilter {
start_amount: Some(start),
end_amount: Some(end),
}) => query.filter(pi_dsl::amount.between(start, end)),
Some(AmountFilter {
start_amount: Some(start),
end_amount: None,
}) => query.filter(pi_dsl::amount.ge(start)),
Some(AmountFilter {
start_amount: None,
end_amount: Some(end),
}) => query.filter(pi_dsl::amount.le(end)),
_ => query,
};
query = match &params.currency {
Some(currency) => query.filter(pi_dsl::currency.eq(*currency)),
None => query,
};
query = match &params.status {
Some(status) => query.filter(pi_dsl::status.eq(*status)),
None => query,
};
query
}
};
db_metrics::track_database_call::<<DieselPaymentIntent as HasTable>::Table, _, _>(
query.get_results_async::<Option<String>>(conn),
db_metrics::DatabaseOperation::Filter,
)
.await
.map_err(|er| {
StorageError::DatabaseError(
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
.attach_printable("Error filtering payment records"),
)
.into()
})
}
#[cfg(all(feature = "v1", feature = "olap"))]
#[instrument(skip_all)]
async fn get_filtered_active_attempt_ids_for_total_count(