feat(payments): add support for aggregates in payments (#5654)

This commit is contained in:
Apoorv Dixit
2024-08-21 18:06:54 +05:30
committed by GitHub
parent 1e64ed79bc
commit 9f3b2fba3e
12 changed files with 154 additions and 14 deletions

View File

@ -18,12 +18,13 @@ use crate::{
payments::{ payments::{
ExtendedCardInfoResponse, PaymentIdType, PaymentListConstraints, ExtendedCardInfoResponse, PaymentIdType, PaymentListConstraints,
PaymentListFilterConstraints, PaymentListFilters, PaymentListFiltersV2, PaymentListFilterConstraints, PaymentListFilters, PaymentListFiltersV2,
PaymentListResponse, PaymentListResponseV2, PaymentsApproveRequest, PaymentsCancelRequest, PaymentListResponse, PaymentListResponseV2, PaymentsAggregateResponse,
PaymentsCaptureRequest, PaymentsCompleteAuthorizeRequest, PaymentsApproveRequest, PaymentsCancelRequest, PaymentsCaptureRequest,
PaymentsExternalAuthenticationRequest, PaymentsExternalAuthenticationResponse, PaymentsCompleteAuthorizeRequest, PaymentsExternalAuthenticationRequest,
PaymentsIncrementalAuthorizationRequest, PaymentsManualUpdateRequest, PaymentsExternalAuthenticationResponse, PaymentsIncrementalAuthorizationRequest,
PaymentsRejectRequest, PaymentsRequest, PaymentsResponse, PaymentsRetrieveRequest, PaymentsManualUpdateRequest, PaymentsRejectRequest, PaymentsRequest, PaymentsResponse,
PaymentsSessionResponse, PaymentsStartRequest, RedirectionResponse, PaymentsRetrieveRequest, PaymentsSessionResponse, PaymentsStartRequest,
RedirectionResponse,
}, },
}; };
impl ApiEventMetric for PaymentsRetrieveRequest { impl ApiEventMetric for PaymentsRetrieveRequest {
@ -225,6 +226,11 @@ impl ApiEventMetric for PaymentListResponseV2 {
Some(ApiEventsType::ResourceListAPI) Some(ApiEventsType::ResourceListAPI)
} }
} }
impl ApiEventMetric for PaymentsAggregateResponse {
fn get_api_event_type(&self) -> Option<ApiEventsType> {
Some(ApiEventsType::ResourceListAPI)
}
}
impl ApiEventMetric for RedirectionResponse {} impl ApiEventMetric for RedirectionResponse {}

View File

