feat(router): add total count for payments list (#1912)

This commit is contained in:
Apoorv Dixit
2023-08-29 16:48:35 +05:30
committed by GitHub
parent 784702d9c5
commit 7a5c8413cf
10 changed files with 292 additions and 16 deletions

View File

@ -1976,13 +1976,24 @@ pub struct PaymentListResponse {
// The list of payments response objects // The list of payments response objects
pub data: Vec<PaymentsResponse>, pub data: Vec<PaymentsResponse>,
} }
#[derive(Clone, Debug, serde::Serialize)]
pub struct PaymentListResponseV2 {
/// The number of payments included in the list for given constraints
pub count: usize,
/// The total number of available payments for given constraints
pub total_count: i64,
/// The list of payments response objects
pub data: Vec<PaymentsResponse>,
}
#[derive(Clone, Debug, serde::Deserialize)] #[derive(Clone, Debug, serde::Deserialize)]
#[serde(deny_unknown_fields)] #[serde(deny_unknown_fields)]
pub struct PaymentListFilterConstraints { pub struct PaymentListFilterConstraints {
/// The identifier for payment /// The identifier for payment
pub payment_id: Option<String>, pub payment_id: Option<String>,
/// The starting point within a list of objects, limit on number of object will be some constant for join query /// The limit on the number of objects. The max limit is 20
pub limit: Option<u32>,
/// The starting point within a list of objects
pub offset: Option<u32>, pub offset: Option<u32>,
/// The time range for which objects are needed. TimeRange has two fields start_time and end_time from which objects can be filtered as per required scenarios (created_at, time less than, greater than etc). /// The time range for which objects are needed. TimeRange has two fields start_time and end_time from which objects can be filtered as per required scenarios (created_at, time less than, greater than etc).
#[serde(flatten)] #[serde(flatten)]

View File

@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime; use time::PrimitiveDateTime;
use crate::{errors, MerchantStorageScheme}; use crate::{errors, MerchantStorageScheme};
const QUERY_LIMIT: u32 = 20;
const MAX_LIMIT: u32 = 100; const MAX_LIMIT: u32 = 100;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait PaymentIntentInterface { pub trait PaymentIntentInterface {
@ -54,6 +53,14 @@ pub trait PaymentIntentInterface {
Vec<(PaymentIntent, super::payment_attempt::PaymentAttempt)>, Vec<(PaymentIntent, super::payment_attempt::PaymentAttempt)>,
errors::StorageError, errors::StorageError,
>; >;
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
merchant_id: &str,
constraints: &PaymentIntentFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, errors::StorageError>;
} }
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
@ -420,7 +427,7 @@ impl From<api_models::payments::PaymentListFilterConstraints> for PaymentIntentF
customer_id: None, customer_id: None,
starting_after_id: None, starting_after_id: None,
ending_before_id: None, ending_before_id: None,
limit: Some(QUERY_LIMIT), limit: value.limit,
} }
} }
} }

View File

@ -65,7 +65,6 @@ pub struct PaymentListFilters {
pub status: Vec<storage_enums::IntentStatus>, pub status: Vec<storage_enums::IntentStatus>,
pub payment_method: Vec<storage_enums::PaymentMethod>, pub payment_method: Vec<storage_enums::PaymentMethod>,
} }
#[derive( #[derive(
Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize, Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize,
)] )]

View File

