diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index 37ae6aaf32..1da8f9e329 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -1976,13 +1976,24 @@ pub struct PaymentListResponse { // The list of payments response objects pub data: Vec, } +#[derive(Clone, Debug, serde::Serialize)] +pub struct PaymentListResponseV2 { + /// The number of payments included in the list for given constraints + pub count: usize, + /// The total number of available payments for given constraints + pub total_count: i64, + /// The list of payments response objects + pub data: Vec, +} #[derive(Clone, Debug, serde::Deserialize)] #[serde(deny_unknown_fields)] pub struct PaymentListFilterConstraints { /// The identifier for payment pub payment_id: Option, - /// The starting point within a list of objects, limit on number of object will be some constant for join query + /// The limit on the number of objects. The max limit is 20 + pub limit: Option, + /// The starting point within a list of objects pub offset: Option, /// The time range for which objects are needed. TimeRange has two fields start_time and end_time from which objects can be filtered as per required scenarios (created_at, time less than, greater than etc). #[serde(flatten)] diff --git a/crates/data_models/src/payments/payment_intent.rs b/crates/data_models/src/payments/payment_intent.rs index a2110900ef..c5e6ed9b0b 100644 --- a/crates/data_models/src/payments/payment_intent.rs +++ b/crates/data_models/src/payments/payment_intent.rs @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize}; use time::PrimitiveDateTime; use crate::{errors, MerchantStorageScheme}; -const QUERY_LIMIT: u32 = 20; const MAX_LIMIT: u32 = 100; #[async_trait::async_trait] pub trait PaymentIntentInterface { @@ -54,6 +53,14 @@ pub trait PaymentIntentInterface { Vec<(PaymentIntent, super::payment_attempt::PaymentAttempt)>, errors::StorageError, >; + + #[cfg(feature = "olap")] + async fn get_filtered_active_attempt_ids_for_total_count( + &self, + merchant_id: &str, + constraints: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, errors::StorageError>; } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] @@ -420,7 +427,7 @@ impl From for PaymentIntentF customer_id: None, starting_after_id: None, ending_before_id: None, - limit: Some(QUERY_LIMIT), + limit: value.limit, } } } diff --git a/crates/diesel_models/src/payment_attempt.rs b/crates/diesel_models/src/payment_attempt.rs index 07021fee68..826d4c0ee9 100644 --- a/crates/diesel_models/src/payment_attempt.rs +++ b/crates/diesel_models/src/payment_attempt.rs @@ -65,7 +65,6 @@ pub struct PaymentListFilters { pub status: Vec, pub payment_method: Vec, } - #[derive( Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize, )] diff --git a/crates/diesel_models/src/query/payment_attempt.rs b/crates/diesel_models/src/query/payment_attempt.rs index 11968fc250..d87b8715f7 100644 --- a/crates/diesel_models/src/query/payment_attempt.rs +++ b/crates/diesel_models/src/query/payment_attempt.rs @@ -1,7 +1,10 @@ use std::collections::HashSet; use async_bb8_diesel::AsyncRunQueryDsl; -use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods, QueryDsl, Table}; +use diesel::{ + associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods, + QueryDsl, Table, +}; use error_stack::{IntoReport, ResultExt}; use router_env::{instrument, tracing}; @@ -14,6 +17,7 @@ use crate::{ PaymentListFilters, }, payment_intent::PaymentIntent, + query::generics::db_metrics, schema::payment_attempt::dsl, PgPooledConn, StorageResult, }; @@ -209,14 +213,14 @@ impl PaymentAttempt { pi: &[PaymentIntent], merchant_id: &str, ) -> StorageResult { - let active_attempts: Vec = pi + let active_attempt_ids: Vec = pi .iter() .map(|payment_intent| payment_intent.clone().active_attempt_id) .collect(); let filter = ::table() .filter(dsl::merchant_id.eq(merchant_id.to_owned())) - .filter(dsl::attempt_id.eq_any(active_attempts)); + .filter(dsl::attempt_id.eq_any(active_attempt_ids)); let intent_status: Vec = pi .iter() @@ -273,4 +277,35 @@ impl PaymentAttempt { Ok(filters) } + pub async fn get_total_count_of_attempts( + conn: &PgPooledConn, + merchant_id: &str, + active_attempt_ids: &[String], + connector: Option>, + payment_method: Option>, + ) -> StorageResult { + let mut filter = ::table() + .count() + .filter(dsl::merchant_id.eq(merchant_id.to_owned())) + .filter(dsl::attempt_id.eq_any(active_attempt_ids.to_owned())) + .into_boxed(); + + if let Some(connector) = connector.clone() { + filter = filter.filter(dsl::connector.eq_any(connector)); + } + + if let Some(payment_method) = payment_method.clone() { + filter = filter.filter(dsl::payment_method.eq_any(payment_method)); + } + router_env::logger::debug!(query = %debug_query::(&filter).to_string()); + + db_metrics::track_database_call::<::Table, _, _>( + filter.get_result_async::(conn), + db_metrics::DatabaseOperation::Filter, + ) + .await + .into_report() + .change_context(errors::DatabaseError::Others) + .attach_printable("Error filtering count of payments") + } } diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 2fc8c76a4d..f047715d81 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -42,6 +42,8 @@ use crate::{ utils::{add_connector_http_status_code_metrics, Encode, OptionExt, ValueExt}, }; +#[cfg(feature = "olap")] +const PAYMENTS_LIST_MAX_LIMIT: u32 = 20; #[instrument(skip_all, fields(payment_id, merchant_id))] pub async fn payments_operation_core( state: &AppState, @@ -1318,11 +1320,14 @@ pub async fn apply_filters_on_payments( db: &dyn StorageInterface, merchant: domain::MerchantAccount, constraints: api::PaymentListFilterConstraints, -) -> RouterResponse { +) -> RouterResponse { use storage_impl::DataModelExt; use crate::types::transformers::ForeignFrom; + let limit = &constraints.limit.unwrap_or(PAYMENTS_LIST_MAX_LIMIT); + + helpers::validate_payment_list_request_for_joins(*limit, PAYMENTS_LIST_MAX_LIMIT)?; let list: Vec<(storage::PaymentIntent, storage::PaymentAttempt)> = db .get_filtered_payment_intents_attempt( &merchant.merchant_id, @@ -1338,9 +1343,30 @@ pub async fn apply_filters_on_payments( let data: Vec = list.into_iter().map(ForeignFrom::foreign_from).collect(); + let active_attempt_ids = db + .get_filtered_active_attempt_ids_for_total_count( + &merchant.merchant_id, + &constraints.clone().into(), + merchant.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::InternalServerError)?; + + let total_count = db + .get_total_count_of_filtered_payment_attempts( + &merchant.merchant_id, + &active_attempt_ids, + constraints.connector, + constraints.payment_methods, + merchant.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; + Ok(services::ApplicationResponse::Json( - api::PaymentListResponse { - size: data.len(), + api::PaymentListResponseV2 { + count: data.len(), + total_count, data, }, )) diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 2e462e4e38..efea54d0af 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -1691,6 +1691,18 @@ pub(super) fn validate_payment_list_request( })?; Ok(()) } +#[cfg(feature = "olap")] +pub(super) fn validate_payment_list_request_for_joins( + limit: u32, + max_limit: u32, +) -> CustomResult<(), errors::ApiErrorResponse> { + utils::when(limit > max_limit || limit < 1, || { + Err(errors::ApiErrorResponse::InvalidRequestData { + message: format!("limit should be in between 1 and {}", max_limit), + }) + })?; + Ok(()) +} pub fn get_handle_response_url( payment_id: String, diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index 6fe4250504..d8b1e20db2 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -1,3 +1,5 @@ +use api_models::enums::{Connector, PaymentMethod}; + use super::MockDb; use crate::{ core::errors::{self, CustomResult}, @@ -76,10 +78,20 @@ pub trait PaymentAttemptInterface { merchant_id: &str, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; + + async fn get_total_count_of_filtered_payment_attempts( + &self, + merchant_id: &str, + active_attempt_ids: &[String], + connector: Option>, + payment_methods: Option>, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult; } #[cfg(not(feature = "kv_store"))] mod storage { + use api_models::enums::{Connector, PaymentMethod}; use error_stack::IntoReport; use storage_impl::DataModelExt; @@ -211,6 +223,37 @@ mod storage { .into_report() } + async fn get_total_count_of_filtered_payment_attempts( + &self, + merchant_id: &str, + active_attempt_ids: &[String], + connector: Option>, + payment_methods: Option>, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let connector_strings = if let Some(connector_vec) = &connector { + Some( + connector_vec + .iter() + .map(|c| c.to_string()) + .collect::>(), + ) + } else { + None + }; + PaymentAttempt::get_total_count_of_attempts( + &conn, + merchant_id, + active_attempt_ids, + connector_strings, + payment_methods, + ) + .await + .map_err(Into::into) + .into_report() + } + async fn find_payment_attempt_by_preprocessing_id_merchant_id( &self, preprocessing_id: &str, @@ -281,6 +324,17 @@ impl PaymentAttemptInterface for MockDb { Err(errors::StorageError::MockDbError)? } + async fn get_total_count_of_filtered_payment_attempts( + &self, + _merchant_id: &str, + _active_attempt_ids: &[String], + _connector: Option>, + _payment_methods: Option>, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + Err(errors::StorageError::MockDbError)? + } + async fn find_payment_attempt_by_attempt_id_merchant_id( &self, _attempt_id: &str, @@ -920,6 +974,35 @@ mod storage { .map_err(Into::into) .into_report() } + + async fn get_total_count_of_filtered_payment_attempts( + &self, + merchant_id: &str, + active_attempt_ids: &[String], + connector: Option>, + payment_methods: Option>, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + + let connector_strings = connector.as_ref().map(|connector_vec| { + connector_vec + .iter() + .map(|c| c.to_string()) + .collect::>() + }); + + PaymentAttempt::get_total_count_of_attempts( + &conn, + merchant_id, + active_attempt_ids, + connector_strings, + payment_methods, + ) + .await + .map_err(Into::into) + .into_report() + } } #[inline] diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index 33f561ed24..88f428bdf4 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -48,6 +48,17 @@ impl PaymentIntentInterface for MockDb { Err(errors::DataStorageError::MockDbError)? } + #[cfg(feature = "olap")] + async fn get_filtered_active_attempt_ids_for_total_count( + &self, + _merchant_id: &str, + _constraints: &PaymentIntentFetchConstraints, + _storage_scheme: enums::MerchantStorageScheme, + ) -> error_stack::Result, errors::DataStorageError> { + // [#172]: Implement function for `MockDb` + Err(errors::DataStorageError::MockDbError)? + } + #[allow(clippy::panic)] async fn insert_payment_intent( &self, diff --git a/crates/router/src/types/api/payments.rs b/crates/router/src/types/api/payments.rs index def6c06e21..b3448cca2d 100644 --- a/crates/router/src/types/api/payments.rs +++ b/crates/router/src/types/api/payments.rs @@ -3,12 +3,13 @@ pub use api_models::payments::{ CryptoData, CustomerAcceptance, MandateData, MandateTransactionType, MandateType, MandateValidationFields, NextActionType, OnlineMandate, PayLaterData, PaymentIdType, PaymentListConstraints, PaymentListFilterConstraints, PaymentListFilters, PaymentListResponse, - PaymentMethodData, PaymentMethodDataResponse, PaymentOp, PaymentRetrieveBody, - PaymentRetrieveBodyWithCredentials, PaymentsCancelRequest, PaymentsCaptureRequest, - PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsRequest, PaymentsResponse, - PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse, - PaymentsStartRequest, PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken, - TimeRange, UrlDetails, VerifyRequest, VerifyResponse, WalletData, + PaymentListResponseV2, PaymentMethodData, PaymentMethodDataResponse, PaymentOp, + PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials, PaymentsCancelRequest, + PaymentsCaptureRequest, PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsRequest, + PaymentsResponse, PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest, + PaymentsSessionResponse, PaymentsStartRequest, PgRedirectResponse, PhoneDetails, + RedirectionResponse, SessionToken, TimeRange, UrlDetails, VerifyRequest, VerifyResponse, + WalletData, }; use error_stack::{IntoReport, ResultExt}; use masking::PeekInterface; diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index 9b5ce8ee58..21946fa619 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -262,6 +262,28 @@ impl PaymentIntentInterface for KVRouterStore { MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()), } } + + #[cfg(feature = "olap")] + async fn get_filtered_active_attempt_ids_for_total_count( + &self, + merchant_id: &str, + constraints: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .get_filtered_active_attempt_ids_for_total_count( + merchant_id, + constraints, + storage_scheme, + ) + .await + } + + MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()), + } + } } #[async_trait::async_trait] @@ -454,6 +476,7 @@ impl PaymentIntentInterface for crate::RouterStore { .filter(pi_dsl::merchant_id.eq(merchant_id.to_owned())) .order(pi_dsl::created_at.desc()) .into_boxed(); + query = match constraints { PaymentIntentFetchConstraints::Single { payment_intent_id } => { query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned())) @@ -568,6 +591,74 @@ impl PaymentIntentInterface for crate::RouterStore { }) .attach_printable("Error filtering payment records") } + + #[cfg(feature = "olap")] + async fn get_filtered_active_attempt_ids_for_total_count( + &self, + merchant_id: &str, + constraints: &PaymentIntentFetchConstraints, + _storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + let conn = self.get_replica_pool(); + + let mut query = DieselPaymentIntent::table() + .select(pi_dsl::active_attempt_id) + .filter(pi_dsl::merchant_id.eq(merchant_id.to_owned())) + .order(pi_dsl::created_at.desc()) + .into_boxed(); + + query = match constraints { + PaymentIntentFetchConstraints::Single { payment_intent_id } => { + query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned())) + } + PaymentIntentFetchConstraints::List { + starting_at, + ending_at, + currency, + status, + customer_id, + .. + } => { + if let Some(customer_id) = customer_id { + query = query.filter(pi_dsl::customer_id.eq(customer_id.clone())); + } + + query = match starting_at { + Some(starting_at) => query.filter(pi_dsl::created_at.ge(*starting_at)), + None => query, + }; + + query = match ending_at { + Some(ending_at) => query.filter(pi_dsl::created_at.le(*ending_at)), + None => query, + }; + + query = match currency { + Some(currency) => query.filter(pi_dsl::currency.eq_any(currency.clone())), + None => query, + }; + + query = match status { + Some(status) => query.filter(pi_dsl::status.eq_any(status.clone())), + None => query, + }; + + query + } + }; + + db_metrics::track_database_call::<::Table, _, _>( + query.get_results_async::(conn), + db_metrics::DatabaseOperation::Filter, + ) + .await + .into_report() + .map_err(|er| { + let new_err = StorageError::DatabaseError(format!("{er:?}")); + er.change_context(new_err) + }) + .attach_printable_lazy(|| "Error filtering records by predicate") + } } impl DataModelExt for PaymentIntentNew {