@ -4099,6 +4099,12 @@ pub struct PaymentListFiltersV2 {
pub authentication_type: Vec<enums::AuthenticationType>, pub authentication_type: Vec<enums::AuthenticationType>,
} }
#[derive(Clone, Debug, serde::Serialize)]
pub struct PaymentsAggregateResponse {
/// The list of intent status with their count
pub status_with_count: HashMap<enums::IntentStatus, i64>,
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize, ToSchema)] #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize, ToSchema)]
pub struct AmountFilter { pub struct AmountFilter {
/// The start amount to filter list of transactions which are greater than or equal to the start amount /// The start amount to filter list of transactions which are greater than or equal to the start amount

View File

@ -67,6 +67,13 @@ pub trait PaymentIntentInterface {
storage_scheme: storage_enums::MerchantStorageScheme, storage_scheme: storage_enums::MerchantStorageScheme,
) -> error_stack::Result<Vec<PaymentIntent>, errors::StorageError>; ) -> error_stack::Result<Vec<PaymentIntent>, errors::StorageError>;
#[cfg(feature = "olap")]
async fn get_intent_status_with_count(
&self,
merchant_id: &id_type::MerchantId,
constraints: &api_models::payments::TimeRange,
) -> error_stack::Result<Vec<(common_enums::IntentStatus, i64)>, errors::StorageError>;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
async fn get_filtered_payment_intents_attempt( async fn get_filtered_payment_intents_attempt(
&self, &self,

View File

@ -3161,6 +3161,31 @@ pub async fn get_payment_filters(
)) ))
} }
#[cfg(feature = "olap")]
pub async fn get_aggregates_for_payments(
state: SessionState,
merchant: domain::MerchantAccount,
time_range: api::TimeRange,
) -> RouterResponse<api::PaymentsAggregateResponse> {
let db = state.store.as_ref();
let intent_status_with_count = db
.get_intent_status_with_count(merchant.get_id(), &time_range)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
let mut status_map: HashMap<enums::IntentStatus, i64> =
intent_status_with_count.into_iter().collect();
for status in enums::IntentStatus::iter() {
status_map.entry(status).or_default();
}
Ok(services::ApplicationResponse::Json(
api::PaymentsAggregateResponse {
status_with_count: status_map,
},
))
}
pub async fn add_process_sync_task( pub async fn add_process_sync_task(
db: &dyn StorageInterface, db: &dyn StorageInterface,
payment_attempt: &storage::PaymentAttempt, payment_attempt: &storage::PaymentAttempt,

View File

@ -1617,6 +1617,17 @@ impl PaymentIntentInterface for KafkaStore {
.await .await
} }
#[cfg(feature = "olap")]
async fn get_intent_status_with_count(
&self,
merchant_id: &id_type::MerchantId,
time_range: &api_models::payments::TimeRange,
) -> error_stack::Result<Vec<(common_enums::IntentStatus, i64)>, errors::DataStorageError> {
self.diesel_store
.get_intent_status_with_count(merchant_id, time_range)
.await
}
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
async fn get_filtered_payment_intents_attempt( async fn get_filtered_payment_intents_attempt(
&self, &self,

View File

@ -539,6 +539,7 @@ impl Payments {
) )
.service(web::resource("/filter").route(web::post().to(get_filters_for_payments))) .service(web::resource("/filter").route(web::post().to(get_filters_for_payments)))
.service(web::resource("/v2/filter").route(web::get().to(get_payment_filters))) .service(web::resource("/v2/filter").route(web::get().to(get_payment_filters)))
.service(web::resource("/aggregate").route(web::get().to(get_payments_aggregates)))
.service( .service(
web::resource("/{payment_id}/manual-update") web::resource("/{payment_id}/manual-update")
.route(web::put().to(payments_manual_update)), .route(web::put().to(payments_manual_update)),

View File

@ -126,6 +126,7 @@ impl From<Flow> for ApiIdentifier {
| Flow::PaymentsStart | Flow::PaymentsStart
| Flow::PaymentsList | Flow::PaymentsList
| Flow::PaymentsFilters | Flow::PaymentsFilters
| Flow::PaymentsAggregate
| Flow::PaymentsRedirect | Flow::PaymentsRedirect
| Flow::PaymentsIncrementalAuthorization | Flow::PaymentsIncrementalAuthorization
| Flow::PaymentsExternalAuthentication | Flow::PaymentsExternalAuthentication

View File

@ -1075,6 +1075,29 @@ pub async fn get_payment_filters(
.await .await
} }
#[instrument(skip_all, fields(flow = ?Flow::PaymentsAggregate))]
#[cfg(feature = "olap")]
pub async fn get_payments_aggregates(
state: web::Data<app::AppState>,
req: actix_web::HttpRequest,
payload: web::Query<payment_types::TimeRange>,
) -> impl Responder {
let flow = Flow::PaymentsAggregate;
let payload = payload.into_inner();
Box::pin(api::server_wrap(
flow,
state,
&req,
payload,
|state, auth: auth::AuthenticationData, req, _| {
payments::get_aggregates_for_payments(state, auth.merchant_account, req)
},
&auth::JWTAuth(Permission::PaymentRead),
api_locking::LockAction::NotApplicable,
))
.await
}
#[cfg(feature = "oltp")] #[cfg(feature = "oltp")]
#[instrument(skip_all, fields(flow = ?Flow::PaymentsApprove, payment_id))] #[instrument(skip_all, fields(flow = ?Flow::PaymentsApprove, payment_id))]
// #[post("/{payment_id}/approve")] // #[post("/{payment_id}/approve")]

View File

@ -5,14 +5,14 @@ pub use api_models::payments::{
OnlineMandate, OpenBankingSessionToken, PayLaterData, PaymentIdType, PaymentListConstraints, OnlineMandate, OpenBankingSessionToken, PayLaterData, PaymentIdType, PaymentListConstraints,
PaymentListFilterConstraints, PaymentListFilters, PaymentListFiltersV2, PaymentListResponse, PaymentListFilterConstraints, PaymentListFilters, PaymentListFiltersV2, PaymentListResponse,
PaymentListResponseV2, PaymentMethodData, PaymentMethodDataRequest, PaymentMethodDataResponse, PaymentListResponseV2, PaymentMethodData, PaymentMethodDataRequest, PaymentMethodDataResponse,
PaymentOp, PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials, PaymentsApproveRequest, PaymentOp, PaymentRetrieveBody, PaymentRetrieveBodyWithCredentials, PaymentsAggregateResponse,
PaymentsCancelRequest, PaymentsCaptureRequest, PaymentsCompleteAuthorizeRequest, PaymentsApproveRequest, PaymentsCancelRequest, PaymentsCaptureRequest,
PaymentsExternalAuthenticationRequest, PaymentsIncrementalAuthorizationRequest, PaymentsCompleteAuthorizeRequest, PaymentsExternalAuthenticationRequest,
PaymentsManualUpdateRequest, PaymentsRedirectRequest, PaymentsRedirectionResponse, PaymentsIncrementalAuthorizationRequest, PaymentsManualUpdateRequest, PaymentsRedirectRequest,
PaymentsRejectRequest, PaymentsRequest, PaymentsResponse, PaymentsResponseForm, PaymentsRedirectionResponse, PaymentsRejectRequest, PaymentsRequest, PaymentsResponse,
PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse, PaymentsStartRequest, PaymentsResponseForm, PaymentsRetrieveRequest, PaymentsSessionRequest, PaymentsSessionResponse,
PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken, TimeRange, UrlDetails, PaymentsStartRequest, PgRedirectResponse, PhoneDetails, RedirectionResponse, SessionToken,
VerifyRequest, VerifyResponse, WalletData, TimeRange, UrlDetails, VerifyRequest, VerifyResponse, WalletData,
}; };
use error_stack::ResultExt; use error_stack::ResultExt;
pub use hyperswitch_domain_models::router_flow_types::payments::{ pub use hyperswitch_domain_models::router_flow_types::payments::{

View File

@ -167,6 +167,8 @@ pub enum Flow {
PaymentsList, PaymentsList,
/// Payments filters flow /// Payments filters flow
PaymentsFilters, PaymentsFilters,
/// Payments aggregates flow
PaymentsAggregate,
#[cfg(feature = "payouts")] #[cfg(feature = "payouts")]
/// Payouts create flow /// Payouts create flow
PayoutsCreate, PayoutsCreate,

View File

@ -41,6 +41,15 @@ impl PaymentIntentInterface for MockDb {
Err(StorageError::MockDbError)? Err(StorageError::MockDbError)?
} }
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
async fn get_intent_status_with_count(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_time_range: &api_models::payments::TimeRange,
) -> CustomResult<Vec<(common_enums::IntentStatus, i64)>, StorageError> {
// [#172]: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn get_filtered_active_attempt_ids_for_total_count( async fn get_filtered_active_attempt_ids_for_total_count(
&self, &self,
_merchant_id: &common_utils::id_type::MerchantId, _merchant_id: &common_utils::id_type::MerchantId,

View File

@ -346,6 +346,16 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
) )
.await .await
} }
#[cfg(feature = "olap")]
async fn get_intent_status_with_count(
&self,
merchant_id: &common_utils::id_type::MerchantId,
time_range: &api_models::payments::TimeRange,
) -> error_stack::Result<Vec<(common_enums::IntentStatus, i64)>, StorageError> {
self.router_store
.get_intent_status_with_count(merchant_id, time_range)
.await
}
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
async fn get_filtered_payment_intents_attempt( async fn get_filtered_payment_intents_attempt(
@ -655,6 +665,45 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
.await .await
} }
#[cfg(feature = "olap")]
#[instrument(skip_all)]
async fn get_intent_status_with_count(
&self,
merchant_id: &common_utils::id_type::MerchantId,
time_range: &api_models::payments::TimeRange,
) -> error_stack::Result<Vec<(common_enums::IntentStatus, i64)>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = <DieselPaymentIntent as HasTable>::table()
.group_by(pi_dsl::status)
.select((pi_dsl::status, diesel::dsl::count_star()))
.filter(pi_dsl::merchant_id.eq(merchant_id.to_owned()))
.into_boxed();
query = query.filter(pi_dsl::created_at.ge(time_range.start_time));
query = match time_range.end_time {
Some(ending_at) => query.filter(pi_dsl::created_at.le(ending_at)),
None => query,
};
logger::debug!(filter = %diesel::debug_query::<diesel::pg::Pg,_>(&query).to_string());
db_metrics::track_database_call::<<DieselPaymentIntent as HasTable>::Table, _, _>(
query.get_results_async::<(common_enums::IntentStatus, i64)>(conn),
db_metrics::DatabaseOperation::Filter,
)
.await
.map_err(|er| {
StorageError::DatabaseError(
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
.attach_printable("Error filtering payment records"),
)
.into()
})
}
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
#[instrument(skip_all)] #[instrument(skip_all)]
async fn get_filtered_payment_intents_attempt( async fn get_filtered_payment_intents_attempt(