feat(router): get filters for payments (#1600)

This commit is contained in:
Apoorv Dixit
2023-07-07 16:10:01 +05:30
committed by GitHub
parent f77fdb7a6e
commit d5891ecbd4
12 changed files with 341 additions and 15 deletions

View File

@ -23,6 +23,7 @@ pub enum PaymentOp {
Confirm, Confirm,
} }
use crate::enums;
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
pub struct BankData { pub struct BankData {
pub payment_method_type: api_enums::PaymentMethodType, pub payment_method_type: api_enums::PaymentMethodType,
@ -1597,6 +1598,34 @@ pub struct PaymentListResponse {
pub data: Vec<PaymentsResponse>, pub data: Vec<PaymentsResponse>,
} }
#[derive(Clone, Debug, serde::Serialize, ToSchema)]
pub struct PaymentListFilters {
/// The list of available connector filters
#[schema(value_type = Vec<api_enums::Connector>)]
pub connector: Vec<String>,
/// The list of available currency filters
#[schema(value_type = Vec<Currency>)]
pub currency: Vec<enums::Currency>,
/// The list of available payment status filters
#[schema(value_type = Vec<IntentStatus>)]
pub status: Vec<enums::IntentStatus>,
/// The list of available payment method filters
#[schema(value_type = Vec<PaymentMethod>)]
pub payment_method: Vec<enums::PaymentMethod>,
}
#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash, ToSchema,
)]
pub struct TimeRange {
/// The start time to filter payments list or to get list of filters. To get list of filters start time is needed to be passed
#[serde(with = "common_utils::custom_serde::iso8601")]
pub start_time: PrimitiveDateTime,
/// The end time to filter payments list or to get list of filters. If not passed the default time is now
#[serde(default, with = "common_utils::custom_serde::iso8601::option")]
pub end_time: Option<PrimitiveDateTime>,
}
#[derive(Setter, Clone, Default, Debug, PartialEq, serde::Serialize)] #[derive(Setter, Clone, Default, Debug, PartialEq, serde::Serialize)]
pub struct VerifyResponse { pub struct VerifyResponse {
pub verify_id: Option<String>, pub verify_id: Option<String>,

View File

@ -1265,6 +1265,38 @@ pub async fn list_payments(
)) ))
} }
#[cfg(feature = "olap")]
pub async fn get_filters_for_payments(
db: &dyn StorageInterface,
merchant: domain::MerchantAccount,
time_range: api::TimeRange,
) -> RouterResponse<api::PaymentListFilters> {
use crate::types::transformers::ForeignFrom;
let pi = db
.filter_payment_intents_by_time_range_constraints(
&merchant.merchant_id,
&time_range,
merchant.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
let filters = db
.get_filters_for_payments(
&pi,
&merchant.merchant_id,
// since OLAP doesn't have KV. Force to get the data from PSQL.
storage_enums::MerchantStorageScheme::PostgresOnly,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
let filters: api::PaymentListFilters = ForeignFrom::foreign_from(filters);
Ok(services::ApplicationResponse::Json(filters))
}
pub async fn add_process_sync_task( pub async fn add_process_sync_task(
db: &dyn StorageInterface, db: &dyn StorageInterface,
payment_attempt: &storage::PaymentAttempt, payment_attempt: &storage::PaymentAttempt,

View File

@ -4,7 +4,7 @@ use api_models::payments::OrderDetailsWithAmount;
use common_utils::fp_utils; use common_utils::fp_utils;
use error_stack::ResultExt; use error_stack::ResultExt;
use router_env::{instrument, tracing}; use router_env::{instrument, tracing};
use storage_models::ephemeral_key; use storage_models::{ephemeral_key, payment_attempt::PaymentListFilters};
use super::{flows::Feature, PaymentAddress, PaymentData}; use super::{flows::Feature, PaymentAddress, PaymentData};
use crate::{ use crate::{
@ -599,6 +599,29 @@ impl ForeignFrom<(storage::PaymentIntent, storage::PaymentAttempt)> for api::Pay
} }
} }
impl ForeignFrom<PaymentListFilters> for api_models::payments::PaymentListFilters {
fn foreign_from(item: PaymentListFilters) -> Self {
Self {
connector: item.connector,
currency: item
.currency
.into_iter()
.map(ForeignInto::foreign_into)
.collect(),
status: item
.status
.into_iter()
.map(ForeignInto::foreign_into)
.collect(),
payment_method: item
.payment_method
.into_iter()
.map(ForeignInto::foreign_into)
.collect(),
}
}
}
impl ForeignFrom<ephemeral_key::EphemeralKey> for api::ephemeral_key::EphemeralKeyCreateResponse { impl ForeignFrom<ephemeral_key::EphemeralKey> for api::ephemeral_key::EphemeralKeyCreateResponse {
fn foreign_from(from: ephemeral_key::EphemeralKey) -> Self { fn foreign_from(from: ephemeral_key::EphemeralKey) -> Self {
Self { Self {

View File

@ -62,6 +62,13 @@ pub trait PaymentAttemptInterface {
merchant_id: &str, merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<types::PaymentAttempt, errors::StorageError>; ) -> CustomResult<types::PaymentAttempt, errors::StorageError>;
async fn get_filters_for_payments(
&self,
pi: &[storage_models::payment_intent::PaymentIntent],
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_models::payment_attempt::PaymentListFilters, errors::StorageError>;
} }
#[cfg(not(feature = "kv_store"))] #[cfg(not(feature = "kv_store"))]
@ -177,6 +184,20 @@ mod storage {
.into_report() .into_report()
} }
async fn get_filters_for_payments(
&self,
pi: &[storage_models::payment_intent::PaymentIntent],
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_models::payment_attempt::PaymentListFilters, errors::StorageError>
{
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::get_filters_for_payments(&conn, pi, merchant_id)
.await
.map_err(Into::into)
.into_report()
}
async fn find_payment_attempt_by_preprocessing_id_merchant_id( async fn find_payment_attempt_by_preprocessing_id_merchant_id(
&self, &self,
preprocessing_id: &str, preprocessing_id: &str,
@ -224,6 +245,16 @@ impl PaymentAttemptInterface for MockDb {
Err(errors::StorageError::MockDbError)? Err(errors::StorageError::MockDbError)?
} }
async fn get_filters_for_payments(
&self,
_pi: &[storage_models::payment_intent::PaymentIntent],
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_models::payment_attempt::PaymentListFilters, errors::StorageError>
{
Err(errors::StorageError::MockDbError)?
}
async fn find_payment_attempt_by_attempt_id_merchant_id( async fn find_payment_attempt_by_attempt_id_merchant_id(
&self, &self,
_attempt_id: &str, _attempt_id: &str,
@ -798,6 +829,20 @@ mod storage {
} }
} }
} }
async fn get_filters_for_payments(
&self,
pi: &[storage_models::payment_intent::PaymentIntent],
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_models::payment_attempt::PaymentListFilters, errors::StorageError>
{
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::get_filters_for_payments(&conn, pi, merchant_id)
.await
.map_err(Into::into)
.into_report()
}
} }
#[inline] #[inline]

View File

@ -35,6 +35,14 @@ pub trait PaymentIntentInterface {
pc: &api::PaymentListConstraints, pc: &api::PaymentListConstraints,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<types::PaymentIntent>, errors::StorageError>; ) -> CustomResult<Vec<types::PaymentIntent>, errors::StorageError>;
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
merchant_id: &str,
time_range: &api::TimeRange,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<types::PaymentIntent>, errors::StorageError>;
} }
#[cfg(feature = "kv_store")] #[cfg(feature = "kv_store")]
@ -237,6 +245,25 @@ mod storage {
.into_report() .into_report()
} }
enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()),
}
}
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
merchant_id: &str,
time_range: &api::TimeRange,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::filter_by_time_constraints(&conn, merchant_id, time_range)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()), enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()),
} }
} }
@ -307,6 +334,19 @@ mod storage {
.map_err(Into::into) .map_err(Into::into)
.into_report() .into_report()
} }
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
merchant_id: &str,
time_range: &api::TimeRange,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::filter_by_time_constraints(&conn, merchant_id, time_range)
.await
.map_err(Into::into)
.into_report()
}
} }
} }
@ -322,6 +362,16 @@ impl PaymentIntentInterface for MockDb {
// [#172]: Implement function for `MockDb` // [#172]: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)? Err(errors::StorageError::MockDbError)?
} }
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
_merchant_id: &str,
_time_range: &api::TimeRange,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<types::PaymentIntent>, errors::StorageError> {
// [#172]: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
}
#[allow(clippy::panic)] #[allow(clippy::panic)]
async fn insert_payment_intent( async fn insert_payment_intent(

View File

@ -147,7 +147,9 @@ impl Payments {
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
{ {
route = route.service(web::resource("/list").route(web::get().to(payments_list))); route = route
.service(web::resource("/list").route(web::get().to(payments_list)))
.service(web::resource("/filter").route(web::post().to(get_filters_for_payments)))
} }
#[cfg(feature = "oltp")] #[cfg(feature = "oltp")]
{ {

View File

@ -692,7 +692,6 @@ pub async fn payments_cancel(
)] )]
#[instrument(skip_all, fields(flow = ?Flow::PaymentsList))] #[instrument(skip_all, fields(flow = ?Flow::PaymentsList))]
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
// #[get("/list")]
pub async fn payments_list( pub async fn payments_list(
state: web::Data<app::AppState>, state: web::Data<app::AppState>,
req: actix_web::HttpRequest, req: actix_web::HttpRequest,
@ -711,6 +710,28 @@ pub async fn payments_list(
.await .await
} }
#[instrument(skip_all, fields(flow = ?Flow::PaymentsList))]
#[cfg(feature = "olap")]
pub async fn get_filters_for_payments(
state: web::Data<app::AppState>,
req: actix_web::HttpRequest,
payload: web::Json<payment_types::TimeRange>,
) -> impl Responder {
let flow = Flow::PaymentsList;
let payload = payload.into_inner();
api::server_wrap(
flow,
state.get_ref(),
&req,
payload,
|state, auth, req| {
payments::get_filters_for_payments(&*state.store, auth.merchant_account, req)
},
&auth::ApiKeyAuth,
)
.await
}
async fn authorize_verify_select<Op>( async fn authorize_verify_select<Op>(
operation: Op, operation: Op,
state: &app::AppState, state: &app::AppState,

View File

@ -2,12 +2,13 @@ pub use api_models::payments::{
AcceptanceType, Address, AddressDetails, Amount, AuthenticationForStartResponse, Card, AcceptanceType, Address, AddressDetails, Amount, AuthenticationForStartResponse, Card,
CryptoData, CustomerAcceptance, MandateData, MandateTransactionType, MandateType, CryptoData, CustomerAcceptance, MandateData, MandateTransactionType, MandateType,
MandateValidationFields, NextActionType, OnlineMandate, PayLaterData, PaymentIdType, MandateValidationFields, NextActionType, OnlineMandate, PayLaterData, PaymentIdType,
PaymentListConstraints, PaymentListResponse, PaymentMethodData, PaymentMethodDataResponse, PaymentListConstraints, PaymentListFilters, PaymentListResponse, PaymentMethodData,
PaymentOp, PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials, PaymentsCancelRequest, PaymentMethodDataResponse, PaymentOp, PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials,
PaymentsCaptureRequest, PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsRequest, PaymentsCancelRequest, PaymentsCaptureRequest, PaymentsRedirectRequest,
PaymentsResponse, PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsRedirectionResponse, PaymentsRequest, PaymentsResponse, PaymentsResponseForm,
PaymentsSessionResponse, PaymentsStartRequest, PgRedirectResponse, PhoneDetails, PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse, PaymentsStartRequest,
RedirectionResponse, SessionToken, UrlDetails, VerifyRequest, VerifyResponse, WalletData, PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken, TimeRange, UrlDetails,
VerifyRequest, VerifyResponse, WalletData,
}; };
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use masking::PeekInterface; use masking::PeekInterface;

View File

@ -22,6 +22,12 @@ pub trait PaymentIntentDbExt: Sized {
merchant_id: &str, merchant_id: &str,
pc: &api::PaymentListConstraints, pc: &api::PaymentListConstraints,
) -> CustomResult<Vec<Self>, errors::DatabaseError>; ) -> CustomResult<Vec<Self>, errors::DatabaseError>;
async fn filter_by_time_constraints(
conn: &PgPooledConn,
merchant_id: &str,
pc: &api::TimeRange,
) -> CustomResult<Vec<Self>, errors::DatabaseError>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -85,4 +91,34 @@ impl PaymentIntentDbExt for PaymentIntent {
.change_context(errors::DatabaseError::NotFound) .change_context(errors::DatabaseError::NotFound)
.attach_printable_lazy(|| "Error filtering records by predicate") .attach_printable_lazy(|| "Error filtering records by predicate")
} }
#[instrument(skip(conn))]
async fn filter_by_time_constraints(
conn: &PgPooledConn,
merchant_id: &str,
time_range: &api::TimeRange,
) -> CustomResult<Vec<Self>, errors::DatabaseError> {
let start_time = time_range.start_time;
let end_time = time_range
.end_time
.unwrap_or_else(common_utils::date_time::now);
//[#350]: Replace this with Boxable Expression and pass it into generic filter
// when https://github.com/rust-lang/rust/issues/52662 becomes stable
let mut filter = <Self as HasTable>::table()
.filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.order(dsl::modified_at.desc())
.into_boxed();
filter = filter.filter(dsl::created_at.ge(start_time));
filter = filter.filter(dsl::created_at.le(end_time));
filter
.get_results_async(conn)
.await
.into_report()
.change_context(errors::DatabaseError::Others)
.attach_printable("Error filtering records by time range")
}
} }

