feat(analytics): Add refund sessionized metrics for Analytics V2 dashboard (#6616)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Sandeep Kumar
2024-12-05 15:39:40 +05:30
committed by GitHub
parent de80121871
commit 774a53ee89
29 changed files with 1527 additions and 78 deletions

View File

@ -16,7 +16,9 @@ use super::{
distribution::PaymentDistributionRow, filters::PaymentFilterRow, metrics::PaymentMetricRow,
},
query::{Aggregate, ToSql, Window},
refunds::{filters::RefundFilterRow, metrics::RefundMetricRow},
refunds::{
distribution::RefundDistributionRow, filters::RefundFilterRow, metrics::RefundMetricRow,
},
sdk_events::{filters::SdkEventFilter, metrics::SdkEventMetricRow},
types::{AnalyticsCollection, AnalyticsDataSource, LoadRow, QueryExecutionError},
};
@ -170,6 +172,7 @@ impl super::payment_intents::filters::PaymentIntentFilterAnalytics for Clickhous
impl super::payment_intents::metrics::PaymentIntentMetricAnalytics for ClickhouseClient {}
impl super::refunds::metrics::RefundMetricAnalytics for ClickhouseClient {}
impl super::refunds::filters::RefundFilterAnalytics for ClickhouseClient {}
impl super::refunds::distribution::RefundDistributionAnalytics for ClickhouseClient {}
impl super::frm::metrics::FrmMetricAnalytics for ClickhouseClient {}
impl super::frm::filters::FrmFilterAnalytics for ClickhouseClient {}
impl super::sdk_events::filters::SdkEventFilterAnalytics for ClickhouseClient {}
@ -300,6 +303,16 @@ impl TryInto<RefundFilterRow> for serde_json::Value {
}
}
impl TryInto<RefundDistributionRow> for serde_json::Value {
type Error = Report<ParsingError>;
fn try_into(self) -> Result<RefundDistributionRow, Self::Error> {
serde_json::from_value(self).change_context(ParsingError::StructParseFailure(
"Failed to parse RefundDistributionRow in clickhouse results",
))
}
}
impl TryInto<FrmMetricRow> for serde_json::Value {
type Error = Report<ParsingError>;

View File

@ -29,6 +29,7 @@ use hyperswitch_interfaces::secrets_interface::{
secret_state::{RawSecret, SecretStateContainer, SecuredSecret},
SecretManagementInterface, SecretsManagementError,
};
use refunds::distribution::{RefundDistribution, RefundDistributionRow};
pub use types::AnalyticsDomain;
pub mod lambda_utils;
pub mod utils;
@ -52,7 +53,7 @@ use api_models::analytics::{
sdk_events::{
SdkEventDimensions, SdkEventFilters, SdkEventMetrics, SdkEventMetricsBucketIdentifier,
},
Distribution, Granularity, TimeRange,
Granularity, PaymentDistributionBody, RefundDistributionBody, TimeRange,
};
use clickhouse::ClickhouseClient;
pub use clickhouse::ClickhouseConfig;
@ -215,7 +216,7 @@ impl AnalyticsProvider {
pub async fn get_payment_distribution(
&self,
distribution: &Distribution,
distribution: &PaymentDistributionBody,
dimensions: &[PaymentDimensions],
auth: &AuthInfo,
filters: &PaymentFilters,
@ -528,6 +529,116 @@ impl AnalyticsProvider {
.await
}
pub async fn get_refund_distribution(
&self,
distribution: &RefundDistributionBody,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
) -> types::MetricsResult<Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>> {
// Metrics to get the fetch time for each payment metric
metrics::request::record_operation_time(
async {
match self {
Self::Sqlx(pool) => {
distribution.distribution_for
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::Clickhouse(pool) => {
distribution.distribution_for
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::CombinedCkh(sqlx_pool, ckh_pool) => {
let (ckh_result, sqlx_result) = tokio::join!(distribution.distribution_for
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
ckh_pool,
),
distribution.distribution_for
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
sqlx_pool,
));
match (&sqlx_result, &ckh_result) {
(Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
router_env::logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres payments analytics distribution")
},
_ => {}
};
ckh_result
}
Self::CombinedSqlx(sqlx_pool, ckh_pool) => {
let (ckh_result, sqlx_result) = tokio::join!(distribution.distribution_for
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
ckh_pool,
),
distribution.distribution_for
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
sqlx_pool,
));
match (&sqlx_result, &ckh_result) {
(Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
router_env::logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres payments analytics distribution")
},
_ => {}
};
sqlx_result
}
}
},
&metrics::METRIC_FETCH_TIME,
&distribution.distribution_for,
self,
)
.await
}
pub async fn get_frm_metrics(
&self,
metric: &FrmMetrics,

View File

@ -2,7 +2,7 @@ use api_models::analytics::{
payments::{
PaymentDimensions, PaymentDistributions, PaymentFilters, PaymentMetricsBucketIdentifier,
},
Distribution, Granularity, TimeRange,
Granularity, PaymentDistributionBody, TimeRange,
};
use diesel_models::enums as storage_enums;
use time::PrimitiveDateTime;
@ -53,7 +53,7 @@ where
#[allow(clippy::too_many_arguments)]
async fn load_distribution(
&self,
distribution: &Distribution,
distribution: &PaymentDistributionBody,
dimensions: &[PaymentDimensions],
auth: &AuthInfo,
filters: &PaymentFilters,
@ -75,7 +75,7 @@ where
{
async fn load_distribution(
&self,
distribution: &Distribution,
distribution: &PaymentDistributionBody,
dimensions: &[PaymentDimensions],
auth: &AuthInfo,
filters: &PaymentFilters,

View File

@ -1,6 +1,6 @@
use api_models::analytics::{
payments::{PaymentDimensions, PaymentFilters, PaymentMetricsBucketIdentifier},
Distribution, Granularity, TimeRange,
Granularity, PaymentDistributionBody, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use diesel_models::enums as storage_enums;
@ -31,7 +31,7 @@ where
{
async fn load_distribution(
&self,
distribution: &Distribution,
distribution: &PaymentDistributionBody,
dimensions: &[PaymentDimensions],
auth: &AuthInfo,
filters: &PaymentFilters,

View File

@ -9,7 +9,7 @@ use api_models::{
frm::{FrmDimensions, FrmTransactionType},
payment_intents::PaymentIntentDimensions,
payments::{PaymentDimensions, PaymentDistributions},
refunds::{RefundDimensions, RefundType},
refunds::{RefundDimensions, RefundDistributions, RefundType},
sdk_events::{SdkEventDimensions, SdkEventNames},
Granularity,
},
@ -488,6 +488,7 @@ impl_to_sql_for_to_string!(
PaymentIntentDimensions,
&PaymentDistributions,
RefundDimensions,
&RefundDistributions,
FrmDimensions,
PaymentMethod,
PaymentMethodType,

View File

@ -1,6 +1,7 @@
pub mod accumulator;
mod core;
pub mod distribution;
pub mod filters;
pub mod metrics;
pub mod types;

View File

@ -1,19 +1,56 @@
use api_models::analytics::refunds::RefundMetricsBucketValue;
use api_models::analytics::refunds::{
ErrorMessagesResult, ReasonsResult, RefundMetricsBucketValue,
};
use bigdecimal::ToPrimitive;
use diesel_models::enums as storage_enums;
use super::metrics::RefundMetricRow;
use super::{distribution::RefundDistributionRow, metrics::RefundMetricRow};
#[derive(Debug, Default)]
pub struct RefundMetricsAccumulator {
pub refund_success_rate: SuccessRateAccumulator,
pub refund_count: CountAccumulator,
pub refund_success: CountAccumulator,
pub processed_amount: PaymentProcessedAmountAccumulator,
pub processed_amount: RefundProcessedAmountAccumulator,
pub refund_reason: RefundReasonAccumulator,
pub refund_reason_distribution: RefundReasonDistributionAccumulator,
pub refund_error_message: RefundReasonAccumulator,
pub refund_error_message_distribution: RefundErrorMessageDistributionAccumulator,
}
#[derive(Debug, Default)]
pub struct RefundReasonDistributionRow {
pub count: i64,
pub total: i64,
pub refund_reason: String,
}
#[derive(Debug, Default)]
pub struct RefundReasonDistributionAccumulator {
pub refund_reason_vec: Vec<RefundReasonDistributionRow>,
}
#[derive(Debug, Default)]
pub struct RefundErrorMessageDistributionRow {
pub count: i64,
pub total: i64,
pub refund_error_message: String,
}
#[derive(Debug, Default)]
pub struct RefundErrorMessageDistributionAccumulator {
pub refund_error_message_vec: Vec<RefundErrorMessageDistributionRow>,
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct RefundReasonAccumulator {
pub count: u64,
}
#[derive(Debug, Default)]
pub struct SuccessRateAccumulator {
pub success: i64,
pub total: i64,
pub success: u32,
pub total: u32,
}
#[derive(Debug, Default)]
#[repr(transparent)]
@ -21,8 +58,8 @@ pub struct CountAccumulator {
pub count: Option<i64>,
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct PaymentProcessedAmountAccumulator {
pub struct RefundProcessedAmountAccumulator {
pub count: Option<i64>,
pub total: Option<i64>,
}
@ -34,6 +71,93 @@ pub trait RefundMetricAccumulator {
fn collect(self) -> Self::MetricOutput;
}
pub trait RefundDistributionAccumulator {
type DistributionOutput;
fn add_distribution_bucket(&mut self, distribution: &RefundDistributionRow);
fn collect(self) -> Self::DistributionOutput;
}
impl RefundDistributionAccumulator for RefundReasonDistributionAccumulator {
type DistributionOutput = Option<Vec<ReasonsResult>>;
fn add_distribution_bucket(&mut self, distribution: &RefundDistributionRow) {
self.refund_reason_vec.push(RefundReasonDistributionRow {
count: distribution.count.unwrap_or_default(),
total: distribution
.total
.clone()
.map(|i| i.to_i64().unwrap_or_default())
.unwrap_or_default(),
refund_reason: distribution.refund_reason.clone().unwrap_or_default(),
})
}
fn collect(mut self) -> Self::DistributionOutput {
if self.refund_reason_vec.is_empty() {
None
} else {
self.refund_reason_vec.sort_by(|a, b| b.count.cmp(&a.count));
let mut res: Vec<ReasonsResult> = Vec::new();
for val in self.refund_reason_vec.into_iter() {
let perc = f64::from(u32::try_from(val.count).ok()?) * 100.0
/ f64::from(u32::try_from(val.total).ok()?);
res.push(ReasonsResult {
reason: val.refund_reason,
count: val.count,
percentage: (perc * 100.0).round() / 100.0,
})
}
Some(res)
}
}
}
impl RefundDistributionAccumulator for RefundErrorMessageDistributionAccumulator {
type DistributionOutput = Option<Vec<ErrorMessagesResult>>;
fn add_distribution_bucket(&mut self, distribution: &RefundDistributionRow) {
self.refund_error_message_vec
.push(RefundErrorMessageDistributionRow {
count: distribution.count.unwrap_or_default(),
total: distribution
.total
.clone()
.map(|i| i.to_i64().unwrap_or_default())
.unwrap_or_default(),
refund_error_message: distribution
.refund_error_message
.clone()
.unwrap_or_default(),
})
}
fn collect(mut self) -> Self::DistributionOutput {
if self.refund_error_message_vec.is_empty() {
None
} else {
self.refund_error_message_vec
.sort_by(|a, b| b.count.cmp(&a.count));
let mut res: Vec<ErrorMessagesResult> = Vec::new();
for val in self.refund_error_message_vec.into_iter() {
let perc = f64::from(u32::try_from(val.count).ok()?) * 100.0
/ f64::from(u32::try_from(val.total).ok()?);
res.push(ErrorMessagesResult {
error_message: val.refund_error_message,
count: val.count,
percentage: (perc * 100.0).round() / 100.0,
})
}
Some(res)
}
}
}
impl RefundMetricAccumulator for CountAccumulator {
type MetricOutput = Option<u64>;
#[inline]
@ -50,62 +174,103 @@ impl RefundMetricAccumulator for CountAccumulator {
}
}
impl RefundMetricAccumulator for PaymentProcessedAmountAccumulator {
type MetricOutput = (Option<u64>, Option<u64>);
impl RefundMetricAccumulator for RefundProcessedAmountAccumulator {
type MetricOutput = (Option<u64>, Option<u64>, Option<u64>);
#[inline]
fn add_metrics_bucket(&mut self, metrics: &RefundMetricRow) {
self.total = match (
self.total,
metrics
.total
.as_ref()
.and_then(bigdecimal::ToPrimitive::to_i64),
metrics.total.as_ref().and_then(ToPrimitive::to_i64),
) {
(None, None) => None,
(None, i @ Some(_)) | (i @ Some(_), None) => i,
(Some(a), Some(b)) => Some(a + b),
}
};
self.count = match (self.count, metrics.count) {
(None, None) => None,
(None, i @ Some(_)) | (i @ Some(_), None) => i,
(Some(a), Some(b)) => Some(a + b),
};
}
#[inline]
fn collect(self) -> Self::MetricOutput {
(self.total.and_then(|i| u64::try_from(i).ok()), Some(0))
let total = u64::try_from(self.total.unwrap_or_default()).ok();
let count = self.count.and_then(|i| u64::try_from(i).ok());
(total, count, Some(0))
}
}
impl RefundMetricAccumulator for SuccessRateAccumulator {
type MetricOutput = Option<f64>;
type MetricOutput = (Option<u32>, Option<u32>, Option<f64>);
fn add_metrics_bucket(&mut self, metrics: &RefundMetricRow) {
if let Some(ref refund_status) = metrics.refund_status {
if refund_status.as_ref() == &storage_enums::RefundStatus::Success {
self.success += metrics.count.unwrap_or_default();
if let Some(success) = metrics
.count
.and_then(|success| u32::try_from(success).ok())
{
self.success += success;
}
}
};
self.total += metrics.count.unwrap_or_default();
if let Some(total) = metrics.count.and_then(|total| u32::try_from(total).ok()) {
self.total += total;
}
}
fn collect(self) -> Self::MetricOutput {
if self.total <= 0 {
None
if self.total == 0 {
(None, None, None)
} else {
Some(
f64::from(u32::try_from(self.success).ok()?) * 100.0
/ f64::from(u32::try_from(self.total).ok()?),
)
let success = Some(self.success);
let total = Some(self.total);
let success_rate = match (success, total) {
(Some(s), Some(t)) if t > 0 => Some(f64::from(s) * 100.0 / f64::from(t)),
_ => None,
};
(success, total, success_rate)
}
}
}
impl RefundMetricAccumulator for RefundReasonAccumulator {
type MetricOutput = Option<u64>;
fn add_metrics_bucket(&mut self, metrics: &RefundMetricRow) {
if let Some(count) = metrics.count {
if let Ok(count_u64) = u64::try_from(count) {
self.count += count_u64;
}
}
}
fn collect(self) -> Self::MetricOutput {
Some(self.count)
}
}
impl RefundMetricsAccumulator {
pub fn collect(self) -> RefundMetricsBucketValue {
let (refund_processed_amount, refund_processed_amount_in_usd) =
let (successful_refunds, total_refunds, refund_success_rate) =
self.refund_success_rate.collect();
let (refund_processed_amount, refund_processed_count, refund_processed_amount_in_usd) =
self.processed_amount.collect();
RefundMetricsBucketValue {
refund_success_rate: self.refund_success_rate.collect(),
successful_refunds,
total_refunds,
refund_success_rate,
refund_count: self.refund_count.collect(),
refund_success_count: self.refund_success.collect(),
refund_processed_amount,
refund_processed_amount_in_usd,
refund_processed_count,
refund_reason_distribution: self.refund_reason_distribution.collect(),
refund_error_message_distribution: self.refund_error_message_distribution.collect(),
refund_reason_count: self.refund_reason.collect(),
refund_error_message_count: self.refund_error_message.collect(),
}
}
}

View File

@ -1,15 +1,17 @@
#![allow(dead_code)]
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use api_models::analytics::{
refunds::{
RefundDimensions, RefundMetrics, RefundMetricsBucketIdentifier, RefundMetricsBucketResponse,
RefundDimensions, RefundDistributions, RefundMetrics, RefundMetricsBucketIdentifier,
RefundMetricsBucketResponse,
},
GetRefundFilterRequest, GetRefundMetricRequest, RefundFilterValue, RefundFiltersResponse,
RefundsAnalyticsMetadata, RefundsMetricsResponse,
};
use bigdecimal::ToPrimitive;
use common_enums::Currency;
use common_utils::errors::CustomResult;
use currency_conversion::{conversion::convert, types::ExchangeRates};
use error_stack::ResultExt;
use router_env::{
@ -19,17 +21,31 @@ use router_env::{
};
use super::{
distribution::RefundDistributionRow,
filters::{get_refund_filter_for_dimension, RefundFilterRow},
metrics::RefundMetricRow,
RefundMetricsAccumulator,
};
use crate::{
enums::AuthInfo,
errors::{AnalyticsError, AnalyticsResult},
metrics,
refunds::RefundMetricAccumulator,
refunds::{accumulator::RefundDistributionAccumulator, RefundMetricAccumulator},
AnalyticsProvider,
};
#[derive(Debug)]
pub enum TaskType {
MetricTask(
RefundMetrics,
CustomResult<HashSet<(RefundMetricsBucketIdentifier, RefundMetricRow)>, AnalyticsError>,
),
DistributionTask(
RefundDistributions,
CustomResult<Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>, AnalyticsError>,
),
}
pub async fn get_metrics(
pool: &AnalyticsProvider,
ex_rates: &ExchangeRates,
@ -62,18 +78,48 @@ pub async fn get_metrics(
)
.await
.change_context(AnalyticsError::UnknownError);
(metric_type, data)
TaskType::MetricTask(metric_type, data)
}
.instrument(task_span),
);
}
while let Some((metric, data)) = set
if let Some(distribution) = req.clone().distribution {
let req = req.clone();
let pool = pool.clone();
let task_span = tracing::debug_span!(
"analytics_refunds_distribution_query",
refund_distribution = distribution.distribution_for.as_ref()
);
let auth_scoped = auth.to_owned();
set.spawn(
async move {
let data = pool
.get_refund_distribution(
&distribution,
&req.group_by_names.clone(),
&auth_scoped,
&req.filters,
&req.time_series.map(|t| t.granularity),
&req.time_range,
)
.await
.change_context(AnalyticsError::UnknownError);
TaskType::DistributionTask(distribution.distribution_for, data)
}
.instrument(task_span),
);
}
while let Some(task_type) = set
.join_next()
.await
.transpose()
.change_context(AnalyticsError::UnknownError)?
{
match task_type {
TaskType::MetricTask(metric, data) => {
let data = data?;
let attributes = &add_attributes([
("metric_type", metric.to_string()),
@ -90,11 +136,10 @@ pub async fn get_metrics(
logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}");
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
RefundMetrics::RefundSuccessRate | RefundMetrics::SessionizedRefundSuccessRate => {
metrics_builder
RefundMetrics::RefundSuccessRate
| RefundMetrics::SessionizedRefundSuccessRate => metrics_builder
.refund_success_rate
.add_metrics_bucket(&value)
}
.add_metrics_bucket(&value),
RefundMetrics::RefundCount | RefundMetrics::SessionizedRefundCount => {
metrics_builder.refund_count.add_metrics_bucket(&value)
}
@ -106,6 +151,12 @@ pub async fn get_metrics(
| RefundMetrics::SessionizedRefundProcessedAmount => {
metrics_builder.processed_amount.add_metrics_bucket(&value)
}
RefundMetrics::SessionizedRefundReason => {
metrics_builder.refund_reason.add_metrics_bucket(&value)
}
RefundMetrics::SessionizedRefundErrorMessage => metrics_builder
.refund_error_message
.add_metrics_bucket(&value),
}
}
@ -115,12 +166,57 @@ pub async fn get_metrics(
metrics_accumulator
);
}
TaskType::DistributionTask(distribution, data) => {
let data = data?;
let attributes = &add_attributes([
("distribution_type", distribution.to_string()),
("source", pool.to_string()),
]);
let value = u64::try_from(data.len());
if let Ok(val) = value {
metrics::BUCKETS_FETCHED.record(&metrics::CONTEXT, val, attributes);
logger::debug!("Attributes: {:?}, Buckets fetched: {}", attributes, val);
}
for (id, value) in data {
logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for distribution {distribution}");
let metrics_builder = metrics_accumulator.entry(id).or_default();
match distribution {
RefundDistributions::SessionizedRefundReason => metrics_builder
.refund_reason_distribution
.add_distribution_bucket(&value),
RefundDistributions::SessionizedRefundErrorMessage => metrics_builder
.refund_error_message_distribution
.add_distribution_bucket(&value),
}
}
logger::debug!(
"Analytics Accumulated Results: distribution: {}, results: {:#?}",
distribution,
metrics_accumulator
);
}
}
}
let mut success = 0;
let mut total = 0;
let mut total_refund_processed_amount = 0;
let mut total_refund_processed_amount_in_usd = 0;
let mut total_refund_processed_count = 0;
let mut total_refund_reason_count = 0;
let mut total_refund_error_message_count = 0;
let query_data: Vec<RefundMetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| {
let mut collected_values = val.collect();
if let Some(success_count) = collected_values.successful_refunds {
success += success_count;
}
if let Some(total_count) = collected_values.total_refunds {
total += total_count;
}
if let Some(amount) = collected_values.refund_processed_amount {
let amount_in_usd = id
.currency
@ -142,18 +238,34 @@ pub async fn get_metrics(
total_refund_processed_amount += amount;
total_refund_processed_amount_in_usd += amount_in_usd.unwrap_or(0);
}
if let Some(count) = collected_values.refund_processed_count {
total_refund_processed_count += count;
}
if let Some(total_count) = collected_values.refund_reason_count {
total_refund_reason_count += total_count;
}
if let Some(total_count) = collected_values.refund_error_message_count {
total_refund_error_message_count += total_count;
}
RefundMetricsBucketResponse {
values: collected_values,
dimensions: id,
}
})
.collect();
let total_refund_success_rate = match (success, total) {
(s, t) if t > 0 => Some(f64::from(s) * 100.0 / f64::from(t)),
_ => None,
};
Ok(RefundsMetricsResponse {
query_data,
meta_data: [RefundsAnalyticsMetadata {
total_refund_success_rate,
total_refund_processed_amount: Some(total_refund_processed_amount),
total_refund_processed_amount_in_usd: Some(total_refund_processed_amount_in_usd),
total_refund_processed_count: Some(total_refund_processed_count),
total_refund_reason_count: Some(total_refund_reason_count),
total_refund_error_message_count: Some(total_refund_error_message_count),
}],
})
}
@ -229,6 +341,8 @@ pub async fn get_filters(
RefundDimensions::Connector => fil.connector,
RefundDimensions::RefundType => fil.refund_type.map(|i| i.as_ref().to_string()),
RefundDimensions::ProfileId => fil.profile_id,
RefundDimensions::RefundReason => fil.refund_reason,
RefundDimensions::RefundErrorMessage => fil.refund_error_message,
})
.collect::<Vec<String>>();
res.query_data.push(RefundFilterValue {

View File

@ -0,0 +1,105 @@
use api_models::analytics::{
refunds::{
RefundDimensions, RefundDistributions, RefundFilters, RefundMetricsBucketIdentifier,
RefundType,
},
Granularity, RefundDistributionBody, TimeRange,
};
use diesel_models::enums as storage_enums;
use time::PrimitiveDateTime;
use crate::{
enums::AuthInfo,
query::{Aggregate, GroupByClause, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, LoadRow, MetricsResult},
};
mod sessionized_distribution;
#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
pub struct RefundDistributionRow {
pub currency: Option<DBEnumWrapper<storage_enums::Currency>>,
pub refund_status: Option<DBEnumWrapper<storage_enums::RefundStatus>>,
pub connector: Option<String>,
pub refund_type: Option<DBEnumWrapper<RefundType>>,
pub profile_id: Option<String>,
pub total: Option<bigdecimal::BigDecimal>,
pub count: Option<i64>,
pub refund_reason: Option<String>,
pub refund_error_message: Option<String>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub start_bucket: Option<PrimitiveDateTime>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub end_bucket: Option<PrimitiveDateTime>,
}
pub trait RefundDistributionAnalytics: LoadRow<RefundDistributionRow> {}
#[async_trait::async_trait]
pub trait RefundDistribution<T>
where
T: AnalyticsDataSource + RefundDistributionAnalytics,
{
#[allow(clippy::too_many_arguments)]
async fn load_distribution(
&self,
distribution: &RefundDistributionBody,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>>;
}
#[async_trait::async_trait]
impl<T> RefundDistribution<T> for RefundDistributions
where
T: AnalyticsDataSource + RefundDistributionAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_distribution(
&self,
distribution: &RefundDistributionBody,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>> {
match self {
Self::SessionizedRefundReason => {
sessionized_distribution::RefundReason
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::SessionizedRefundErrorMessage => {
sessionized_distribution::RefundErrorMessage
.load_distribution(
distribution,
dimensions,
auth,
filters,
granularity,
time_range,
pool,
)
.await
}
}
}
}

View File

@ -0,0 +1,7 @@
mod refund_error_message;
mod refund_reason;
pub(super) use refund_error_message::RefundErrorMessage;
pub(super) use refund_reason::RefundReason;
pub use super::{RefundDistribution, RefundDistributionAnalytics, RefundDistributionRow};

View File

@ -0,0 +1,177 @@
use api_models::analytics::{
refunds::{RefundDimensions, RefundFilters, RefundMetricsBucketIdentifier},
Granularity, RefundDistributionBody, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use diesel_models::enums as storage_enums;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::{RefundDistribution, RefundDistributionRow};
use crate::{
enums::AuthInfo,
query::{
Aggregate, GroupByClause, Order, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window,
},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct RefundErrorMessage;
#[async_trait::async_trait]
impl<T> RefundDistribution<T> for RefundErrorMessage
where
T: AnalyticsDataSource + super::RefundDistributionAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_distribution(
&self,
distribution: &RefundDistributionBody,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>> {
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::RefundSessionized);
for dim in dimensions.iter() {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(&distribution.distribution_for)
.switch()?;
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
auth.set_filter_clause(&mut query_builder).switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
query_builder
.add_filter_clause(
RefundDimensions::RefundStatus,
storage_enums::RefundStatus::Failure,
)
.switch()?;
for dim in dimensions.iter() {
query_builder
.add_group_by_clause(dim)
.attach_printable("Error grouping by dimensions")
.switch()?;
}
query_builder
.add_group_by_clause(&distribution.distribution_for)
.attach_printable("Error grouping by distribution_for")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
for dim in dimensions.iter() {
query_builder.add_outer_select_column(dim).switch()?;
}
query_builder
.add_outer_select_column(&distribution.distribution_for)
.switch()?;
query_builder.add_outer_select_column("count").switch()?;
query_builder
.add_outer_select_column("start_bucket")
.switch()?;
query_builder
.add_outer_select_column("end_bucket")
.switch()?;
let sql_dimensions = query_builder.transform_to_sql_values(dimensions).switch()?;
query_builder
.add_outer_select_column(Window::Sum {
field: "count",
partition_by: Some(sql_dimensions),
order_by: None,
alias: Some("total"),
})
.switch()?;
query_builder
.add_top_n_clause(
dimensions,
distribution.distribution_cardinality.into(),
"count",
Order::Descending,
)
.switch()?;
query_builder
.execute_query::<RefundDistributionRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
RefundMetricsBucketIdentifier::new(
i.currency.as_ref().map(|i| i.0),
i.refund_status.as_ref().map(|i| i.0.to_string()),
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -0,0 +1,169 @@
use api_models::analytics::{
refunds::{RefundDimensions, RefundFilters, RefundMetricsBucketIdentifier},
Granularity, RefundDistributionBody, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::{RefundDistribution, RefundDistributionRow};
use crate::{
enums::AuthInfo,
query::{
Aggregate, GroupByClause, Order, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window,
},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct RefundReason;
#[async_trait::async_trait]
impl<T> RefundDistribution<T> for RefundReason
where
T: AnalyticsDataSource + super::RefundDistributionAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_distribution(
&self,
distribution: &RefundDistributionBody,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>> {
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::RefundSessionized);
for dim in dimensions.iter() {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(&distribution.distribution_for)
.switch()?;
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
auth.set_filter_clause(&mut query_builder).switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
for dim in dimensions.iter() {
query_builder
.add_group_by_clause(dim)
.attach_printable("Error grouping by dimensions")
.switch()?;
}
query_builder
.add_group_by_clause(&distribution.distribution_for)
.attach_printable("Error grouping by distribution_for")
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
for dim in dimensions.iter() {
query_builder.add_outer_select_column(dim).switch()?;
}
query_builder
.add_outer_select_column(&distribution.distribution_for)
.switch()?;
query_builder.add_outer_select_column("count").switch()?;
query_builder
.add_outer_select_column("start_bucket")
.switch()?;
query_builder
.add_outer_select_column("end_bucket")
.switch()?;
let sql_dimensions = query_builder.transform_to_sql_values(dimensions).switch()?;
query_builder
.add_outer_select_column(Window::Sum {
field: "count",
partition_by: Some(sql_dimensions),
order_by: None,
alias: Some("total"),
})
.switch()?;
query_builder
.add_top_n_clause(
dimensions,
distribution.distribution_cardinality.into(),
"count",
Order::Descending,
)
.switch()?;
query_builder
.execute_query::<RefundDistributionRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
RefundMetricsBucketIdentifier::new(
i.currency.as_ref().map(|i| i.0),
i.refund_status.as_ref().map(|i| i.0.to_string()),
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
Vec<(RefundMetricsBucketIdentifier, RefundDistributionRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -56,4 +56,6 @@ pub struct RefundFilterRow {
pub connector: Option<String>,
pub refund_type: Option<DBEnumWrapper<RefundType>>,
pub profile_id: Option<String>,
pub refund_reason: Option<String>,
pub refund_error_message: Option<String>,
}

View File

@ -31,6 +31,8 @@ pub struct RefundMetricRow {
pub connector: Option<String>,
pub refund_type: Option<DBEnumWrapper<RefundType>>,
pub profile_id: Option<String>,
pub refund_reason: Option<String>,
pub refund_error_message: Option<String>,
pub total: Option<bigdecimal::BigDecimal>,
pub count: Option<i64>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
@ -122,6 +124,16 @@ where
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedRefundReason => {
sessionized_metrics::RefundReason
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedRefundErrorMessage => {
sessionized_metrics::RefundErrorMessage
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
}
}
}

View File

@ -99,6 +99,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -107,6 +107,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -102,6 +102,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -97,6 +97,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -1,10 +1,14 @@
mod refund_count;
mod refund_error_message;
mod refund_processed_amount;
mod refund_reason;
mod refund_success_count;
mod refund_success_rate;
pub(super) use refund_count::RefundCount;
pub(super) use refund_error_message::RefundErrorMessage;
pub(super) use refund_processed_amount::RefundProcessedAmount;
pub(super) use refund_reason::RefundReason;
pub(super) use refund_success_count::RefundSuccessCount;
pub(super) use refund_success_rate::RefundSuccessRate;

View File

@ -100,6 +100,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -0,0 +1,190 @@
use std::collections::HashSet;
use api_models::analytics::{
refunds::{RefundDimensions, RefundFilters, RefundMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use diesel_models::enums as storage_enums;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::RefundMetricRow;
use crate::{
enums::AuthInfo,
query::{
Aggregate, FilterTypes, GroupByClause, Order, QueryBuilder, QueryFilter, SeriesBucket,
ToSql, Window,
},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct RefundErrorMessage;
#[async_trait::async_trait]
impl<T> super::RefundMetric<T> for RefundErrorMessage
where
T: AnalyticsDataSource + super::RefundMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(RefundMetricsBucketIdentifier, RefundMetricRow)>> {
let mut inner_query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::RefundSessionized);
inner_query_builder
.add_select_column("sum(sign_flag)")
.switch()?;
inner_query_builder
.add_custom_filter_clause(
RefundDimensions::RefundErrorMessage,
"NULL",
FilterTypes::IsNotNull,
)
.switch()?;
time_range
.set_filter_clause(&mut inner_query_builder)
.attach_printable("Error filtering time range for inner query")
.switch()?;
let inner_query_string = inner_query_builder
.build_query()
.attach_printable("Error building inner query")
.change_context(MetricsError::QueryBuildingError)?;
let mut outer_query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::RefundSessionized);
for dim in dimensions.iter() {
outer_query_builder.add_select_column(dim).switch()?;
}
outer_query_builder
.add_select_column("sum(sign_flag) AS count")
.switch()?;
outer_query_builder
.add_select_column(format!("({}) AS total", inner_query_string))
.switch()?;
outer_query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
outer_query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters
.set_filter_clause(&mut outer_query_builder)
.switch()?;
auth.set_filter_clause(&mut outer_query_builder).switch()?;
time_range
.set_filter_clause(&mut outer_query_builder)
.attach_printable("Error filtering time range for outer query")
.switch()?;
outer_query_builder
.add_filter_clause(
RefundDimensions::RefundStatus,
storage_enums::RefundStatus::Failure,
)
.switch()?;
outer_query_builder
.add_custom_filter_clause(
RefundDimensions::RefundErrorMessage,
"NULL",
FilterTypes::IsNotNull,
)
.switch()?;
for dim in dimensions.iter() {
outer_query_builder
.add_group_by_clause(dim)
.attach_printable("Error grouping by dimensions")
.switch()?;
}
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut outer_query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
outer_query_builder
.add_order_by_clause("count", Order::Descending)
.attach_printable("Error adding order by clause")
.switch()?;
let filtered_dimensions: Vec<&RefundDimensions> = dimensions
.iter()
.filter(|&&dim| dim != RefundDimensions::RefundErrorMessage)
.collect();
for dim in &filtered_dimensions {
outer_query_builder
.add_order_by_clause(*dim, Order::Ascending)
.attach_printable("Error adding order by clause")
.switch()?;
}
outer_query_builder
.execute_query::<RefundMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
RefundMetricsBucketIdentifier::new(
i.currency.as_ref().map(|i| i.0),
None,
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
HashSet<(RefundMetricsBucketIdentifier, RefundMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -47,6 +47,12 @@ where
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Sum {
field: "refund_amount",
@ -109,6 +115,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -0,0 +1,182 @@
use std::collections::HashSet;
use api_models::analytics::{
refunds::{RefundDimensions, RefundFilters, RefundMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::RefundMetricRow;
use crate::{
enums::AuthInfo,
query::{
Aggregate, FilterTypes, GroupByClause, Order, QueryBuilder, QueryFilter, SeriesBucket,
ToSql, Window,
},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(crate) struct RefundReason;
#[async_trait::async_trait]
impl<T> super::RefundMetric<T> for RefundReason
where
T: AnalyticsDataSource + super::RefundMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[RefundDimensions],
auth: &AuthInfo,
filters: &RefundFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(RefundMetricsBucketIdentifier, RefundMetricRow)>> {
let mut inner_query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::RefundSessionized);
inner_query_builder
.add_select_column("sum(sign_flag)")
.switch()?;
inner_query_builder
.add_custom_filter_clause(
RefundDimensions::RefundReason,
"NULL",
FilterTypes::IsNotNull,
)
.switch()?;
time_range
.set_filter_clause(&mut inner_query_builder)
.attach_printable("Error filtering time range for inner query")
.switch()?;
let inner_query_string = inner_query_builder
.build_query()
.attach_printable("Error building inner query")
.change_context(MetricsError::QueryBuildingError)?;
let mut outer_query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::RefundSessionized);
for dim in dimensions.iter() {
outer_query_builder.add_select_column(dim).switch()?;
}
outer_query_builder
.add_select_column("sum(sign_flag) AS count")
.switch()?;
outer_query_builder
.add_select_column(format!("({}) AS total", inner_query_string))
.switch()?;
outer_query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
outer_query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
filters
.set_filter_clause(&mut outer_query_builder)
.switch()?;
auth.set_filter_clause(&mut outer_query_builder).switch()?;
time_range
.set_filter_clause(&mut outer_query_builder)
.attach_printable("Error filtering time range for outer query")
.switch()?;
outer_query_builder
.add_custom_filter_clause(
RefundDimensions::RefundReason,
"NULL",
FilterTypes::IsNotNull,
)
.switch()?;
for dim in dimensions.iter() {
outer_query_builder
.add_group_by_clause(dim)
.attach_printable("Error grouping by dimensions")
.switch()?;
}
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut outer_query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
outer_query_builder
.add_order_by_clause("count", Order::Descending)
.attach_printable("Error adding order by clause")
.switch()?;
let filtered_dimensions: Vec<&RefundDimensions> = dimensions
.iter()
.filter(|&&dim| dim != RefundDimensions::RefundReason)
.collect();
for dim in &filtered_dimensions {
outer_query_builder
.add_order_by_clause(*dim, Order::Ascending)
.attach_printable("Error adding order by clause")
.switch()?;
}
outer_query_builder
.execute_query::<RefundMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
RefundMetricsBucketIdentifier::new(
i.currency.as_ref().map(|i| i.0),
None,
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
HashSet<(RefundMetricsBucketIdentifier, RefundMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}

View File

@ -102,6 +102,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -97,6 +97,8 @@ where
i.connector.clone(),
i.refund_type.as_ref().map(|i| i.0.to_string()),
i.profile_id.clone(),
i.refund_reason.clone(),
i.refund_error_message.clone(),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,

View File

@ -42,6 +42,21 @@ where
.attach_printable("Error adding profile id filter")?;
}
if !self.refund_reason.is_empty() {
builder
.add_filter_in_range_clause(RefundDimensions::RefundReason, &self.refund_reason)
.attach_printable("Error adding refund reason filter")?;
}
if !self.refund_error_message.is_empty() {
builder
.add_filter_in_range_clause(
RefundDimensions::RefundErrorMessage,
&self.refund_error_message,
)
.attach_printable("Error adding refund error message filter")?;
}
Ok(())
}
}

View File

@ -154,6 +154,7 @@ impl super::payment_intents::filters::PaymentIntentFilterAnalytics for SqlxClien
impl super::payment_intents::metrics::PaymentIntentMetricAnalytics for SqlxClient {}
impl super::refunds::metrics::RefundMetricAnalytics for SqlxClient {}
impl super::refunds::filters::RefundFilterAnalytics for SqlxClient {}
impl super::refunds::distribution::RefundDistributionAnalytics for SqlxClient {}
impl super::disputes::filters::DisputeFilterAnalytics for SqlxClient {}
impl super::disputes::metrics::DisputeMetricAnalytics for SqlxClient {}
impl super::frm::metrics::FrmMetricAnalytics for SqlxClient {}
@ -214,6 +215,15 @@ impl<'a> FromRow<'a, PgRow> for super::refunds::metrics::RefundMetricRow {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_reason: Option<String> = row.try_get("refund_reason").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_error_message: Option<String> =
row.try_get("refund_error_message").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let total: Option<bigdecimal::BigDecimal> = row.try_get("total").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
@ -235,6 +245,8 @@ impl<'a> FromRow<'a, PgRow> for super::refunds::metrics::RefundMetricRow {
connector,
refund_type,
profile_id,
refund_reason,
refund_error_message,
total,
count,
start_bucket,
@ -791,12 +803,88 @@ impl<'a> FromRow<'a, PgRow> for super::refunds::filters::RefundFilterRow {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_reason: Option<String> = row.try_get("refund_reason").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_error_message: Option<String> =
row.try_get("refund_error_message").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
Ok(Self {
currency,
refund_status,
connector,
refund_type,
profile_id,
refund_reason,
refund_error_message,
})
}
}
impl<'a> FromRow<'a, PgRow> for super::refunds::distribution::RefundDistributionRow {
fn from_row(row: &'a PgRow) -> sqlx::Result<Self> {
let currency: Option<DBEnumWrapper<Currency>> =
row.try_get("currency").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_status: Option<DBEnumWrapper<RefundStatus>> =
row.try_get("refund_status").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let connector: Option<String> = row.try_get("connector").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_type: Option<DBEnumWrapper<RefundType>> =
row.try_get("refund_type").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let profile_id: Option<String> = row.try_get("profile_id").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let total: Option<bigdecimal::BigDecimal> = row.try_get("total").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let count: Option<i64> = row.try_get("count").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_reason: Option<String> = row.try_get("refund_reason").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
let refund_error_message: Option<String> =
row.try_get("refund_error_message").or_else(|e| match e {
ColumnNotFound(_) => Ok(Default::default()),
e => Err(e),
})?;
// Removing millisecond precision to get accurate diffs against clickhouse
let start_bucket: Option<PrimitiveDateTime> = row
.try_get::<Option<PrimitiveDateTime>, _>("start_bucket")?
.and_then(|dt| dt.replace_millisecond(0).ok());
let end_bucket: Option<PrimitiveDateTime> = row
.try_get::<Option<PrimitiveDateTime>, _>("end_bucket")?
.and_then(|dt| dt.replace_millisecond(0).ok());
Ok(Self {
currency,
refund_status,
connector,
refund_type,
profile_id,
total,
count,
refund_reason,
refund_error_message,
start_bucket,
end_bucket,
})
}
}

View File

@ -12,7 +12,7 @@ use self::{
frm::{FrmDimensions, FrmMetrics},
payment_intents::{PaymentIntentDimensions, PaymentIntentMetrics},
payments::{PaymentDimensions, PaymentDistributions, PaymentMetrics},
refunds::{RefundDimensions, RefundMetrics},
refunds::{RefundDimensions, RefundDistributions, RefundMetrics},
sdk_events::{SdkEventDimensions, SdkEventMetrics},
};
pub mod active_payments;
@ -73,7 +73,7 @@ pub struct GetPaymentMetricRequest {
#[serde(default)]
pub filters: payments::PaymentFilters,
pub metrics: HashSet<PaymentMetrics>,
pub distribution: Option<Distribution>,
pub distribution: Option<PaymentDistributionBody>,
#[serde(default)]
pub delta: bool,
}
@ -98,11 +98,18 @@ impl Into<u64> for QueryLimit {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Distribution {
pub struct PaymentDistributionBody {
pub distribution_for: PaymentDistributions,
pub distribution_cardinality: QueryLimit,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RefundDistributionBody {
pub distribution_for: RefundDistributions,
pub distribution_cardinality: QueryLimit,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportRequest {
@ -142,6 +149,7 @@ pub struct GetRefundMetricRequest {
#[serde(default)]
pub filters: refunds::RefundFilters,
pub metrics: HashSet<RefundMetrics>,
pub distribution: Option<RefundDistributionBody>,
#[serde(default)]
pub delta: bool,
}
@ -230,8 +238,12 @@ pub struct PaymentIntentsAnalyticsMetadata {
#[derive(Debug, serde::Serialize)]
pub struct RefundsAnalyticsMetadata {
pub total_refund_success_rate: Option<f64>,
pub total_refund_processed_amount: Option<u64>,
pub total_refund_processed_amount_in_usd: Option<u64>,
pub total_refund_processed_count: Option<u64>,
pub total_refund_reason_count: Option<u64>,
pub total_refund_error_message_count: Option<u64>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]

View File

@ -43,6 +43,10 @@ pub struct RefundFilters {
pub refund_type: Vec<RefundType>,
#[serde(default)]
pub profile_id: Vec<id_type::ProfileId>,
#[serde(default)]
pub refund_reason: Vec<String>,
#[serde(default)]
pub refund_error_message: Vec<String>,
}
#[derive(
@ -67,6 +71,8 @@ pub enum RefundDimensions {
Connector,
RefundType,
ProfileId,
RefundReason,
RefundErrorMessage,
}
#[derive(
@ -92,6 +98,44 @@ pub enum RefundMetrics {
SessionizedRefundCount,
SessionizedRefundSuccessCount,
SessionizedRefundProcessedAmount,
SessionizedRefundReason,
SessionizedRefundErrorMessage,
}
#[derive(Debug, Default, serde::Serialize)]
pub struct ReasonsResult {
pub reason: String,
pub count: i64,
pub percentage: f64,
}
#[derive(Debug, Default, serde::Serialize)]
pub struct ErrorMessagesResult {
pub error_message: String,
pub count: i64,
pub percentage: f64,
}
#[derive(
Clone,
Copy,
Debug,
Hash,
PartialEq,
Eq,
serde::Serialize,
serde::Deserialize,
strum::Display,
strum::EnumIter,
strum::AsRefStr,
)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum RefundDistributions {
#[strum(serialize = "refund_reason")]
SessionizedRefundReason,
#[strum(serialize = "refund_error_message")]
SessionizedRefundErrorMessage,
}
pub mod metric_behaviour {
@ -124,9 +168,10 @@ pub struct RefundMetricsBucketIdentifier {
pub currency: Option<Currency>,
pub refund_status: Option<String>,
pub connector: Option<String>,
pub refund_type: Option<String>,
pub profile_id: Option<String>,
pub refund_reason: Option<String>,
pub refund_error_message: Option<String>,
#[serde(rename = "time_range")]
pub time_bucket: TimeRange,
#[serde(rename = "time_bucket")]
@ -141,6 +186,8 @@ impl Hash for RefundMetricsBucketIdentifier {
self.connector.hash(state);
self.refund_type.hash(state);
self.profile_id.hash(state);
self.refund_reason.hash(state);
self.refund_error_message.hash(state);
self.time_bucket.hash(state);
}
}
@ -155,12 +202,15 @@ impl PartialEq for RefundMetricsBucketIdentifier {
}
impl RefundMetricsBucketIdentifier {
#[allow(clippy::too_many_arguments)]
pub fn new(
currency: Option<Currency>,
refund_status: Option<String>,
connector: Option<String>,
refund_type: Option<String>,
profile_id: Option<String>,
refund_reason: Option<String>,
refund_error_message: Option<String>,
normalized_time_range: TimeRange,
) -> Self {
Self {
@ -169,6 +219,8 @@ impl RefundMetricsBucketIdentifier {
connector,
refund_type,
profile_id,
refund_reason,
refund_error_message,
time_bucket: normalized_time_range,
start_time: normalized_time_range.start_time,
}
@ -176,11 +228,18 @@ impl RefundMetricsBucketIdentifier {
}
#[derive(Debug, serde::Serialize)]
pub struct RefundMetricsBucketValue {
pub successful_refunds: Option<u32>,
pub total_refunds: Option<u32>,
pub refund_success_rate: Option<f64>,
pub refund_count: Option<u64>,
pub refund_success_count: Option<u64>,
pub refund_processed_amount: Option<u64>,
pub refund_processed_amount_in_usd: Option<u64>,
pub refund_processed_count: Option<u64>,
pub refund_reason_distribution: Option<Vec<ReasonsResult>>,
pub refund_error_message_distribution: Option<Vec<ErrorMessagesResult>>,
pub refund_reason_count: Option<u64>,
pub refund_error_message_count: Option<u64>,
}
#[derive(Debug, serde::Serialize)]
pub struct RefundMetricsBucketResponse {