mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-11-01 02:57:02 +08:00 
			
		
		
		
	feat(disputes): add support for disputes aggregate (#5896)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
		| @ -1,3 +1,5 @@ | |||||||
|  | use std::collections::HashMap; | ||||||
|  |  | ||||||
| use masking::{Deserialize, Serialize}; | use masking::{Deserialize, Serialize}; | ||||||
| use time::PrimitiveDateTime; | use time::PrimitiveDateTime; | ||||||
| use utoipa::ToSchema; | use utoipa::ToSchema; | ||||||
| @ -208,3 +210,9 @@ pub struct DeleteEvidenceRequest { | |||||||
|     /// Evidence Type to be deleted |     /// Evidence Type to be deleted | ||||||
|     pub evidence_type: EvidenceType, |     pub evidence_type: EvidenceType, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[derive(Clone, Debug, serde::Serialize)] | ||||||
|  | pub struct DisputesAggregateResponse { | ||||||
|  |     /// Different status of disputes with their count | ||||||
|  |     pub status_with_count: HashMap<DisputeStatus, i64>, | ||||||
|  | } | ||||||
|  | |||||||
| @ -1,7 +1,8 @@ | |||||||
| use common_utils::events::{ApiEventMetric, ApiEventsType}; | use common_utils::events::{ApiEventMetric, ApiEventsType}; | ||||||
|  |  | ||||||
| use super::{ | use super::{ | ||||||
|     DeleteEvidenceRequest, DisputeResponse, DisputeResponsePaymentsRetrieve, SubmitEvidenceRequest, |     DeleteEvidenceRequest, DisputeResponse, DisputeResponsePaymentsRetrieve, | ||||||
|  |     DisputesAggregateResponse, SubmitEvidenceRequest, | ||||||
| }; | }; | ||||||
|  |  | ||||||
| impl ApiEventMetric for SubmitEvidenceRequest { | impl ApiEventMetric for SubmitEvidenceRequest { | ||||||
| @ -32,3 +33,9 @@ impl ApiEventMetric for DeleteEvidenceRequest { | |||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl ApiEventMetric for DisputesAggregateResponse { | ||||||
|  |     fn get_api_event_type(&self) -> Option<ApiEventsType> { | ||||||
|  |         Some(ApiEventsType::ResourceListAPI) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | |||||||
| @ -1843,6 +1843,7 @@ pub enum DisputeStage { | |||||||
|     serde::Serialize, |     serde::Serialize, | ||||||
|     strum::Display, |     strum::Display, | ||||||
|     strum::EnumString, |     strum::EnumString, | ||||||
|  |     strum::EnumIter, | ||||||
|     ToSchema, |     ToSchema, | ||||||
| )] | )] | ||||||
| #[router_derive::diesel_enum(storage_type = "db_enum")] | #[router_derive::diesel_enum(storage_type = "db_enum")] | ||||||
|  | |||||||
| @ -3,6 +3,9 @@ use common_utils::ext_traits::{Encode, ValueExt}; | |||||||
| use error_stack::ResultExt; | use error_stack::ResultExt; | ||||||
| use router_env::{instrument, tracing}; | use router_env::{instrument, tracing}; | ||||||
| pub mod transformers; | pub mod transformers; | ||||||
|  | use std::collections::HashMap; | ||||||
|  |  | ||||||
|  | use strum::IntoEnumIterator; | ||||||
|  |  | ||||||
| use super::{ | use super::{ | ||||||
|     errors::{self, ConnectorErrorExt, RouterResponse, StorageErrorExt}, |     errors::{self, ConnectorErrorExt, RouterResponse, StorageErrorExt}, | ||||||
| @ -507,3 +510,31 @@ pub async fn delete_evidence( | |||||||
|         })?; |         })?; | ||||||
|     Ok(services::ApplicationResponse::StatusOk) |     Ok(services::ApplicationResponse::StatusOk) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[instrument(skip(state))] | ||||||
|  | pub async fn get_aggregates_for_disputes( | ||||||
|  |     state: SessionState, | ||||||
|  |     merchant: domain::MerchantAccount, | ||||||
|  |     profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>, | ||||||
|  |     time_range: api::TimeRange, | ||||||
|  | ) -> RouterResponse<dispute_models::DisputesAggregateResponse> { | ||||||
|  |     let db = state.store.as_ref(); | ||||||
|  |     let dispute_status_with_count = db | ||||||
|  |         .get_dispute_status_with_count(merchant.get_id(), profile_id_list, &time_range) | ||||||
|  |         .await | ||||||
|  |         .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |         .attach_printable("Unable to retrieve disputes aggregate")?; | ||||||
|  |  | ||||||
|  |     let mut status_map: HashMap<storage_enums::DisputeStatus, i64> = | ||||||
|  |         dispute_status_with_count.into_iter().collect(); | ||||||
|  |  | ||||||
|  |     for status in storage_enums::DisputeStatus::iter() { | ||||||
|  |         status_map.entry(status).or_default(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     Ok(services::ApplicationResponse::Json( | ||||||
|  |         dispute_models::DisputesAggregateResponse { | ||||||
|  |             status_with_count: status_map, | ||||||
|  |         }, | ||||||
|  |     )) | ||||||
|  | } | ||||||
|  | |||||||
| @ -1,3 +1,5 @@ | |||||||
|  | use std::collections::HashMap; | ||||||
|  |  | ||||||
| use error_stack::report; | use error_stack::report; | ||||||
| use router_env::{instrument, tracing}; | use router_env::{instrument, tracing}; | ||||||
|  |  | ||||||
| @ -45,6 +47,13 @@ pub trait DisputeInterface { | |||||||
|         this: storage::Dispute, |         this: storage::Dispute, | ||||||
|         dispute: storage::DisputeUpdate, |         dispute: storage::DisputeUpdate, | ||||||
|     ) -> CustomResult<storage::Dispute, errors::StorageError>; |     ) -> CustomResult<storage::Dispute, errors::StorageError>; | ||||||
|  |  | ||||||
|  |     async fn get_dispute_status_with_count( | ||||||
|  |         &self, | ||||||
|  |         merchant_id: &common_utils::id_type::MerchantId, | ||||||
|  |         profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>, | ||||||
|  |         time_range: &api_models::payments::TimeRange, | ||||||
|  |     ) -> CustomResult<Vec<(common_enums::enums::DisputeStatus, i64)>, errors::StorageError>; | ||||||
| } | } | ||||||
|  |  | ||||||
| #[async_trait::async_trait] | #[async_trait::async_trait] | ||||||
| @ -126,6 +135,24 @@ impl DisputeInterface for Store { | |||||||
|             .await |             .await | ||||||
|             .map_err(|error| report!(errors::StorageError::from(error))) |             .map_err(|error| report!(errors::StorageError::from(error))) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[instrument(skip_all)] | ||||||
|  |     async fn get_dispute_status_with_count( | ||||||
|  |         &self, | ||||||
|  |         merchant_id: &common_utils::id_type::MerchantId, | ||||||
|  |         profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>, | ||||||
|  |         time_range: &api_models::payments::TimeRange, | ||||||
|  |     ) -> CustomResult<Vec<(common_enums::DisputeStatus, i64)>, errors::StorageError> { | ||||||
|  |         let conn = connection::pg_connection_read(self).await?; | ||||||
|  |         storage::Dispute::get_dispute_status_with_count( | ||||||
|  |             &conn, | ||||||
|  |             merchant_id, | ||||||
|  |             profile_id_list, | ||||||
|  |             time_range, | ||||||
|  |         ) | ||||||
|  |         .await | ||||||
|  |         .map_err(|error| report!(errors::StorageError::from(error))) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[async_trait::async_trait] | #[async_trait::async_trait] | ||||||
| @ -358,6 +385,50 @@ impl DisputeInterface for MockDb { | |||||||
|  |  | ||||||
|         Ok(dispute_to_update.clone()) |         Ok(dispute_to_update.clone()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     async fn get_dispute_status_with_count( | ||||||
|  |         &self, | ||||||
|  |         merchant_id: &common_utils::id_type::MerchantId, | ||||||
|  |         profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>, | ||||||
|  |         time_range: &api_models::payments::TimeRange, | ||||||
|  |     ) -> CustomResult<Vec<(common_enums::DisputeStatus, i64)>, errors::StorageError> { | ||||||
|  |         let locked_disputes = self.disputes.lock().await; | ||||||
|  |  | ||||||
|  |         let filtered_disputes_data = locked_disputes | ||||||
|  |             .iter() | ||||||
|  |             .filter(|d| { | ||||||
|  |                 d.merchant_id == *merchant_id | ||||||
|  |                     && d.created_at >= time_range.start_time | ||||||
|  |                     && time_range | ||||||
|  |                         .end_time | ||||||
|  |                         .as_ref() | ||||||
|  |                         .map(|received_end_time| received_end_time >= &d.created_at) | ||||||
|  |                         .unwrap_or(true) | ||||||
|  |                     && profile_id_list | ||||||
|  |                         .as_ref() | ||||||
|  |                         .zip(d.profile_id.as_ref()) | ||||||
|  |                         .map(|(received_profile_list, received_profile_id)| { | ||||||
|  |                             received_profile_list.contains(received_profile_id) | ||||||
|  |                         }) | ||||||
|  |                         .unwrap_or(true) | ||||||
|  |             }) | ||||||
|  |             .cloned() | ||||||
|  |             .collect::<Vec<storage::Dispute>>(); | ||||||
|  |  | ||||||
|  |         Ok(filtered_disputes_data | ||||||
|  |             .into_iter() | ||||||
|  |             .fold( | ||||||
|  |                 HashMap::new(), | ||||||
|  |                 |mut acc: HashMap<common_enums::DisputeStatus, i64>, value| { | ||||||
|  |                     acc.entry(value.dispute_status) | ||||||
|  |                         .and_modify(|value| *value += 1) | ||||||
|  |                         .or_insert(1); | ||||||
|  |                     acc | ||||||
|  |                 }, | ||||||
|  |             ) | ||||||
|  |             .into_iter() | ||||||
|  |             .collect::<Vec<(common_enums::DisputeStatus, i64)>>()) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
|  | |||||||
| @ -629,6 +629,17 @@ impl DisputeInterface for KafkaStore { | |||||||
|             .find_disputes_by_merchant_id_payment_id(merchant_id, payment_id) |             .find_disputes_by_merchant_id_payment_id(merchant_id, payment_id) | ||||||
|             .await |             .await | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     async fn get_dispute_status_with_count( | ||||||
|  |         &self, | ||||||
|  |         merchant_id: &id_type::MerchantId, | ||||||
|  |         profile_id_list: Option<Vec<id_type::ProfileId>>, | ||||||
|  |         time_range: &api_models::payments::TimeRange, | ||||||
|  |     ) -> CustomResult<Vec<(common_enums::DisputeStatus, i64)>, errors::StorageError> { | ||||||
|  |         self.diesel_store | ||||||
|  |             .get_dispute_status_with_count(merchant_id, profile_id_list, time_range) | ||||||
|  |             .await | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[async_trait::async_trait] | #[async_trait::async_trait] | ||||||
|  | |||||||
| @ -1495,6 +1495,13 @@ impl Disputes { | |||||||
|                 web::resource("/accept/{dispute_id}") |                 web::resource("/accept/{dispute_id}") | ||||||
|                     .route(web::post().to(disputes::accept_dispute)), |                     .route(web::post().to(disputes::accept_dispute)), | ||||||
|             ) |             ) | ||||||
|  |             .service( | ||||||
|  |                 web::resource("/aggregate").route(web::get().to(disputes::get_disputes_aggregate)), | ||||||
|  |             ) | ||||||
|  |             .service( | ||||||
|  |                 web::resource("/profile/aggregate") | ||||||
|  |                     .route(web::get().to(disputes::get_disputes_aggregate_profile)), | ||||||
|  |             ) | ||||||
|             .service( |             .service( | ||||||
|                 web::resource("/evidence") |                 web::resource("/evidence") | ||||||
|                     .route(web::post().to(disputes::submit_dispute_evidence)) |                     .route(web::post().to(disputes::submit_dispute_evidence)) | ||||||
|  | |||||||
| @ -11,7 +11,7 @@ use super::app::AppState; | |||||||
| use crate::{ | use crate::{ | ||||||
|     core::disputes, |     core::disputes, | ||||||
|     services::{api, authentication as auth}, |     services::{api, authentication as auth}, | ||||||
|     types::api::disputes as dispute_types, |     types::api::{disputes as dispute_types, payments::TimeRange}, | ||||||
| }; | }; | ||||||
|  |  | ||||||
| /// Disputes - Retrieve Dispute | /// Disputes - Retrieve Dispute | ||||||
| @ -408,3 +408,68 @@ pub async fn delete_dispute_evidence( | |||||||
|     )) |     )) | ||||||
|     .await |     .await | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[instrument(skip_all, fields(flow = ?Flow::DisputesAggregate))] | ||||||
|  | pub async fn get_disputes_aggregate( | ||||||
|  |     state: web::Data<AppState>, | ||||||
|  |     req: HttpRequest, | ||||||
|  |     query_param: web::Query<TimeRange>, | ||||||
|  | ) -> HttpResponse { | ||||||
|  |     let flow = Flow::DisputesAggregate; | ||||||
|  |     let query_param = query_param.into_inner(); | ||||||
|  |  | ||||||
|  |     Box::pin(api::server_wrap( | ||||||
|  |         flow, | ||||||
|  |         state, | ||||||
|  |         &req, | ||||||
|  |         query_param, | ||||||
|  |         |state, auth, req, _| { | ||||||
|  |             disputes::get_aggregates_for_disputes(state, auth.merchant_account, None, req) | ||||||
|  |         }, | ||||||
|  |         auth::auth_type( | ||||||
|  |             &auth::HeaderAuth(auth::ApiKeyAuth), | ||||||
|  |             &auth::JWTAuth { | ||||||
|  |                 permission: Permission::DisputeRead, | ||||||
|  |                 minimum_entity_level: EntityType::Merchant, | ||||||
|  |             }, | ||||||
|  |             req.headers(), | ||||||
|  |         ), | ||||||
|  |         api_locking::LockAction::NotApplicable, | ||||||
|  |     )) | ||||||
|  |     .await | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[instrument(skip_all, fields(flow = ?Flow::DisputesAggregate))] | ||||||
|  | pub async fn get_disputes_aggregate_profile( | ||||||
|  |     state: web::Data<AppState>, | ||||||
|  |     req: HttpRequest, | ||||||
|  |     query_param: web::Query<TimeRange>, | ||||||
|  | ) -> HttpResponse { | ||||||
|  |     let flow = Flow::DisputesAggregate; | ||||||
|  |     let query_param = query_param.into_inner(); | ||||||
|  |  | ||||||
|  |     Box::pin(api::server_wrap( | ||||||
|  |         flow, | ||||||
|  |         state, | ||||||
|  |         &req, | ||||||
|  |         query_param, | ||||||
|  |         |state, auth, req, _| { | ||||||
|  |             disputes::get_aggregates_for_disputes( | ||||||
|  |                 state, | ||||||
|  |                 auth.merchant_account, | ||||||
|  |                 auth.profile_id.map(|profile_id| vec![profile_id]), | ||||||
|  |                 req, | ||||||
|  |             ) | ||||||
|  |         }, | ||||||
|  |         auth::auth_type( | ||||||
|  |             &auth::HeaderAuth(auth::ApiKeyAuth), | ||||||
|  |             &auth::JWTAuth { | ||||||
|  |                 permission: Permission::DisputeRead, | ||||||
|  |                 minimum_entity_level: EntityType::Profile, | ||||||
|  |             }, | ||||||
|  |             req.headers(), | ||||||
|  |         ), | ||||||
|  |         api_locking::LockAction::NotApplicable, | ||||||
|  |     )) | ||||||
|  |     .await | ||||||
|  | } | ||||||
|  | |||||||
| @ -175,6 +175,7 @@ impl From<Flow> for ApiIdentifier { | |||||||
|             | Flow::DisputesEvidenceSubmit |             | Flow::DisputesEvidenceSubmit | ||||||
|             | Flow::AttachDisputeEvidence |             | Flow::AttachDisputeEvidence | ||||||
|             | Flow::RetrieveDisputeEvidence |             | Flow::RetrieveDisputeEvidence | ||||||
|  |             | Flow::DisputesAggregate | ||||||
|             | Flow::DeleteDisputeEvidence => Self::Disputes, |             | Flow::DeleteDisputeEvidence => Self::Disputes, | ||||||
|  |  | ||||||
|             Flow::CardsInfo => Self::CardsInfo, |             Flow::CardsInfo => Self::CardsInfo, | ||||||
|  | |||||||
| @ -14,6 +14,13 @@ pub trait DisputeDbExt: Sized { | |||||||
|         merchant_id: &common_utils::id_type::MerchantId, |         merchant_id: &common_utils::id_type::MerchantId, | ||||||
|         dispute_list_constraints: api_models::disputes::DisputeListConstraints, |         dispute_list_constraints: api_models::disputes::DisputeListConstraints, | ||||||
|     ) -> CustomResult<Vec<Self>, errors::DatabaseError>; |     ) -> CustomResult<Vec<Self>, errors::DatabaseError>; | ||||||
|  |  | ||||||
|  |     async fn get_dispute_status_with_count( | ||||||
|  |         conn: &PgPooledConn, | ||||||
|  |         merchant_id: &common_utils::id_type::MerchantId, | ||||||
|  |         profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>, | ||||||
|  |         time_range: &api_models::payments::TimeRange, | ||||||
|  |     ) -> CustomResult<Vec<(common_enums::enums::DisputeStatus, i64)>, errors::DatabaseError>; | ||||||
| } | } | ||||||
|  |  | ||||||
| #[async_trait::async_trait] | #[async_trait::async_trait] | ||||||
| @ -72,4 +79,38 @@ impl DisputeDbExt for Dispute { | |||||||
|         .change_context(errors::DatabaseError::NotFound) |         .change_context(errors::DatabaseError::NotFound) | ||||||
|         .attach_printable_lazy(|| "Error filtering records by predicate") |         .attach_printable_lazy(|| "Error filtering records by predicate") | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     async fn get_dispute_status_with_count( | ||||||
|  |         conn: &PgPooledConn, | ||||||
|  |         merchant_id: &common_utils::id_type::MerchantId, | ||||||
|  |         profile_id_list: Option<Vec<common_utils::id_type::ProfileId>>, | ||||||
|  |         time_range: &api_models::payments::TimeRange, | ||||||
|  |     ) -> CustomResult<Vec<(common_enums::DisputeStatus, i64)>, errors::DatabaseError> { | ||||||
|  |         let mut query = <Self as HasTable>::table() | ||||||
|  |             .group_by(dsl::dispute_status) | ||||||
|  |             .select((dsl::dispute_status, diesel::dsl::count_star())) | ||||||
|  |             .filter(dsl::merchant_id.eq(merchant_id.to_owned())) | ||||||
|  |             .into_boxed(); | ||||||
|  |  | ||||||
|  |         if let Some(profile_id) = profile_id_list { | ||||||
|  |             query = query.filter(dsl::profile_id.eq_any(profile_id)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         query = query.filter(dsl::created_at.ge(time_range.start_time)); | ||||||
|  |  | ||||||
|  |         query = match time_range.end_time { | ||||||
|  |             Some(ending_at) => query.filter(dsl::created_at.le(ending_at)), | ||||||
|  |             None => query, | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         logger::debug!(query = %diesel::debug_query::<diesel::pg::Pg,_>(&query).to_string()); | ||||||
|  |  | ||||||
|  |         db_metrics::track_database_call::<<Self as HasTable>::Table, _, _>( | ||||||
|  |             query.get_results_async::<(common_enums::DisputeStatus, i64)>(conn), | ||||||
|  |             db_metrics::DatabaseOperation::Count, | ||||||
|  |         ) | ||||||
|  |         .await | ||||||
|  |         .change_context(errors::DatabaseError::NotFound) | ||||||
|  |         .attach_printable_lazy(|| "Error filtering records by predicate") | ||||||
|  |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -290,6 +290,8 @@ pub enum Flow { | |||||||
|     AttachDisputeEvidence, |     AttachDisputeEvidence, | ||||||
|     /// Delete Dispute Evidence flow |     /// Delete Dispute Evidence flow | ||||||
|     DeleteDisputeEvidence, |     DeleteDisputeEvidence, | ||||||
|  |     /// Disputes aggregate flow | ||||||
|  |     DisputesAggregate, | ||||||
|     /// Retrieve Dispute Evidence flow |     /// Retrieve Dispute Evidence flow | ||||||
|     RetrieveDisputeEvidence, |     RetrieveDisputeEvidence, | ||||||
|     /// Invalidate cache flow |     /// Invalidate cache flow | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Riddhiagrawal001
					Riddhiagrawal001