feat(analytics): adding metric api for dispute analytics (#3810)

Co-authored-by: Sampras Lopes <lsampras@pm.me>
This commit is contained in:
harsh-sharma-juspay
2024-02-28 17:39:31 +05:30
committed by GitHub
parent f3931cf484
commit de6b16bed9
20 changed files with 1032 additions and 30 deletions

View File

@ -23,7 +23,7 @@ use crate::{
metrics::{latency::LatencyAvg, ApiEventMetricRow}, metrics::{latency::LatencyAvg, ApiEventMetricRow},
}, },
connector_events::events::ConnectorEventsResult, connector_events::events::ConnectorEventsResult,
disputes::filters::DisputeFilterRow, disputes::{filters::DisputeFilterRow, metrics::DisputeMetricRow},
outgoing_webhook_event::events::OutgoingWebhookLogsResult, outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult, sdk_events::events::SdkEventsResult,
types::TableEngine, types::TableEngine,
@ -170,6 +170,7 @@ impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics
{ {
} }
impl super::disputes::filters::DisputeFilterAnalytics for ClickhouseClient {} impl super::disputes::filters::DisputeFilterAnalytics for ClickhouseClient {}
impl super::disputes::metrics::DisputeMetricAnalytics for ClickhouseClient {}
#[derive(Debug, serde::Serialize)] #[derive(Debug, serde::Serialize)]
struct CkhQuery { struct CkhQuery {
@ -278,6 +279,17 @@ impl TryInto<RefundFilterRow> for serde_json::Value {
)) ))
} }
} }
impl TryInto<DisputeMetricRow> for serde_json::Value {
type Error = Report<ParsingError>;
fn try_into(self) -> Result<DisputeMetricRow, Self::Error> {
serde_json::from_value(self)
.into_report()
.change_context(ParsingError::StructParseFailure(
"Failed to parse DisputeMetricRow in clickhouse results",
))
}
}
impl TryInto<DisputeFilterRow> for serde_json::Value { impl TryInto<DisputeFilterRow> for serde_json::Value {
type Error = Report<ParsingError>; type Error = Report<ParsingError>;

View File

@ -1,5 +1,9 @@
pub mod accumulators;
mod core; mod core;
pub mod filters; pub mod filters;
pub mod metrics;
pub mod types;
pub use accumulators::{DisputeMetricAccumulator, DisputeMetricsAccumulator};
pub use self::core::get_filters; pub trait DisputeAnalytics: metrics::DisputeMetricAnalytics {}
pub use self::core::{get_filters, get_metrics};

View File

@ -0,0 +1,100 @@
use api_models::analytics::disputes::DisputeMetricsBucketValue;
use diesel_models::enums as storage_enums;
use super::metrics::DisputeMetricRow;
#[derive(Debug, Default)]
pub struct DisputeMetricsAccumulator {
pub disputes_status_rate: RateAccumulator,
pub total_amount_disputed: SumAccumulator,
pub total_dispute_lost_amount: SumAccumulator,
}
#[derive(Debug, Default)]
pub struct RateAccumulator {
pub won_count: i64,
pub challenged_count: i64,
pub lost_count: i64,
pub total: i64,
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct SumAccumulator {
pub total: Option<i64>,
}
pub trait DisputeMetricAccumulator {
type MetricOutput;
fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow);
fn collect(self) -> Self::MetricOutput;
}
impl DisputeMetricAccumulator for SumAccumulator {
type MetricOutput = Option<u64>;
#[inline]
fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) {
self.total = match (
self.total,
metrics
.total
.as_ref()
.and_then(bigdecimal::ToPrimitive::to_i64),
) {
(None, None) => None,
(None, i @ Some(_)) | (i @ Some(_), None) => i,
(Some(a), Some(b)) => Some(a + b),
}
}
#[inline]
fn collect(self) -> Self::MetricOutput {
self.total.and_then(|i| u64::try_from(i).ok())
}
}
impl DisputeMetricAccumulator for RateAccumulator {
type MetricOutput = Option<(Option<u64>, Option<u64>, Option<u64>, Option<u64>)>;
fn add_metrics_bucket(&mut self, metrics: &DisputeMetricRow) {
if let Some(ref dispute_status) = metrics.dispute_status {
if dispute_status.as_ref() == &storage_enums::DisputeStatus::DisputeChallenged {
self.challenged_count += metrics.count.unwrap_or_default();
}
if dispute_status.as_ref() == &storage_enums::DisputeStatus::DisputeWon {
self.won_count += metrics.count.unwrap_or_default();
}
if dispute_status.as_ref() == &storage_enums::DisputeStatus::DisputeLost {
self.lost_count += metrics.count.unwrap_or_default();
}
};
self.total += metrics.count.unwrap_or_default();
}
fn collect(self) -> Self::MetricOutput {
if self.total <= 0 {
Some((None, None, None, None))
} else {
Some((
u64::try_from(self.challenged_count).ok(),
u64::try_from(self.won_count).ok(),
u64::try_from(self.lost_count).ok(),
u64::try_from(self.total).ok(),
))
}
}
}
impl DisputeMetricsAccumulator {
pub fn collect(self) -> DisputeMetricsBucketValue {
let (challenge_rate, won_rate, lost_rate, total_dispute) =
self.disputes_status_rate.collect().unwrap_or_default();
DisputeMetricsBucketValue {
disputes_challenged: challenge_rate,
disputes_won: won_rate,
disputes_lost: lost_rate,
total_amount_disputed: self.total_amount_disputed.collect(),
total_dispute_lost_amount: self.total_dispute_lost_amount.collect(),
total_dispute,
}
}
}

View File

@ -1,14 +1,124 @@
use api_models::analytics::{ use std::collections::HashMap;
disputes::DisputeDimensions, DisputeFilterValue, DisputeFiltersResponse,
GetDisputeFilterRequest,
};
use error_stack::ResultExt;
use super::filters::{get_dispute_filter_for_dimension, DisputeFilterRow}; use api_models::analytics::{
use crate::{ disputes::{
errors::{AnalyticsError, AnalyticsResult}, DisputeDimensions, DisputeMetrics, DisputeMetricsBucketIdentifier,
AnalyticsProvider, DisputeMetricsBucketResponse,
},
AnalyticsMetadata, DisputeFilterValue, DisputeFiltersResponse, GetDisputeFilterRequest,
GetDisputeMetricRequest, MetricsResponse,
}; };
use error_stack::{IntoReport, ResultExt};
use router_env::{
logger,
tracing::{self, Instrument},
};
use super::{
filters::{get_dispute_filter_for_dimension, DisputeFilterRow},
DisputeMetricsAccumulator,
};
use crate::{
disputes::DisputeMetricAccumulator,
errors::{AnalyticsError, AnalyticsResult},
metrics, AnalyticsProvider,
};
pub async fn get_metrics(
pool: &AnalyticsProvider,
merchant_id: &String,
req: GetDisputeMetricRequest,
) -> AnalyticsResult<MetricsResponse<DisputeMetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<
DisputeMetricsBucketIdentifier,
DisputeMetricsAccumulator,
> = HashMap::new();
let mut set = tokio::task::JoinSet::new();
for metric_type in req.metrics.iter().cloned() {
let req = req.clone();
let pool = pool.clone();
let task_span = tracing::debug_span!(
"analytics_dispute_query",
refund_metric = metric_type.as_ref()
);
// Currently JoinSet works with only static lifetime references even if the task pool does not outlive the given reference
// We can optimize away this clone once that is fixed
let merchant_id_scoped = merchant_id.to_owned();
set.spawn(
async move {
let data = pool
.get_dispute_metrics(
&metric_type,
&req.group_by_names.clone(),
&merchant_id_scoped,
&req.filters,
&req.time_series.map(|t| t.granularity),
&req.time_range,
)
.await
.change_context(AnalyticsError::UnknownError);
(metric_type, data)
}
.instrument(task_span),
);
}
while let Some((metric, data)) = set
.join_next()
.await
.transpose()
.into_report()
.change_context(AnalyticsError::UnknownError)?
{
let data = data?;
let attributes = &[
metrics::request::add_attributes("metric_type", metric.to_string()),
metrics::request::add_attributes("source", pool.to_string()),
];
let value = u64::try_from(data.len());
if let Ok(val) = value {
metrics::BUCKETS_FETCHED.record(&metrics::CONTEXT, val, attributes);
logger::debug!("Attributes: {:?}, Buckets fetched: {}", attributes, val);
}
for (id, value) in data {
logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}");
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
DisputeMetrics::DisputeStatusMetric => metrics_builder
.disputes_status_rate
.add_metrics_bucket(&value),
DisputeMetrics::TotalAmountDisputed => metrics_builder
.total_amount_disputed
.add_metrics_bucket(&value),
DisputeMetrics::TotalDisputeLostAmount => metrics_builder
.total_dispute_lost_amount
.add_metrics_bucket(&value),
}
}
logger::debug!(
"Analytics Accumulated Results: metric: {}, results: {:#?}",
metric,
metrics_accumulator
);
}
let query_data: Vec<DisputeMetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| DisputeMetricsBucketResponse {
values: val.collect(),
dimensions: id,
})
.collect();
Ok(MetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
}
pub async fn get_filters( pub async fn get_filters(
pool: &AnalyticsProvider, pool: &AnalyticsProvider,
@ -76,9 +186,7 @@ pub async fn get_filters(
.change_context(AnalyticsError::UnknownError)? .change_context(AnalyticsError::UnknownError)?
.into_iter() .into_iter()
.filter_map(|fil: DisputeFilterRow| match dim { .filter_map(|fil: DisputeFilterRow| match dim {
DisputeDimensions::DisputeStatus => fil.dispute_status,
DisputeDimensions::DisputeStage => fil.dispute_stage, DisputeDimensions::DisputeStage => fil.dispute_stage,
DisputeDimensions::ConnectorStatus => fil.connector_status,
DisputeDimensions::Connector => fil.connector, DisputeDimensions::Connector => fil.connector,
}) })
.collect::<Vec<String>>(); .collect::<Vec<String>>();

View File

@ -0,0 +1,119 @@
mod dispute_status_metric;
mod total_amount_disputed;
mod total_dispute_lost_amount;
use api_models::{
analytics::{
disputes::{
DisputeDimensions, DisputeFilters, DisputeMetrics, DisputeMetricsBucketIdentifier,
},
Granularity,
},
payments::TimeRange,
};
use diesel_models::enums as storage_enums;
use time::PrimitiveDateTime;
use self::{
dispute_status_metric::DisputeStatusMetric, total_amount_disputed::TotalAmountDisputed,
total_dispute_lost_amount::TotalDisputeLostAmount,
};
use crate::{
query::{Aggregate, GroupByClause, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, LoadRow, MetricsResult},
};
#[derive(Debug, Eq, PartialEq, serde::Deserialize)]
pub struct DisputeMetricRow {
pub dispute_stage: Option<DBEnumWrapper<storage_enums::DisputeStage>>,
pub dispute_status: Option<DBEnumWrapper<storage_enums::DisputeStatus>>,
pub connector: Option<String>,
pub total: Option<bigdecimal::BigDecimal>,
pub count: Option<i64>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub start_bucket: Option<PrimitiveDateTime>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub end_bucket: Option<PrimitiveDateTime>,
}
pub trait DisputeMetricAnalytics: LoadRow<DisputeMetricRow> {}
#[async_trait::async_trait]
pub trait DisputeMetric<T>
where
T: AnalyticsDataSource + DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
merchant_id: &str,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>;
}
#[async_trait::async_trait]
impl<T> DisputeMetric<T> for DisputeMetrics
where
T: AnalyticsDataSource + DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
merchant_id: &str,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>> {
match self {
Self::TotalAmountDisputed => {
TotalAmountDisputed::default()
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::DisputeStatusMetric => {
DisputeStatusMetric::default()
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::TotalDisputeLostAmount => {
TotalDisputeLostAmount::default()
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
}
}
}

View File

@ -0,0 +1,119 @@
use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::DisputeMetricRow;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(super) struct DisputeStatusMetric {}
#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for DisputeStatusMetric
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
merchant_id: &str,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder = QueryBuilder::new(AnalyticsCollection::Dispute);
for dim in dimensions {
query_builder.add_select_column(dim).switch()?;
}
query_builder.add_select_column("dispute_status").switch()?;
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
time_range.set_filter_clause(&mut query_builder).switch()?;
for dim in dimensions {
query_builder.add_group_by_clause(dim).switch()?;
}
query_builder
.add_group_by_clause("dispute_status")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}
query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -0,0 +1,116 @@
use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::DisputeMetricRow;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(super) struct TotalAmountDisputed {}
#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for TotalAmountDisputed
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
merchant_id: &str,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::Dispute);
for dim in dimensions {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(Aggregate::Sum {
field: "dispute_amount",
alias: Some("total"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
for dim in dimensions.iter() {
query_builder.add_group_by_clause(dim).switch()?;
}
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}
query_builder
.add_filter_clause("dispute_status", "dispute_won")
.switch()?;
query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<Vec<_>, crate::query::PostProcessingError>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -0,0 +1,117 @@
use api_models::analytics::{
disputes::{DisputeDimensions, DisputeFilters, DisputeMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::DisputeMetricRow;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(super) struct TotalDisputeLostAmount {}
#[async_trait::async_trait]
impl<T> super::DisputeMetric<T> for TotalDisputeLostAmount
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[DisputeDimensions],
merchant_id: &str,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>>
where
T: AnalyticsDataSource + super::DisputeMetricAnalytics,
{
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::Dispute);
for dim in dimensions.iter() {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(Aggregate::Sum {
field: "dispute_amount",
alias: Some("total"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
for dim in dimensions.iter() {
query_builder.add_group_by_clause(dim).switch()?;
}
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.switch()?;
}
query_builder
.add_filter_clause("dispute_status", "dispute_lost")
.switch()?;
query_builder
.execute_query::<DisputeMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
DisputeMetricsBucketIdentifier::new(
i.dispute_stage.as_ref().map(|i| i.0),
i.connector.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<Vec<_>, crate::query::PostProcessingError>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -0,0 +1,29 @@
use api_models::analytics::disputes::{DisputeDimensions, DisputeFilters};
use error_stack::ResultExt;
use crate::{
query::{QueryBuilder, QueryFilter, QueryResult, ToSql},
types::{AnalyticsCollection, AnalyticsDataSource},
};
impl<T> QueryFilter<T> for DisputeFilters
where
T: AnalyticsDataSource,
AnalyticsCollection: ToSql<T>,
{
fn set_filter_clause(&self, builder: &mut QueryBuilder<T>) -> QueryResult<()> {
if !self.connector.is_empty() {
builder
.add_filter_in_range_clause(DisputeDimensions::Connector, &self.connector)
.attach_printable("Error adding connector filter")?;
}
if !self.dispute_stage.is_empty() {
builder
.add_filter_in_range_clause(DisputeDimensions::DisputeStage, &self.dispute_stage)
.attach_printable("Error adding dispute stage filter")?;
}
Ok(())
}
}

View File

@ -15,6 +15,7 @@ pub mod sdk_events;
mod sqlx; mod sqlx;
mod types; mod types;
use api_event::metrics::{ApiEventMetric, ApiEventMetricRow}; use api_event::metrics::{ApiEventMetric, ApiEventMetricRow};
use disputes::metrics::{DisputeMetric, DisputeMetricRow};
pub use types::AnalyticsDomain; pub use types::AnalyticsDomain;
pub mod lambda_utils; pub mod lambda_utils;
pub mod utils; pub mod utils;
@ -25,6 +26,7 @@ use api_models::analytics::{
api_event::{ api_event::{
ApiEventDimensions, ApiEventFilters, ApiEventMetrics, ApiEventMetricsBucketIdentifier, ApiEventDimensions, ApiEventFilters, ApiEventMetrics, ApiEventMetricsBucketIdentifier,
}, },
disputes::{DisputeDimensions, DisputeFilters, DisputeMetrics, DisputeMetricsBucketIdentifier},
payments::{PaymentDimensions, PaymentFilters, PaymentMetrics, PaymentMetricsBucketIdentifier}, payments::{PaymentDimensions, PaymentFilters, PaymentMetrics, PaymentMetricsBucketIdentifier},
refunds::{RefundDimensions, RefundFilters, RefundMetrics, RefundMetricsBucketIdentifier}, refunds::{RefundDimensions, RefundFilters, RefundMetrics, RefundMetricsBucketIdentifier},
sdk_events::{ sdk_events::{
@ -393,6 +395,106 @@ impl AnalyticsProvider {
.await .await
} }
pub async fn get_dispute_metrics(
&self,
metric: &DisputeMetrics,
dimensions: &[DisputeDimensions],
merchant_id: &str,
filters: &DisputeFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
) -> types::MetricsResult<Vec<(DisputeMetricsBucketIdentifier, DisputeMetricRow)>> {
// Metrics to get the fetch time for each refund metric
metrics::request::record_operation_time(
async {
match self {
Self::Sqlx(pool) => {
metric
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::Clickhouse(pool) => {
metric
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::CombinedCkh(sqlx_pool, ckh_pool) => {
let (ckh_result, sqlx_result) = tokio::join!(
metric.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
ckh_pool,
),
metric.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
sqlx_pool,
)
);
match (&sqlx_result, &ckh_result) {
(Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres disputes analytics metrics")
}
_ => {}
};
ckh_result
}
Self::CombinedSqlx(sqlx_pool, ckh_pool) => {
let (ckh_result, sqlx_result) = tokio::join!(
metric.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
ckh_pool,
),
metric.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
sqlx_pool,
)
);
match (&sqlx_result, &ckh_result) {
(Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres disputes analytics metrics")
}
_ => {}
};
sqlx_result
}
}
},
&metrics::METRIC_FETCH_TIME,
metric,
self,
)
.await
}
pub async fn get_sdk_event_metrics( pub async fn get_sdk_event_metrics(
&self, &self,
metric: &SdkEventMetrics, metric: &SdkEventMetrics,

View File

@ -11,7 +11,8 @@ use api_models::{
Granularity, Granularity,
}, },
enums::{ enums::{
AttemptStatus, AuthenticationType, Connector, Currency, PaymentMethod, PaymentMethodType, AttemptStatus, AuthenticationType, Connector, Currency, DisputeStage, PaymentMethod,
PaymentMethodType,
}, },
refunds::RefundStatus, refunds::RefundStatus,
}; };
@ -363,8 +364,6 @@ impl_to_sql_for_to_string!(
PaymentDimensions, PaymentDimensions,
&PaymentDistributions, &PaymentDistributions,
RefundDimensions, RefundDimensions,
&DisputeDimensions,
DisputeDimensions,
PaymentMethod, PaymentMethod,
PaymentMethodType, PaymentMethodType,
AuthenticationType, AuthenticationType,
@ -386,6 +385,8 @@ impl_to_sql_for_to_string!(&SdkEventDimensions, SdkEventDimensions, SdkEventName
impl_to_sql_for_to_string!(&ApiEventDimensions, ApiEventDimensions); impl_to_sql_for_to_string!(&ApiEventDimensions, ApiEventDimensions);
impl_to_sql_for_to_string!(&DisputeDimensions, DisputeDimensions, DisputeStage);
#[derive(Debug)] #[derive(Debug)]
pub enum FilterTypes { pub enum FilterTypes {
Equal, Equal,

View File

@ -1,6 +1,9 @@
use std::{fmt::Display, str::FromStr}; use std::{fmt::Display, str::FromStr};
use api_models::analytics::refunds::RefundType; use api_models::{
analytics::refunds::RefundType,
enums::{DisputeStage, DisputeStatus},
};
use common_utils::errors::{CustomResult, ParsingError}; use common_utils::errors::{CustomResult, ParsingError};
use diesel_models::enums::{ use diesel_models::enums::{
AttemptStatus, AuthenticationType, Currency, PaymentMethod, RefundStatus, AttemptStatus, AuthenticationType, Currency, PaymentMethod, RefundStatus,
@ -89,6 +92,8 @@ db_type!(AttemptStatus);
db_type!(PaymentMethod, TEXT); db_type!(PaymentMethod, TEXT);
db_type!(RefundStatus); db_type!(RefundStatus);
db_type!(RefundType); db_type!(RefundType);
db_type!(DisputeStage);
db_type!(DisputeStatus);
impl<'q, Type> Encode<'q, Postgres> for DBEnumWrapper<Type> impl<'q, Type> Encode<'q, Postgres> for DBEnumWrapper<Type>
where where
@ -145,6 +150,7 @@ impl super::payments::distribution::PaymentDistributionAnalytics for SqlxClient
impl super::refunds::metrics::RefundMetricAnalytics for SqlxClient {} impl super::refunds::metrics::RefundMetricAnalytics for SqlxClient {}
impl super::refunds::filters::RefundFilterAnalytics for SqlxClient {} impl super::refunds::filters::RefundFilterAnalytics for SqlxClient {}
impl super::disputes::filters::DisputeFilterAnalytics for SqlxClient {} impl super::disputes::filters::DisputeFilterAnalytics for SqlxClient {}
impl super::disputes::metrics::DisputeMetricAnalytics for SqlxClient {}
#[async_trait::async_trait] #[async_trait::async_trait]
impl AnalyticsDataSource for SqlxClient { impl AnalyticsDataSource for SqlxClient {
@ -454,6 +460,48 @@ impl<'a> FromRow<'a, PgRow> for super::disputes::filters::DisputeFilterRow {
}) })
} }
} }
impl<'a> FromRow<'a, PgRow> for super::disputes::metrics::DisputeMetricRow {
fn from_row(row: &'a PgRow) -> sqlx::Result<Self> {
let dispute_stage: Option<DBEnumWrapper<DisputeStage>> =
row.try_get("dispute_stage").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let dispute_status: Option<DBEnumWrapper<DisputeStatus>> =
row.try_get("dispute_status").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let connector: Option<String> = row.try_get("connector").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let total: Option<bigdecimal::BigDecimal> = row.try_get("total").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let count: Option<i64> = row.try_get("count").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
// Removing millisecond precision to get accurate diffs against clickhouse
let start_bucket: Option<PrimitiveDateTime> = row
.try_get::<Option<PrimitiveDateTime>, _>("start_bucket")?
.and_then(|dt| dt.replace_millisecond(0).ok());
let end_bucket: Option<PrimitiveDateTime> = row
.try_get::<Option<PrimitiveDateTime>, _>("end_bucket")?
.and_then(|dt| dt.replace_millisecond(0).ok());
Ok(Self {
dispute_stage,
dispute_status,
connector,
total,
count,
start_bucket,
end_bucket,
})
}
}
impl ToSql<SqlxClient> for PrimitiveDateTime { impl ToSql<SqlxClient> for PrimitiveDateTime {
fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result<String, ParsingError> { fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result<String, ParsingError> {

View File

@ -5,7 +5,7 @@ use masking::Secret;
use self::{ use self::{
api_event::{ApiEventDimensions, ApiEventMetrics}, api_event::{ApiEventDimensions, ApiEventMetrics},
disputes::DisputeDimensions, disputes::{DisputeDimensions, DisputeMetrics},
payments::{PaymentDimensions, PaymentDistributions, PaymentMetrics}, payments::{PaymentDimensions, PaymentDistributions, PaymentMetrics},
refunds::{RefundDimensions, RefundMetrics}, refunds::{RefundDimensions, RefundMetrics},
sdk_events::{SdkEventDimensions, SdkEventMetrics}, sdk_events::{SdkEventDimensions, SdkEventMetrics},
@ -271,3 +271,17 @@ pub struct DisputeFilterValue {
pub dimension: DisputeDimensions, pub dimension: DisputeDimensions,
pub values: Vec<String>, pub values: Vec<String>,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetDisputeMetricRequest {
pub time_series: Option<TimeSeries>,
pub time_range: TimeRange,
#[serde(default)]
pub group_by_names: Vec<DisputeDimensions>,
#[serde(default)]
pub filters: disputes::DisputeFilters,
pub metrics: HashSet<DisputeMetrics>,
#[serde(default)]
pub delta: bool,
}

View File

@ -1,4 +1,10 @@
use super::NameDescription; use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
use super::{NameDescription, TimeRange};
use crate::enums::DisputeStage;
#[derive( #[derive(
Clone, Clone,
@ -15,9 +21,7 @@ use super::NameDescription;
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum DisputeMetrics { pub enum DisputeMetrics {
DisputesChallenged, DisputeStatusMetric,
DisputesWon,
DisputesLost,
TotalAmountDisputed, TotalAmountDisputed,
TotalDisputeLostAmount, TotalDisputeLostAmount,
} }
@ -42,8 +46,6 @@ pub enum DisputeDimensions {
// Do not change the order of these enums // Do not change the order of these enums
// Consult the Dashboard FE folks since these also affects the order of metrics on FE // Consult the Dashboard FE folks since these also affects the order of metrics on FE
Connector, Connector,
DisputeStatus,
ConnectorStatus,
DisputeStage, DisputeStage,
} }
@ -64,3 +66,70 @@ impl From<DisputeMetrics> for NameDescription {
} }
} }
} }
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct DisputeFilters {
#[serde(default)]
pub dispute_stage: Vec<DisputeStage>,
pub connector: Vec<String>,
}
#[derive(Debug, serde::Serialize, Eq)]
pub struct DisputeMetricsBucketIdentifier {
pub dispute_stage: Option<DisputeStage>,
pub connector: Option<String>,
#[serde(rename = "time_range")]
pub time_bucket: TimeRange,
#[serde(rename = "time_bucket")]
#[serde(with = "common_utils::custom_serde::iso8601custom")]
pub start_time: time::PrimitiveDateTime,
}
impl Hash for DisputeMetricsBucketIdentifier {
fn hash<H: Hasher>(&self, state: &mut H) {
self.dispute_stage.hash(state);
self.connector.hash(state);
self.time_bucket.hash(state);
}
}
impl PartialEq for DisputeMetricsBucketIdentifier {
fn eq(&self, other: &Self) -> bool {
let mut left = DefaultHasher::new();
self.hash(&mut left);
let mut right = DefaultHasher::new();
other.hash(&mut right);
left.finish() == right.finish()
}
}
impl DisputeMetricsBucketIdentifier {
pub fn new(
dispute_stage: Option<DisputeStage>,
connector: Option<String>,
normalized_time_range: TimeRange,
) -> Self {
Self {
dispute_stage,
connector,
time_bucket: normalized_time_range,
start_time: normalized_time_range.start_time,
}
}
}
#[derive(Debug, serde::Serialize)]
pub struct DisputeMetricsBucketValue {
pub disputes_challenged: Option<u64>,
pub disputes_won: Option<u64>,
pub disputes_lost: Option<u64>,
pub total_amount_disputed: Option<u64>,
pub total_dispute_lost_amount: Option<u64>,
pub total_dispute: Option<u64>,
}
#[derive(Debug, serde::Serialize)]
pub struct DisputeMetricsBucketResponse {
#[serde(flatten)]
pub values: DisputeMetricsBucketValue,
#[serde(flatten)]
pub dimensions: DisputeMetricsBucketIdentifier,
}

View File

@ -97,7 +97,8 @@ impl_misc_api_event_type!(
ConnectorEventsRequest, ConnectorEventsRequest,
OutgoingWebhookLogsRequest, OutgoingWebhookLogsRequest,
GetDisputeFilterRequest, GetDisputeFilterRequest,
DisputeFiltersResponse DisputeFiltersResponse,
GetDisputeMetricRequest
); );
#[cfg(feature = "stripe")] #[cfg(feature = "stripe")]

View File

@ -9,8 +9,9 @@ pub mod routes {
}; };
use api_models::analytics::{ use api_models::analytics::{
GenerateReportRequest, GetApiEventFiltersRequest, GetApiEventMetricRequest, GenerateReportRequest, GetApiEventFiltersRequest, GetApiEventMetricRequest,
GetPaymentFiltersRequest, GetPaymentMetricRequest, GetRefundFilterRequest, GetDisputeMetricRequest, GetPaymentFiltersRequest, GetPaymentMetricRequest,
GetRefundMetricRequest, GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest, GetRefundFilterRequest, GetRefundMetricRequest, GetSdkEventFiltersRequest,
GetSdkEventMetricRequest, ReportRequest,
}; };
use error_stack::ResultExt; use error_stack::ResultExt;
use router_env::AnalyticsFlow; use router_env::AnalyticsFlow;
@ -92,6 +93,10 @@ pub mod routes {
web::resource("filters/disputes") web::resource("filters/disputes")
.route(web::post().to(get_dispute_filters)), .route(web::post().to(get_dispute_filters)),
) )
.service(
web::resource("metrics/disputes")
.route(web::post().to(get_dispute_metrics)),
)
} }
route route
} }
@ -612,4 +617,39 @@ pub mod routes {
)) ))
.await .await
} }
/// # Panics
///
/// Panics if `json_payload` array does not contain one `GetDisputeMetricRequest` element.
pub async fn get_dispute_metrics(
state: web::Data<AppState>,
req: actix_web::HttpRequest,
json_payload: web::Json<[GetDisputeMetricRequest; 1]>,
) -> impl Responder {
// safety: This shouldn't panic owing to the data type
#[allow(clippy::expect_used)]
let payload = json_payload
.into_inner()
.to_vec()
.pop()
.expect("Couldn't get GetDisputeMetricRequest");
let flow = AnalyticsFlow::GetDisputeMetrics;
Box::pin(api::server_wrap(
flow,
state,
&req,
payload,
|state, auth: AuthenticationData, req| async move {
analytics::disputes::get_metrics(
&state.pool,
&auth.merchant_account.merchant_id,
req,
)
.await
.map(ApplicationResponse::Json)
},
&auth::JWTAuth(Permission::Analytics),
api_locking::LockAction::NotApplicable,
))
.await
}
} }