View File

@ -344,6 +344,7 @@ pub enum EventType {
Debug, Debug,
Default, Default,
Eq, Eq,
Hash,
PartialEq, PartialEq,
serde::Deserialize, serde::Deserialize,
serde::Serialize, serde::Serialize,

View File

@ -2,7 +2,10 @@ use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime; use time::PrimitiveDateTime;
use crate::{enums as storage_enums, schema::payment_attempt}; use crate::{
enums::{self as storage_enums},
schema::payment_attempt,
};
#[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable, Serialize, Deserialize)] #[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable, Serialize, Deserialize)]
#[diesel(table_name = payment_attempt)] #[diesel(table_name = payment_attempt)]
@ -52,6 +55,14 @@ pub struct PaymentAttempt {
pub error_reason: Option<String>, pub error_reason: Option<String>,
} }
#[derive(Clone, Debug, Eq, PartialEq, Queryable, Serialize, Deserialize)]
pub struct PaymentListFilters {
pub connector: Vec<String>,
pub currency: Vec<storage_enums::Currency>,
pub status: Vec<storage_enums::IntentStatus>,
pub payment_method: Vec<storage_enums::PaymentMethod>,
}
#[derive( #[derive(
Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize, Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize,
)] )]

View File

@ -1,13 +1,19 @@
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods, Table}; use std::collections::HashSet;
use error_stack::IntoReport;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods, QueryDsl, Table};
use error_stack::{IntoReport, ResultExt};
use router_env::{instrument, tracing}; use router_env::{instrument, tracing};
use super::generics; use super::generics;
use crate::{ use crate::{
enums, errors, enums::{self, IntentStatus},
errors::{self, DatabaseError},
payment_attempt::{ payment_attempt::{
PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate, PaymentAttemptUpdateInternal, PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate, PaymentAttemptUpdateInternal,
PaymentListFilters,
}, },
payment_intent::PaymentIntent,
schema::payment_attempt::dsl, schema::payment_attempt::dsl,
PgPooledConn, StorageResult, PgPooledConn, StorageResult,
}; };
@ -41,7 +47,7 @@ impl PaymentAttempt {
.await .await
{ {
Err(error) => match error.current_context() { Err(error) => match error.current_context() {
errors::DatabaseError::NoFieldsToUpdate => Ok(self), DatabaseError::NoFieldsToUpdate => Ok(self),
_ => Err(error), _ => Err(error),
}, },
result => result, result => result,
@ -104,7 +110,7 @@ impl PaymentAttempt {
.await? .await?
.into_iter() .into_iter()
.fold( .fold(
Err(errors::DatabaseError::NotFound).into_report(), Err(DatabaseError::NotFound).into_report(),
|acc, cur| match acc { |acc, cur| match acc {
Ok(value) if value.modified_at > cur.modified_at => Ok(value), Ok(value) if value.modified_at > cur.modified_at => Ok(value),
_ => Ok(cur), _ => Ok(cur),
@ -174,4 +180,73 @@ impl PaymentAttempt {
) )
.await .await
} }
pub async fn get_filters_for_payments(
conn: &PgPooledConn,
pi: &[PaymentIntent],
merchant_id: &str,
) -> StorageResult<PaymentListFilters> {
let active_attempts: Vec<String> = pi
.iter()
.map(|payment_intent| payment_intent.clone().active_attempt_id)
.collect();
let filter = <Self as HasTable>::table()
.filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(dsl::attempt_id.eq_any(active_attempts));
let intent_status: Vec<IntentStatus> = pi
.iter()
.map(|payment_intent| payment_intent.status)
.collect::<HashSet<IntentStatus>>()
.into_iter()
.collect();
let filter_connector = filter
.clone()
.select(dsl::connector)
.distinct()
.get_results_async::<Option<String>>(conn)
.await
.into_report()
.change_context(errors::DatabaseError::Others)
.attach_printable("Error filtering records by connector")?
.into_iter()
.flatten()
.collect::<Vec<String>>();
let filter_currency = filter
.clone()
.select(dsl::currency)
.distinct()
.get_results_async::<Option<enums::Currency>>(conn)
.await
.into_report()
.change_context(DatabaseError::Others)
.attach_printable("Error filtering records by currency")?
.into_iter()
.flatten()
.collect::<Vec<enums::Currency>>();
let filter_payment_method = filter
.clone()
.select(dsl::payment_method)
.distinct()
.get_results_async::<Option<enums::PaymentMethod>>(conn)
.await
.into_report()
.change_context(DatabaseError::Others)
.attach_printable("Error filtering records by payment method")?
.into_iter()
.flatten()
.collect::<Vec<enums::PaymentMethod>>();
let filters = PaymentListFilters {
connector: filter_connector,
currency: filter_currency,
status: intent_status,
payment_method: filter_payment_method,
};
Ok(filters)
}
} }