diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index 546b57f99a..f56e875f72 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -139,6 +139,9 @@ impl AnalyticsDataSource for ClickhouseClient { | AnalyticsCollection::Dispute => { TableEngine::CollapsingMergeTree { sign: "sign_flag" } } + AnalyticsCollection::DisputeSessionized => { + TableEngine::CollapsingMergeTree { sign: "sign_flag" } + } AnalyticsCollection::SdkEvents | AnalyticsCollection::SdkEventsAnalytics | AnalyticsCollection::ApiEvents @@ -439,6 +442,7 @@ impl ToSql for AnalyticsCollection { Self::ConnectorEvents => Ok("connector_events_audit".to_string()), Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()), Self::Dispute => Ok("dispute".to_string()), + Self::DisputeSessionized => Ok("sessionizer_dispute".to_string()), Self::ActivePaymentsAnalytics => Ok("active_payments".to_string()), } } diff --git a/crates/analytics/src/disputes/accumulators.rs b/crates/analytics/src/disputes/accumulators.rs index 1997d75d32..41bd3beebd 100644 --- a/crates/analytics/src/disputes/accumulators.rs +++ b/crates/analytics/src/disputes/accumulators.rs @@ -5,8 +5,8 @@ use super::metrics::DisputeMetricRow; #[derive(Debug, Default)] pub struct DisputeMetricsAccumulator { pub disputes_status_rate: RateAccumulator, - pub total_amount_disputed: SumAccumulator, - pub total_dispute_lost_amount: SumAccumulator, + pub disputed_amount: DisputedAmountAccumulator, + pub dispute_lost_amount: DisputedAmountAccumulator, } #[derive(Debug, Default)] pub struct RateAccumulator { @@ -17,7 +17,7 @@ pub struct RateAccumulator { } #[derive(Debug, Default)] #[repr(transparent)] -pub struct SumAccumulator { +pub struct DisputedAmountAccumulator { pub total: Option, } @@ -29,7 +29,7 @@ pub trait DisputeMetricAccumulator { fn collect(self) -> Self::MetricOutput; } -impl DisputeMetricAccumulator for SumAccumulator { +impl DisputeMetricAccumulator for DisputedAmountAccumulator { type MetricOutput = Option; #[inline] fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) { @@ -92,8 +92,8 @@ impl DisputeMetricsAccumulator { disputes_challenged: challenge_rate, disputes_won: won_rate, disputes_lost: lost_rate, - total_amount_disputed: self.total_amount_disputed.collect(), - total_dispute_lost_amount: self.total_dispute_lost_amount.collect(), + disputed_amount: self.disputed_amount.collect(), + dispute_lost_amount: self.dispute_lost_amount.collect(), total_dispute, } } diff --git a/crates/analytics/src/disputes/core.rs b/crates/analytics/src/disputes/core.rs index b8b44a757d..85d1a62a1d 100644 --- a/crates/analytics/src/disputes/core.rs +++ b/crates/analytics/src/disputes/core.rs @@ -5,8 +5,8 @@ use api_models::analytics::{ DisputeDimensions, DisputeMetrics, DisputeMetricsBucketIdentifier, DisputeMetricsBucketResponse, }, - AnalyticsMetadata, DisputeFilterValue, DisputeFiltersResponse, GetDisputeFilterRequest, - GetDisputeMetricRequest, MetricsResponse, + DisputeFilterValue, DisputeFiltersResponse, DisputesAnalyticsMetadata, DisputesMetricsResponse, + GetDisputeFilterRequest, GetDisputeMetricRequest, }; use error_stack::ResultExt; use router_env::{ @@ -30,7 +30,7 @@ pub async fn get_metrics( pool: &AnalyticsProvider, auth: &AuthInfo, req: GetDisputeMetricRequest, -) -> AnalyticsResult> { +) -> AnalyticsResult> { let mut metrics_accumulator: HashMap< DisputeMetricsBucketIdentifier, DisputeMetricsAccumulator, @@ -87,14 +87,17 @@ pub async fn get_metrics( logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}"); let metrics_builder = metrics_accumulator.entry(id).or_default(); match metric { - DisputeMetrics::DisputeStatusMetric => metrics_builder + DisputeMetrics::DisputeStatusMetric + | DisputeMetrics::SessionizedDisputeStatusMetric => metrics_builder .disputes_status_rate .add_metrics_bucket(&value), - DisputeMetrics::TotalAmountDisputed => metrics_builder - .total_amount_disputed - .add_metrics_bucket(&value), - DisputeMetrics::TotalDisputeLostAmount => metrics_builder - .total_dispute_lost_amount + DisputeMetrics::TotalAmountDisputed + | DisputeMetrics::SessionizedTotalAmountDisputed => { + metrics_builder.disputed_amount.add_metrics_bucket(&value) + } + DisputeMetrics::TotalDisputeLostAmount + | DisputeMetrics::SessionizedTotalDisputeLostAmount => metrics_builder + .dispute_lost_amount .add_metrics_bucket(&value), } } @@ -105,18 +108,31 @@ pub async fn get_metrics( metrics_accumulator ); } + let mut total_disputed_amount = 0; + let mut total_dispute_lost_amount = 0; let query_data: Vec = metrics_accumulator .into_iter() - .map(|(id, val)| DisputeMetricsBucketResponse { - values: val.collect(), - dimensions: id, + .map(|(id, val)| { + let collected_values = val.collect(); + if let Some(amount) = collected_values.disputed_amount { + total_disputed_amount += amount; + } + if let Some(amount) = collected_values.dispute_lost_amount { + total_dispute_lost_amount += amount; + } + + DisputeMetricsBucketResponse { + values: collected_values, + dimensions: id, + } }) .collect(); - Ok(MetricsResponse { + Ok(DisputesMetricsResponse { query_data, - meta_data: [AnalyticsMetadata { - current_time_range: req.time_range, + meta_data: [DisputesAnalyticsMetadata { + total_disputed_amount: Some(total_disputed_amount), + total_dispute_lost_amount: Some(total_dispute_lost_amount), }], }) } diff --git a/crates/analytics/src/disputes/metrics.rs b/crates/analytics/src/disputes/metrics.rs index dd1aa3c1bb..ad7ed81aae 100644 --- a/crates/analytics/src/disputes/metrics.rs +++ b/crates/analytics/src/disputes/metrics.rs @@ -1,4 +1,5 @@ mod dispute_status_metric; +mod sessionized_metrics; mod total_amount_disputed; mod total_dispute_lost_amount; @@ -92,6 +93,21 @@ where .load_metrics(dimensions, auth, filters, granularity, time_range, pool) .await } + Self::SessionizedTotalAmountDisputed => { + sessionized_metrics::TotalAmountDisputed::default() + .load_metrics(dimensions, auth, filters, granularity, time_range, pool) + .await + } + Self::SessionizedDisputeStatusMetric => { + sessionized_metrics::DisputeStatusMetric::default() + .load_metrics(dimensions, auth, filters, granularity, time_range, pool) + .await + } + Self::SessionizedTotalDisputeLostAmount => { + sessionized_metrics::TotalDisputeLostAmount::default() + .load_metrics(dimensions, auth, filters, granularity, time_range, pool) + .await + } } } } diff --git a/crates/analytics/src/disputes/metrics/sessionized_metrics/dispute_status_metric.rs b/crates/analytics/src/disputes/metrics/sessionized_metrics/dispute_status_metric.rs new file mode 100644 index 0000000000..c5c0b91a17 --- /dev/null +++ b/crates/analytics/src/disputes/metrics/sessionized_metrics/dispute_status_metric.rs @@ -0,0 +1,120 @@ +use std::collections::HashSet; + +use api_models::analytics::{ + disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::DisputeMetricRow; +use crate::{ + enums::AuthInfo, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; +#[derive(Default)] +pub(crate) struct DisputeStatusMetric {} + +#[async_trait::async_trait] +impl super::DisputeMetric for DisputeStatusMetric +where + T: AnalyticsDataSource + super::DisputeMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[DisputeDimensions], + auth: &AuthInfo, + filters: &DisputeFilters, + granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> + where + T: AnalyticsDataSource + super::DisputeMetricAnalytics, + { + let mut query_builder = QueryBuilder::new(AnalyticsCollection::DisputeSessionized); + + for dim in dimensions { + query_builder.add_select_column(dim).switch()?; + } + + query_builder.add_select_column("dispute_status").switch()?; + + query_builder + .add_select_column(Aggregate::Count { + field: None, + alias: Some("count"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + filters.set_filter_clause(&mut query_builder).switch()?; + + auth.set_filter_clause(&mut query_builder).switch()?; + + time_range.set_filter_clause(&mut query_builder).switch()?; + + for dim in dimensions { + query_builder.add_group_by_clause(dim).switch()?; + } + + query_builder + .add_group_by_clause("dispute_status") + .switch()?; + + if let Some(granularity) = granularity.as_ref() { + granularity + .set_group_by_clause(&mut query_builder) + .switch()?; + } + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + DisputeMetricsBucketIdentifier::new( + i.dispute_stage.as_ref().map(|i| i.0), + i.connector.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, + crate::query::PostProcessingError, + >>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/disputes/metrics/sessionized_metrics/mod.rs b/crates/analytics/src/disputes/metrics/sessionized_metrics/mod.rs new file mode 100644 index 0000000000..4d41194634 --- /dev/null +++ b/crates/analytics/src/disputes/metrics/sessionized_metrics/mod.rs @@ -0,0 +1,8 @@ +mod dispute_status_metric; +mod total_amount_disputed; +mod total_dispute_lost_amount; +pub(super) use dispute_status_metric::DisputeStatusMetric; +pub(super) use total_amount_disputed::TotalAmountDisputed; +pub(super) use total_dispute_lost_amount::TotalDisputeLostAmount; + +pub use super::{DisputeMetric, DisputeMetricAnalytics, DisputeMetricRow}; diff --git a/crates/analytics/src/disputes/metrics/sessionized_metrics/total_amount_disputed.rs b/crates/analytics/src/disputes/metrics/sessionized_metrics/total_amount_disputed.rs new file mode 100644 index 0000000000..0767bdaf85 --- /dev/null +++ b/crates/analytics/src/disputes/metrics/sessionized_metrics/total_amount_disputed.rs @@ -0,0 +1,118 @@ +use std::collections::HashSet; + +use api_models::analytics::{ + disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::DisputeMetricRow; +use crate::{ + enums::AuthInfo, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; +#[derive(Default)] +pub(crate) struct TotalAmountDisputed {} + +#[async_trait::async_trait] +impl super::DisputeMetric for TotalAmountDisputed +where + T: AnalyticsDataSource + super::DisputeMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[DisputeDimensions], + auth: &AuthInfo, + filters: &DisputeFilters, + granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> + where + T: AnalyticsDataSource + super::DisputeMetricAnalytics, + { + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::DisputeSessionized); + + for dim in dimensions { + query_builder.add_select_column(dim).switch()?; + } + + query_builder + .add_select_column(Aggregate::Sum { + field: "dispute_amount", + alias: Some("total"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + filters.set_filter_clause(&mut query_builder).switch()?; + + auth.set_filter_clause(&mut query_builder).switch()?; + + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + for dim in dimensions.iter() { + query_builder.add_group_by_clause(dim).switch()?; + } + + if let Some(granularity) = granularity.as_ref() { + granularity + .set_group_by_clause(&mut query_builder) + .switch()?; + } + query_builder + .add_filter_clause("dispute_status", "dispute_won") + .switch()?; + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + DisputeMetricsBucketIdentifier::new( + i.dispute_stage.as_ref().map(|i| i.0), + i.connector.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, crate::query::PostProcessingError>>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/disputes/metrics/sessionized_metrics/total_dispute_lost_amount.rs b/crates/analytics/src/disputes/metrics/sessionized_metrics/total_dispute_lost_amount.rs new file mode 100644 index 0000000000..f4f4d86086 --- /dev/null +++ b/crates/analytics/src/disputes/metrics/sessionized_metrics/total_dispute_lost_amount.rs @@ -0,0 +1,119 @@ +use std::collections::HashSet; + +use api_models::analytics::{ + disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::DisputeMetricRow; +use crate::{ + enums::AuthInfo, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; +#[derive(Default)] +pub(crate) struct TotalDisputeLostAmount {} + +#[async_trait::async_trait] +impl super::DisputeMetric for TotalDisputeLostAmount +where + T: AnalyticsDataSource + super::DisputeMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[DisputeDimensions], + auth: &AuthInfo, + filters: &DisputeFilters, + granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> + where + T: AnalyticsDataSource + super::DisputeMetricAnalytics, + { + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::DisputeSessionized); + + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } + + query_builder + .add_select_column(Aggregate::Sum { + field: "dispute_amount", + alias: Some("total"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + filters.set_filter_clause(&mut query_builder).switch()?; + + auth.set_filter_clause(&mut query_builder).switch()?; + + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + for dim in dimensions.iter() { + query_builder.add_group_by_clause(dim).switch()?; + } + + if let Some(granularity) = granularity.as_ref() { + granularity + .set_group_by_clause(&mut query_builder) + .switch()?; + } + + query_builder + .add_filter_clause("dispute_status", "dispute_lost") + .switch()?; + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + DisputeMetricsBucketIdentifier::new( + i.dispute_stage.as_ref().map(|i| i.0), + i.connector.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, crate::query::PostProcessingError>>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index 16523d5d0a..1d91ce17c6 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -932,6 +932,8 @@ impl ToSql for AnalyticsCollection { Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?, Self::Dispute => Ok("dispute".to_string()), + Self::DisputeSessionized => Err(error_stack::report!(ParsingError::UnknownError) + .attach_printable("DisputeSessionized table is not implemented for Sqlx"))?, } } } diff --git a/crates/analytics/src/types.rs b/crates/analytics/src/types.rs index 6bdd11fcd7..8605633810 100644 --- a/crates/analytics/src/types.rs +++ b/crates/analytics/src/types.rs @@ -38,6 +38,7 @@ pub enum AnalyticsCollection { ConnectorEvents, OutgoingWebhookEvent, Dispute, + DisputeSessionized, ApiEventsAnalytics, ActivePaymentsAnalytics, } diff --git a/crates/api_models/src/analytics.rs b/crates/api_models/src/analytics.rs index 70c0e0e78d..ee90465215 100644 --- a/crates/api_models/src/analytics.rs +++ b/crates/api_models/src/analytics.rs @@ -346,6 +346,11 @@ pub struct SdkEventFilterValue { pub values: Vec, } +#[derive(Debug, serde::Serialize)] +pub struct DisputesAnalyticsMetadata { + pub total_disputed_amount: Option, + pub total_dispute_lost_amount: Option, +} #[derive(Debug, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct MetricsResponse { @@ -373,6 +378,12 @@ pub struct RefundsMetricsResponse { pub query_data: Vec, pub meta_data: [RefundsAnalyticsMetadata; 1], } +#[derive(Debug, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DisputesMetricsResponse { + pub query_data: Vec, + pub meta_data: [DisputesAnalyticsMetadata; 1], +} #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct GetApiEventFiltersRequest { diff --git a/crates/api_models/src/analytics/disputes.rs b/crates/api_models/src/analytics/disputes.rs index edb85c129e..2509d83e13 100644 --- a/crates/api_models/src/analytics/disputes.rs +++ b/crates/api_models/src/analytics/disputes.rs @@ -24,6 +24,9 @@ pub enum DisputeMetrics { DisputeStatusMetric, TotalAmountDisputed, TotalDisputeLostAmount, + SessionizedDisputeStatusMetric, + SessionizedTotalAmountDisputed, + SessionizedTotalDisputeLostAmount, } #[derive( @@ -122,8 +125,8 @@ pub struct DisputeMetricsBucketValue { pub disputes_challenged: Option, pub disputes_won: Option, pub disputes_lost: Option, - pub total_amount_disputed: Option, - pub total_dispute_lost_amount: Option, + pub disputed_amount: Option, + pub dispute_lost_amount: Option, pub total_dispute: Option, } #[derive(Debug, serde::Serialize)] diff --git a/crates/api_models/src/events.rs b/crates/api_models/src/events.rs index dad624ef87..72a0d59251 100644 --- a/crates/api_models/src/events.rs +++ b/crates/api_models/src/events.rs @@ -173,6 +173,12 @@ impl ApiEventMetric for RefundsMetricsResponse { Some(ApiEventsType::Miscellaneous) } } + +impl ApiEventMetric for DisputesMetricsResponse { + fn get_api_event_type(&self) -> Option { + Some(ApiEventsType::Miscellaneous) + } +} #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] impl ApiEventMetric for PaymentMethodIntentConfirmInternal { fn get_api_event_type(&self) -> Option {