@ -1,7 +1,10 @@
use std::collections::HashSet; use std::collections::HashSet;
use async_bb8_diesel::AsyncRunQueryDsl; use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods, QueryDsl, Table}; use diesel::{
associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods,
QueryDsl, Table,
};
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use router_env::{instrument, tracing}; use router_env::{instrument, tracing};
@ -14,6 +17,7 @@ use crate::{
PaymentListFilters, PaymentListFilters,
}, },
payment_intent::PaymentIntent, payment_intent::PaymentIntent,
query::generics::db_metrics,
schema::payment_attempt::dsl, schema::payment_attempt::dsl,
PgPooledConn, StorageResult, PgPooledConn, StorageResult,
}; };
@ -209,14 +213,14 @@ impl PaymentAttempt {
pi: &[PaymentIntent], pi: &[PaymentIntent],
merchant_id: &str, merchant_id: &str,
) -> StorageResult<PaymentListFilters> { ) -> StorageResult<PaymentListFilters> {
let active_attempts: Vec<String> = pi let active_attempt_ids: Vec<String> = pi
.iter() .iter()
.map(|payment_intent| payment_intent.clone().active_attempt_id) .map(|payment_intent| payment_intent.clone().active_attempt_id)
.collect(); .collect();
let filter = <Self as HasTable>::table() let filter = <Self as HasTable>::table()
.filter(dsl::merchant_id.eq(merchant_id.to_owned())) .filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(dsl::attempt_id.eq_any(active_attempts)); .filter(dsl::attempt_id.eq_any(active_attempt_ids));
let intent_status: Vec<IntentStatus> = pi let intent_status: Vec<IntentStatus> = pi
.iter() .iter()
@ -273,4 +277,35 @@ impl PaymentAttempt {
Ok(filters) Ok(filters)
} }
pub async fn get_total_count_of_attempts(
conn: &PgPooledConn,
merchant_id: &str,
active_attempt_ids: &[String],
connector: Option<Vec<String>>,
payment_method: Option<Vec<enums::PaymentMethod>>,
) -> StorageResult<i64> {
let mut filter = <Self as HasTable>::table()
.count()
.filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(dsl::attempt_id.eq_any(active_attempt_ids.to_owned()))
.into_boxed();
if let Some(connector) = connector.clone() {
filter = filter.filter(dsl::connector.eq_any(connector));
}
if let Some(payment_method) = payment_method.clone() {
filter = filter.filter(dsl::payment_method.eq_any(payment_method));
}
router_env::logger::debug!(query = %debug_query::<Pg, _>(&filter).to_string());
db_metrics::track_database_call::<<Self as HasTable>::Table, _, _>(
filter.get_result_async::<i64>(conn),
db_metrics::DatabaseOperation::Filter,
)
.await
.into_report()
.change_context(errors::DatabaseError::Others)
.attach_printable("Error filtering count of payments")
}
} }

View File

