feat(analytics): add sessionized_metrics for disputes analytics (#6573)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Uzair Khan
2024-11-26 18:54:42 +05:30
committed by GitHub
parent 02479a12b1
commit 8fbb766308
13 changed files with 447 additions and 23 deletions

View File

@ -139,6 +139,9 @@ impl AnalyticsDataSource for ClickhouseClient {
| AnalyticsCollection::Dispute => {
TableEngine::CollapsingMergeTree { sign: "sign_flag" }
}
AnalyticsCollection::DisputeSessionized => {
TableEngine::CollapsingMergeTree { sign: "sign_flag" }
}
AnalyticsCollection::SdkEvents
| AnalyticsCollection::SdkEventsAnalytics
| AnalyticsCollection::ApiEvents
@ -439,6 +442,7 @@ impl ToSql<ClickhouseClient> for AnalyticsCollection {
Self::ConnectorEvents => Ok("connector_events_audit".to_string()),
Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()),
Self::Dispute => Ok("dispute".to_string()),
Self::DisputeSessionized => Ok("sessionizer_dispute".to_string()),
Self::ActivePaymentsAnalytics => Ok("active_payments".to_string()),
}
}

View File

@ -5,8 +5,8 @@ use super::metrics::DisputeMetricRow;
#[derive(Debug, Default)]
pub struct DisputeMetricsAccumulator {
pub disputes_status_rate: RateAccumulator,
pub total_amount_disputed: SumAccumulator,
pub total_dispute_lost_amount: SumAccumulator,
pub disputed_amount: DisputedAmountAccumulator,
pub dispute_lost_amount: DisputedAmountAccumulator,
}
#[derive(Debug, Default)]
pub struct RateAccumulator {
@ -17,7 +17,7 @@ pub struct RateAccumulator {
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct SumAccumulator {
pub struct DisputedAmountAccumulator {
pub total: Option<i64>,
}
@ -29,7 +29,7 @@ pub trait DisputeMetricAccumulator {
fn collect(self) -> Self::MetricOutput;
}
impl DisputeMetricAccumulator for SumAccumulator {
impl DisputeMetricAccumulator for DisputedAmountAccumulator {
type MetricOutput = Option<u64>;
#[inline]
fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) {
@ -92,8 +92,8 @@ impl DisputeMetricsAccumulator {
disputes_challenged: challenge_rate,
disputes_won: won_rate,
disputes_lost: lost_rate,
total_amount_disputed: self.total_amount_disputed.collect(),
total_dispute_lost_amount: self.total_dispute_lost_amount.collect(),
disputed_amount: self.disputed_amount.collect(),
dispute_lost_amount: self.dispute_lost_amount.collect(),
total_dispute,
}
}

View File

@ -5,8 +5,8 @@ use api_models::analytics::{
DisputeDimensions, DisputeMetrics, DisputeMetricsBucketIdentifier,
DisputeMetricsBucketResponse,
},
AnalyticsMetadata, DisputeFilterValue, DisputeFiltersResponse, GetDisputeFilterRequest,
GetDisputeMetricRequest, MetricsResponse,
DisputeFilterValue, DisputeFiltersResponse, DisputesAnalyticsMetadata, DisputesMetricsResponse,
GetDisputeFilterRequest, GetDisputeMetricRequest,
};
use error_stack::ResultExt;
use router_env::{
@ -30,7 +30,7 @@ pub async fn get_metrics(
pool: &AnalyticsProvider,
auth: &AuthInfo,
req: GetDisputeMetricRequest,
) -> AnalyticsResult<MetricsResponse<DisputeMetricsBucketResponse>> {
) -> AnalyticsResult<DisputesMetricsResponse<DisputeMetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<
DisputeMetricsBucketIdentifier,
DisputeMetricsAccumulator,
@ -87,14 +87,17 @@ pub async fn get_metrics(
logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}");
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
DisputeMetrics::DisputeStatusMetric => metrics_builder
DisputeMetrics::DisputeStatusMetric
| DisputeMetrics::SessionizedDisputeStatusMetric => metrics_builder
.disputes_status_rate
.add_metrics_bucket(&value),
DisputeMetrics::TotalAmountDisputed => metrics_builder
.total_amount_disputed
.add_metrics_bucket(&value),
DisputeMetrics::TotalDisputeLostAmount => metrics_builder
.total_dispute_lost_amount
DisputeMetrics::TotalAmountDisputed
| DisputeMetrics::SessionizedTotalAmountDisputed => {
metrics_builder.disputed_amount.add_metrics_bucket(&value)
}
DisputeMetrics::TotalDisputeLostAmount
| DisputeMetrics::SessionizedTotalDisputeLostAmount => metrics_builder
.dispute_lost_amount
.add_metrics_bucket(&value),
}
}
@ -105,18 +108,31 @@ pub async fn get_metrics(
metrics_accumulator
);
}
let mut total_disputed_amount = 0;
let mut total_dispute_lost_amount = 0;
let query_data: Vec<DisputeMetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| DisputeMetricsBucketResponse {
values: val.collect(),
.map(|(id, val)| {
let collected_values = val.collect();
if let Some(amount) = collected_values.disputed_amount {
total_disputed_amount += amount;
}
if let Some(amount) = collected_values.dispute_lost_amount {
total_dispute_lost_amount += amount;
}
DisputeMetricsBucketResponse {
values: collected_values,
dimensions: id,
}
})
.collect();
Ok(MetricsResponse {
Ok(DisputesMetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
meta_data: [DisputesAnalyticsMetadata {
total_disputed_amount: Some(total_disputed_amount),
total_dispute_lost_amount: Some(total_dispute_lost_amount),
}],
})
}

View File

@ -1,4 +1,5 @@
mod dispute_status_metric;
mod sessionized_metrics;
mod total_amount_disputed;
mod total_dispute_lost_amount;
@ -92,6 +93,21 @@ where
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedTotalAmountDisputed => {
sessionized_metrics::TotalAmountDisputed::default()
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedDisputeStatusMetric => {
sessionized_metrics::DisputeStatusMetric::default()
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedTotalDisputeLostAmount => {
sessionized_metrics::TotalDisputeLostAmount::default()
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
}
}
}

View File

@ -0,0 +1,120 @@
use std::collections::HashSet;
use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::DisputeMetricRow;
use crate::{
enums::AuthInfo,
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct DisputeStatusMetric {}
#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for DisputeStatusMetric
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
auth: &AuthInfo,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder = QueryBuilder::new(AnalyticsCollection::DisputeSessionized);
for dim in dimensions {
query_builder.add_select_column(dim).switch()?;
}
query_builder.add_select_column("dispute_status").switch()?;
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
auth.set_filter_clause(&mut query_builder).switch()?;
time_range.set_filter_clause(&mut query_builder).switch()?;
for dim in dimensions {
query_builder.add_group_by_clause(dim).switch()?;
}
query_builder
.add_group_by_clause("dispute_status")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}
query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
HashSet<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -0,0 +1,8 @@
mod dispute_status_metric;
mod total_amount_disputed;
mod total_dispute_lost_amount;
pub(super) use dispute_status_metric::DisputeStatusMetric;
pub(super) use total_amount_disputed::TotalAmountDisputed;
pub(super) use total_dispute_lost_amount::TotalDisputeLostAmount;
pub use super::{DisputeMetric, DisputeMetricAnalytics, DisputeMetricRow};

View File

@ -0,0 +1,118 @@
use std::collections::HashSet;
use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::DisputeMetricRow;
use crate::{
enums::AuthInfo,
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct TotalAmountDisputed {}
#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for TotalAmountDisputed
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
auth: &AuthInfo,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::DisputeSessionized);
for dim in dimensions {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(Aggregate::Sum {
field: "dispute_amount",
alias: Some("total"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
auth.set_filter_clause(&mut query_builder).switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
for dim in dimensions.iter() {
query_builder.add_group_by_clause(dim).switch()?;
}
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}
query_builder
.add_filter_clause("dispute_status", "dispute_won")
.switch()?;
query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<HashSet<_>, crate::query::PostProcessingError>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -0,0 +1,119 @@
use std::collections::HashSet;
use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::DisputeMetricRow;
use crate::{
enums::AuthInfo,
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct TotalDisputeLostAmount {}
#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for TotalDisputeLostAmount
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
auth: &AuthInfo,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::DisputeSessionized);
for dim in dimensions.iter() {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(Aggregate::Sum {
field: "dispute_amount",
alias: Some("total"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
auth.set_filter_clause(&mut query_builder).switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
for dim in dimensions.iter() {
query_builder.add_group_by_clause(dim).switch()?;
}
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}
query_builder
.add_filter_clause("dispute_status", "dispute_lost")
.switch()?;
query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<HashSet<_>, crate::query::PostProcessingError>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -932,6 +932,8 @@ impl ToSql<SqlxClient> for AnalyticsCollection {
Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?,
Self::Dispute => Ok("dispute".to_string()),
Self::DisputeSessionized => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("DisputeSessionized table is not implemented for Sqlx"))?,
}
}
}

View File

@ -38,6 +38,7 @@ pub enum AnalyticsCollection {
ConnectorEvents,
OutgoingWebhookEvent,
Dispute,
DisputeSessionized,
ApiEventsAnalytics,
ActivePaymentsAnalytics,
}

View File

@ -346,6 +346,11 @@ pub struct SdkEventFilterValue {
pub values: Vec<String>,
}
#[derive(Debug, serde::Serialize)]
pub struct DisputesAnalyticsMetadata {
pub total_disputed_amount: Option<u64>,
pub total_dispute_lost_amount: Option<u64>,
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct MetricsResponse<T> {
@ -373,6 +378,12 @@ pub struct RefundsMetricsResponse<T> {
pub query_data: Vec<T>,
pub meta_data: [RefundsAnalyticsMetadata; 1],
}
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DisputesMetricsResponse<T> {
pub query_data: Vec<T>,
pub meta_data: [DisputesAnalyticsMetadata; 1],
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetApiEventFiltersRequest {

View File

@ -24,6 +24,9 @@ pub enum DisputeMetrics {
DisputeStatusMetric,
TotalAmountDisputed,
TotalDisputeLostAmount,
SessionizedDisputeStatusMetric,
SessionizedTotalAmountDisputed,
SessionizedTotalDisputeLostAmount,
}
#[derive(
@ -122,8 +125,8 @@ pub struct DisputeMetricsBucketValue {
pub disputes_challenged: Option<u64>,
pub disputes_won: Option<u64>,
pub disputes_lost: Option<u64>,
pub total_amount_disputed: Option<u64>,
pub total_dispute_lost_amount: Option<u64>,
pub disputed_amount: Option<u64>,
pub dispute_lost_amount: Option<u64>,
pub total_dispute: Option<u64>,
}
#[derive(Debug, serde::Serialize)]

View File

@ -173,6 +173,12 @@ impl<T> ApiEventMetric for RefundsMetricsResponse<T> {
Some(ApiEventsType::Miscellaneous)
}
}
impl<T> ApiEventMetric for DisputesMetricsResponse<T> {
fn get_api_event_type(&self) -> Option<ApiEventsType> {
Some(ApiEventsType::Miscellaneous)
}
}
#[cfg(all(feature = "v2", feature = "payment_methods_v2"))]
impl ApiEventMetric for PaymentMethodIntentConfirmInternal {
fn get_api_event_type(&self) -> Option<ApiEventsType> {