View File

@ -33,6 +33,7 @@ pub enum EventType {
ApiLogs, ApiLogs,
ConnectorApiLogs, ConnectorApiLogs,
OutgoingWebhookLogs, OutgoingWebhookLogs,
Dispute,
} }
#[derive(Debug, Default, Deserialize, Clone)] #[derive(Debug, Default, Deserialize, Clone)]

View File

@ -347,6 +347,7 @@ impl KafkaProducer {
EventType::Refund => &self.refund_analytics_topic, EventType::Refund => &self.refund_analytics_topic,
EventType::ConnectorApiLogs => &self.connector_logs_topic, EventType::ConnectorApiLogs => &self.connector_logs_topic,
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic, EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
EventType::Dispute => &self.dispute_analytics_topic,
} }
} }
} }

View File

@ -7,7 +7,7 @@ use crate::types::storage::dispute::Dispute;
#[derive(serde::Serialize, Debug)] #[derive(serde::Serialize, Debug)]
pub struct KafkaDispute<'a> { pub struct KafkaDispute<'a> {
pub dispute_id: &'a String, pub dispute_id: &'a String,
pub amount: &'a String, pub dispute_amount: i64,
pub currency: &'a String, pub currency: &'a String,
pub dispute_stage: &'a storage_enums::DisputeStage, pub dispute_stage: &'a storage_enums::DisputeStage,
pub dispute_status: &'a storage_enums::DisputeStatus, pub dispute_status: &'a storage_enums::DisputeStatus,
@ -38,7 +38,7 @@ impl<'a> KafkaDispute<'a> {
pub fn from_storage(dispute: &'a Dispute) -> Self { pub fn from_storage(dispute: &'a Dispute) -> Self {
Self { Self {
dispute_id: &dispute.dispute_id, dispute_id: &dispute.dispute_id,
amount: &dispute.amount, dispute_amount: dispute.amount.parse::<i64>().unwrap_or_default(),
currency: &dispute.currency, currency: &dispute.currency,
dispute_stage: &dispute.dispute_stage, dispute_stage: &dispute.dispute_stage,
dispute_status: &dispute.dispute_status, dispute_status: &dispute.dispute_status,

View File

@ -55,6 +55,7 @@ pub enum AnalyticsFlow {
GetConnectorEvents, GetConnectorEvents,
GetOutgoingWebhookEvents, GetOutgoingWebhookEvents,
GetDisputeFilters, GetDisputeFilters,
GetDisputeMetrics,
} }
impl FlowMetric for AnalyticsFlow {} impl FlowMetric for AnalyticsFlow {}