@ -42,6 +42,8 @@ use crate::{
utils::{add_connector_http_status_code_metrics, Encode, OptionExt, ValueExt}, utils::{add_connector_http_status_code_metrics, Encode, OptionExt, ValueExt},
}; };
#[cfg(feature = "olap")]
const PAYMENTS_LIST_MAX_LIMIT: u32 = 20;
#[instrument(skip_all, fields(payment_id, merchant_id))] #[instrument(skip_all, fields(payment_id, merchant_id))]
pub async fn payments_operation_core<F, Req, Op, FData>( pub async fn payments_operation_core<F, Req, Op, FData>(
state: &AppState, state: &AppState,
@ -1318,11 +1320,14 @@ pub async fn apply_filters_on_payments(
db: &dyn StorageInterface, db: &dyn StorageInterface,
merchant: domain::MerchantAccount, merchant: domain::MerchantAccount,
constraints: api::PaymentListFilterConstraints, constraints: api::PaymentListFilterConstraints,
) -> RouterResponse<api::PaymentListResponse> { ) -> RouterResponse<api::PaymentListResponseV2> {
use storage_impl::DataModelExt; use storage_impl::DataModelExt;
use crate::types::transformers::ForeignFrom; use crate::types::transformers::ForeignFrom;
let limit = &constraints.limit.unwrap_or(PAYMENTS_LIST_MAX_LIMIT);
helpers::validate_payment_list_request_for_joins(*limit, PAYMENTS_LIST_MAX_LIMIT)?;
let list: Vec<(storage::PaymentIntent, storage::PaymentAttempt)> = db let list: Vec<(storage::PaymentIntent, storage::PaymentAttempt)> = db
.get_filtered_payment_intents_attempt( .get_filtered_payment_intents_attempt(
&merchant.merchant_id, &merchant.merchant_id,
@ -1338,9 +1343,30 @@ pub async fn apply_filters_on_payments(
let data: Vec<api::PaymentsResponse> = let data: Vec<api::PaymentsResponse> =
list.into_iter().map(ForeignFrom::foreign_from).collect(); list.into_iter().map(ForeignFrom::foreign_from).collect();
let active_attempt_ids = db
.get_filtered_active_attempt_ids_for_total_count(
&merchant.merchant_id,
&constraints.clone().into(),
merchant.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::InternalServerError)?;
let total_count = db
.get_total_count_of_filtered_payment_attempts(
&merchant.merchant_id,
&active_attempt_ids,
constraints.connector,
constraints.payment_methods,
merchant.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok(services::ApplicationResponse::Json( Ok(services::ApplicationResponse::Json(
api::PaymentListResponse { api::PaymentListResponseV2 {
size: data.len(), count: data.len(),
total_count,
data, data,
}, },
)) ))

View File

@ -1691,6 +1691,18 @@ pub(super) fn validate_payment_list_request(
})?; })?;
Ok(()) Ok(())
} }
#[cfg(feature = "olap")]
pub(super) fn validate_payment_list_request_for_joins(
limit: u32,
max_limit: u32,
) -> CustomResult<(), errors::ApiErrorResponse> {
utils::when(limit > max_limit || limit < 1, || {
Err(errors::ApiErrorResponse::InvalidRequestData {
message: format!("limit should be in between 1 and {}", max_limit),
})
})?;
Ok(())
}
pub fn get_handle_response_url( pub fn get_handle_response_url(
payment_id: String, payment_id: String,

View File

@ -1,3 +1,5 @@
use api_models::enums::{Connector, PaymentMethod};
use super::MockDb; use super::MockDb;
use crate::{ use crate::{
core::errors::{self, CustomResult}, core::errors::{self, CustomResult},
@ -76,10 +78,20 @@ pub trait PaymentAttemptInterface {
merchant_id: &str, merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<diesel_models::payment_attempt::PaymentListFilters, errors::StorageError>; ) -> CustomResult<diesel_models::payment_attempt::PaymentListFilters, errors::StorageError>;
async fn get_total_count_of_filtered_payment_attempts(
&self,
merchant_id: &str,
active_attempt_ids: &[String],
connector: Option<Vec<Connector>>,
payment_methods: Option<Vec<PaymentMethod>>,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<i64, errors::StorageError>;
} }
#[cfg(not(feature = "kv_store"))] #[cfg(not(feature = "kv_store"))]
mod storage { mod storage {
use api_models::enums::{Connector, PaymentMethod};
use error_stack::IntoReport; use error_stack::IntoReport;
use storage_impl::DataModelExt; use storage_impl::DataModelExt;
@ -211,6 +223,37 @@ mod storage {
.into_report() .into_report()
} }
async fn get_total_count_of_filtered_payment_attempts(
&self,
merchant_id: &str,
active_attempt_ids: &[String],
connector: Option<Vec<Connector>>,
payment_methods: Option<Vec<PaymentMethod>>,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<i64, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
let connector_strings = if let Some(connector_vec) = &connector {
Some(
connector_vec
.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>(),
)
} else {
None
};
PaymentAttempt::get_total_count_of_attempts(
&conn,
merchant_id,
active_attempt_ids,
connector_strings,
payment_methods,
)
.await
.map_err(Into::into)
.into_report()
}
async fn find_payment_attempt_by_preprocessing_id_merchant_id( async fn find_payment_attempt_by_preprocessing_id_merchant_id(
&self, &self,
preprocessing_id: &str, preprocessing_id: &str,
@ -281,6 +324,17 @@ impl PaymentAttemptInterface for MockDb {
Err(errors::StorageError::MockDbError)? Err(errors::StorageError::MockDbError)?
} }
async fn get_total_count_of_filtered_payment_attempts(
&self,
_merchant_id: &str,
_active_attempt_ids: &[String],
_connector: Option<Vec<Connector>>,
_payment_methods: Option<Vec<PaymentMethod>>,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<i64, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn find_payment_attempt_by_attempt_id_merchant_id( async fn find_payment_attempt_by_attempt_id_merchant_id(
&self, &self,
_attempt_id: &str, _attempt_id: &str,
@ -920,6 +974,35 @@ mod storage {
.map_err(Into::into) .map_err(Into::into)
.into_report() .into_report()
} }
async fn get_total_count_of_filtered_payment_attempts(
&self,
merchant_id: &str,
active_attempt_ids: &[String],
connector: Option<Vec<api_models::enums::Connector>>,
payment_methods: Option<Vec<api_models::enums::PaymentMethod>>,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<i64, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
let connector_strings = connector.as_ref().map(|connector_vec| {
connector_vec
.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
});
PaymentAttempt::get_total_count_of_attempts(
&conn,
merchant_id,
active_attempt_ids,
connector_strings,
payment_methods,
)
.await
.map_err(Into::into)
.into_report()
}
} }
#[inline] #[inline]

View File

@ -48,6 +48,17 @@ impl PaymentIntentInterface for MockDb {
Err(errors::DataStorageError::MockDbError)? Err(errors::DataStorageError::MockDbError)?
} }
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
_merchant_id: &str,
_constraints: &PaymentIntentFetchConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, errors::DataStorageError> {
// [#172]: Implement function for `MockDb`
Err(errors::DataStorageError::MockDbError)?
}
#[allow(clippy::panic)] #[allow(clippy::panic)]
async fn insert_payment_intent( async fn insert_payment_intent(
&self, &self,

View File

@ -3,12 +3,13 @@ pub use api_models::payments::{
CryptoData, CustomerAcceptance, MandateData, MandateTransactionType, MandateType, CryptoData, CustomerAcceptance, MandateData, MandateTransactionType, MandateType,
MandateValidationFields, NextActionType, OnlineMandate, PayLaterData, PaymentIdType, MandateValidationFields, NextActionType, OnlineMandate, PayLaterData, PaymentIdType,
PaymentListConstraints, PaymentListFilterConstraints, PaymentListFilters, PaymentListResponse, PaymentListConstraints, PaymentListFilterConstraints, PaymentListFilters, PaymentListResponse,
PaymentMethodData, PaymentMethodDataResponse, PaymentOp, PaymentRetrieveBody, PaymentListResponseV2, PaymentMethodData, PaymentMethodDataResponse, PaymentOp,
PaymentRetrieveBodyWithCredentials, PaymentsCancelRequest, PaymentsCaptureRequest, PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials, PaymentsCancelRequest,
PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsRequest, PaymentsResponse, PaymentsCaptureRequest, PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsRequest,
PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse, PaymentsResponse, PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest,
PaymentsStartRequest, PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken, PaymentsSessionResponse, PaymentsStartRequest, PgRedirectResponse, PhoneDetails,
TimeRange, UrlDetails, VerifyRequest, VerifyResponse, WalletData, RedirectionResponse, SessionToken, TimeRange, UrlDetails, VerifyRequest, VerifyResponse,
WalletData,
}; };
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use masking::PeekInterface; use masking::PeekInterface;

View File

@ -262,6 +262,28 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()), MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()),
} }
} }
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
merchant_id: &str,
constraints: &PaymentIntentFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, StorageError> {
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
.get_filtered_active_attempt_ids_for_total_count(
merchant_id,
constraints,
storage_scheme,
)
.await
}
MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()),
}
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -454,6 +476,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned())) .filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
.order(pi_dsl::created_at.desc()) .order(pi_dsl::created_at.desc())
.into_boxed(); .into_boxed();
query = match constraints { query = match constraints {
PaymentIntentFetchConstraints::Single { payment_intent_id } => { PaymentIntentFetchConstraints::Single { payment_intent_id } => {
query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned())) query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned()))
@ -568,6 +591,74 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
}) })
.attach_printable("Error filtering payment records") .attach_printable("Error filtering payment records")
} }
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count(
&self,
merchant_id: &str,
constraints: &PaymentIntentFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<String>, StorageError> {
let conn = self.get_replica_pool();
let mut query = DieselPaymentIntent::table()
.select(pi_dsl::active_attempt_id)
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
.order(pi_dsl::created_at.desc())
.into_boxed();
query = match constraints {
PaymentIntentFetchConstraints::Single { payment_intent_id } => {
query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned()))
}
PaymentIntentFetchConstraints::List {
starting_at,
ending_at,
currency,
status,
customer_id,
..
} => {
if let Some(customer_id) = customer_id {
query = query.filter(pi_dsl::customer_id.eq(customer_id.clone()));
}
query = match starting_at {
Some(starting_at) => query.filter(pi_dsl::created_at.ge(*starting_at)),
None => query,
};
query = match ending_at {
Some(ending_at) => query.filter(pi_dsl::created_at.le(*ending_at)),
None => query,
};
query = match currency {
Some(currency) => query.filter(pi_dsl::currency.eq_any(currency.clone())),
None => query,
};
query = match status {
Some(status) => query.filter(pi_dsl::status.eq_any(status.clone())),
None => query,
};
query
}
};
db_metrics::track_database_call::<<DieselPaymentIntent as HasTable>::Table, _, _>(
query.get_results_async::<String>(conn),
db_metrics::DatabaseOperation::Filter,
)
.await
.into_report()
.map_err(|er| {
let new_err = StorageError::DatabaseError(format!("{er:?}"));
er.change_context(new_err)
})
.attach_printable_lazy(|| "Error filtering records by predicate")
}
} }
impl DataModelExt for PaymentIntentNew { impl DataModelExt for PaymentIntentNew {