feat(router): apply filters on payments (#1744)

This commit is contained in:
Apoorv Dixit
2023-07-26 18:53:36 +05:30
committed by GitHub
parent c284f41cc6
commit 04c3de73a5
9 changed files with 254 additions and 63 deletions

View File

@ -1723,25 +1723,38 @@ pub struct PaymentListResponse {
pub data: Vec<PaymentsResponse>,
}
#[derive(Clone, Debug, serde::Serialize, ToSchema)]
#[derive(Clone, Debug, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PaymentListFilterConstraints {
/// The identifier for payment
pub payment_id: Option<String>,
/// The starting point within a list of objects, limit on number of object will be some constant for join query
pub offset: Option<i64>,
/// The time range for which objects are needed. TimeRange has two fields start_time and end_time from which objects can be filtered as per required scenarios (created_at, time less than, greater than etc).
#[serde(flatten)]
pub time_range: Option<TimeRange>,
/// The list of connectors to filter payments list
pub connector: Option<Vec<String>>,
/// The list of currencies to filter payments list
pub currency: Option<Vec<enums::Currency>>,
/// The list of payment statuses to filter payments list
pub status: Option<Vec<enums::IntentStatus>>,
/// The list of payment methods to filter payments list
pub payment_methods: Option<Vec<enums::PaymentMethod>>,
}
#[derive(Clone, Debug, serde::Serialize)]
pub struct PaymentListFilters {
/// The list of available connector filters
#[schema(value_type = Vec<api_enums::Connector>)]
pub connector: Vec<String>,
/// The list of available currency filters
#[schema(value_type = Vec<Currency>)]
pub currency: Vec<enums::Currency>,
/// The list of available payment status filters
#[schema(value_type = Vec<IntentStatus>)]
pub status: Vec<enums::IntentStatus>,
/// The list of available payment method filters
#[schema(value_type = Vec<PaymentMethod>)]
pub payment_method: Vec<enums::PaymentMethod>,
}
#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash, ToSchema,
)]
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
pub struct TimeRange {
/// The start time to filter payments list or to get list of filters. To get list of filters start time is needed to be passed
#[serde(with = "common_utils::custom_serde::iso8601")]

View File

@ -135,6 +135,7 @@ pub struct RefundListRequest {
/// The starting point within a list of objects
pub offset: Option<i64>,
/// The time range for which objects are needed. TimeRange has two fields start_time and end_time from which objects can be filtered as per required scenarios (created_at, time less than, greater than etc).
#[serde(flatten)]
pub time_range: Option<TimeRange>,
/// The list of connectors to filter refunds list
pub connector: Option<Vec<String>>,

View File

@ -1261,6 +1261,33 @@ pub async fn list_payments(
},
))
}
#[cfg(feature = "olap")]
pub async fn apply_filters_on_payments(
db: &dyn StorageInterface,
merchant: domain::MerchantAccount,
constraints: api::PaymentListFilterConstraints,
) -> RouterResponse<api::PaymentListResponse> {
use crate::types::transformers::ForeignFrom;
let list: Vec<(storage::PaymentIntent, storage::PaymentAttempt)> = db
.apply_filters_on_payments_list(
&merchant.merchant_id,
&constraints,
merchant.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
let data: Vec<api::PaymentsResponse> =
list.into_iter().map(ForeignFrom::foreign_from).collect();
Ok(services::ApplicationResponse::Json(
api::PaymentListResponse {
size: data.len(),
data,
},
))
}
#[cfg(feature = "olap")]
pub async fn get_filters_for_payments(

View File

@ -43,6 +43,14 @@ pub trait PaymentIntentInterface {
time_range: &api::TimeRange,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<types::PaymentIntent>, errors::StorageError>;
#[cfg(feature = "olap")]
async fn apply_filters_on_payments_list(
&self,
merchant_id: &str,
constraints: &api::PaymentListFilterConstraints,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<(types::PaymentIntent, types::PaymentAttempt)>, errors::StorageError>;
}
#[cfg(feature = "kv_store")]
@ -267,6 +275,26 @@ mod storage {
enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()),
}
}
#[cfg(feature = "olap")]
async fn apply_filters_on_payments_list(
&self,
merchant_id: &str,
constraints: &api::PaymentListFilterConstraints,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<(PaymentIntent, PaymentAttempt)>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::apply_filters_on_payments(&conn, merchant_id, constraints)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()),
}
}
}
}
@ -347,6 +375,20 @@ mod storage {
.map_err(Into::into)
.into_report()
}
#[cfg(feature = "olap")]
async fn apply_filters_on_payments_list(
&self,
merchant_id: &str,
constraints: &api::PaymentListFilterConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<(PaymentIntent, PaymentAttempt)>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::apply_filters_on_payments(&conn, merchant_id, constraints)
.await
.map_err(Into::into)
.into_report()
}
}
}
@ -372,6 +414,17 @@ impl PaymentIntentInterface for MockDb {
// [#172]: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn apply_filters_on_payments_list(
&self,
_merchant_id: &str,
_constraints: &api::PaymentListFilterConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<(types::PaymentIntent, types::PaymentAttempt)>, errors::StorageError>
{
// [#172]: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
}
#[allow(clippy::panic)]
async fn insert_payment_intent(

View File

@ -150,7 +150,11 @@ impl Payments {
#[cfg(feature = "olap")]
{
route = route
.service(web::resource("/list").route(web::get().to(payments_list)))
.service(
web::resource("/list")
.route(web::get().to(payments_list))
.route(web::post().to(payments_list_by_filter)),
)
.service(web::resource("/filter").route(web::post().to(get_filters_for_payments)))
}
#[cfg(feature = "oltp")]

View File

@ -713,6 +713,28 @@ pub async fn payments_list(
.await
}
#[instrument(skip_all, fields(flow = ?Flow::PaymentsList))]
#[cfg(feature = "olap")]
pub async fn payments_list_by_filter(
state: web::Data<app::AppState>,
req: actix_web::HttpRequest,
payload: web::Json<payment_types::PaymentListFilterConstraints>,
) -> impl Responder {
let flow = Flow::PaymentsList;
let payload = payload.into_inner();
api::server_wrap(
flow,
state.get_ref(),
&req,
payload,
|state, auth, req| {
payments::apply_filters_on_payments(&*state.store, auth.merchant_account, req)
},
&auth::ApiKeyAuth,
)
.await
}
#[instrument(skip_all, fields(flow = ?Flow::PaymentsList))]
#[cfg(feature = "olap")]
pub async fn get_filters_for_payments(

View File

@ -2,13 +2,13 @@ pub use api_models::payments::{
AcceptanceType, Address, AddressDetails, Amount, AuthenticationForStartResponse, Card,
CryptoData, CustomerAcceptance, MandateData, MandateTransactionType, MandateType,
MandateValidationFields, NextActionType, OnlineMandate, PayLaterData, PaymentIdType,
PaymentListConstraints, PaymentListFilters, PaymentListResponse, PaymentMethodData,
PaymentMethodDataResponse, PaymentOp, PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials,
PaymentsCancelRequest, PaymentsCaptureRequest, PaymentsRedirectRequest,
PaymentsRedirectionResponse, PaymentsRequest, PaymentsResponse, PaymentsResponseForm,
PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse, PaymentsStartRequest,
PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken, TimeRange, UrlDetails,
VerifyRequest, VerifyResponse, WalletData,
PaymentListConstraints, PaymentListFilterConstraints, PaymentListFilters, PaymentListResponse,
PaymentMethodData, PaymentMethodDataResponse, PaymentOp, PaymentRetrieveBody,
PaymentRetrieveBodyWithCredentials, PaymentsCancelRequest, PaymentsCaptureRequest,
PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsRequest, PaymentsResponse,
PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse,
PaymentsStartRequest, PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken,
TimeRange, UrlDetails, VerifyRequest, VerifyResponse, WalletData,
};
use error_stack::{IntoReport, ResultExt};
use masking::PeekInterface;

View File

@ -1,17 +1,23 @@
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::{associations::HasTable, ExpressionMethods, QueryDsl};
use diesel::{associations::HasTable, debug_query, pg::Pg, ExpressionMethods, JoinOnDsl, QueryDsl};
pub use diesel_models::{
errors,
payment_attempt::PaymentAttempt,
payment_intent::{
PaymentIntent, PaymentIntentNew, PaymentIntentUpdate, PaymentIntentUpdateInternal,
},
schema::payment_intent::dsl,
schema::{
payment_attempt::{self, dsl as dsl1},
payment_intent::dsl,
},
};
use error_stack::{IntoReport, ResultExt};
use router_env::{instrument, tracing};
use crate::{connection::PgPooledConn, core::errors::CustomResult, types::api};
const JOIN_LIMIT: i64 = 20;
#[cfg(feature = "kv_store")]
impl crate::utils::storage_partitioning::KvStorePartition for PaymentIntent {}
@ -28,6 +34,12 @@ pub trait PaymentIntentDbExt: Sized {
merchant_id: &str,
pc: &api::TimeRange,
) -> CustomResult<Vec<Self>, errors::DatabaseError>;
async fn apply_filters_on_payments(
conn: &PgPooledConn,
merchant_id: &str,
constraints: &api::PaymentListFilterConstraints,
) -> CustomResult<Vec<(PaymentIntent, PaymentAttempt)>, errors::DatabaseError>;
}
#[async_trait::async_trait]
@ -82,7 +94,7 @@ impl PaymentIntentDbExt for PaymentIntent {
filter = filter.limit(pc.limit);
crate::logger::debug!(query = %diesel::debug_query::<diesel::pg::Pg, _>(&filter).to_string());
crate::logger::debug!(query = %debug_query::<Pg, _>(&filter).to_string());
filter
.get_results_async(conn)
@ -114,6 +126,7 @@ impl PaymentIntentDbExt for PaymentIntent {
filter = filter.filter(dsl::created_at.le(end_time));
crate::logger::debug!(query = %debug_query::<Pg, _>(&filter).to_string());
filter
.get_results_async(conn)
.await
@ -121,4 +134,58 @@ impl PaymentIntentDbExt for PaymentIntent {
.change_context(errors::DatabaseError::Others)
.attach_printable("Error filtering records by time range")
}
#[instrument(skip(conn))]
async fn apply_filters_on_payments(
conn: &PgPooledConn,
merchant_id: &str,
constraints: &api::PaymentListFilterConstraints,
) -> CustomResult<Vec<(Self, PaymentAttempt)>, errors::DatabaseError> {
let offset = constraints.offset.unwrap_or_default();
let mut filter = Self::table()
.inner_join(payment_attempt::table.on(dsl1::attempt_id.eq(dsl::active_attempt_id)))
.filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.order(dsl::created_at.desc())
.into_boxed();
match &constraints.payment_id {
Some(payment_id) => {
filter = filter.filter(dsl::payment_id.eq(payment_id.to_owned()));
}
None => {
filter = filter.limit(JOIN_LIMIT).offset(offset);
}
};
if let Some(time_range) = constraints.time_range {
filter = filter.filter(dsl::created_at.ge(time_range.start_time));
if let Some(end_time) = time_range.end_time {
filter = filter.filter(dsl::created_at.le(end_time));
}
}
if let Some(connector) = constraints.connector.clone() {
filter = filter.filter(dsl1::connector.eq_any(connector));
}
if let Some(filter_currency) = constraints.currency.clone() {
filter = filter.filter(dsl::currency.eq_any(filter_currency));
}
if let Some(status) = constraints.status.clone() {
filter = filter.filter(dsl::status.eq_any(status));
}
if let Some(payment_method) = constraints.payment_methods.clone() {
filter = filter.filter(dsl1::payment_method.eq_any(payment_method));
}
crate::logger::debug!(filter = %debug_query::<Pg, _>(&filter).to_string());
filter
.get_results_async(conn)
.await
.into_report()
.change_context(errors::DatabaseError::Others)
.attach_printable("Error filtering payment records")
}
}

View File

@ -9212,26 +9212,8 @@
}
},
"RefundListRequest": {
"type": "object",
"properties": {
"payment_id": {
"type": "string",
"description": "The identifier for the payment",
"nullable": true
},
"limit": {
"type": "integer",
"format": "int64",
"description": "Limit on the number of objects to return",
"nullable": true
},
"offset": {
"type": "integer",
"format": "int64",
"description": "The starting point within a list of objects",
"nullable": true
},
"time_range": {
"allOf": [
{
"allOf": [
{
"$ref": "#/components/schemas/TimeRange"
@ -9239,31 +9221,53 @@
],
"nullable": true
},
"connector": {
"type": "array",
"items": {
"type": "string"
},
"description": "The list of connectors to filter refunds list",
"nullable": true
},
"currency": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Currency"
},
"description": "The list of currencies to filter refunds list",
"nullable": true
},
"refund_status": {
"type": "array",
"items": {
"$ref": "#/components/schemas/RefundStatus"
},
"description": "The list of refund statuses to filter refunds list",
"nullable": true
{
"type": "object",
"properties": {
"payment_id": {
"type": "string",
"description": "The identifier for the payment",
"nullable": true
},
"limit": {
"type": "integer",
"format": "int64",
"description": "Limit on the number of objects to return",
"nullable": true
},
"offset": {
"type": "integer",
"format": "int64",
"description": "The starting point within a list of objects",
"nullable": true
},
"connector": {
"type": "array",
"items": {
"type": "string"
},
"description": "The list of connectors to filter refunds list",
"nullable": true
},
"currency": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Currency"
},
"description": "The list of currencies to filter refunds list",
"nullable": true
},
"refund_status": {
"type": "array",
"items": {
"$ref": "#/components/schemas/RefundStatus"
},
"description": "The list of refund statuses to filter refunds list",
"nullable": true
}
}
}
}
]
},
"RefundListResponse": {
"type": "object",