From 7473182b309c344d486aa5e363f49b71ca17e05a Mon Sep 17 00:00:00 2001 From: Sandeep Kumar <83278309+tsdk02@users.noreply.github.com> Date: Thu, 6 Mar 2025 21:49:44 +0530 Subject: [PATCH] feat(analytics): add new filters, dimensions and metrics for authentication analytics (#7451) Co-authored-by: Sandeep Kumar --- crates/analytics/src/auth_events.rs | 4 +- .../analytics/src/auth_events/accumulator.rs | 25 ++++ crates/analytics/src/auth_events/core.rs | 105 +++++++++++-- crates/analytics/src/auth_events/filters.rs | 60 ++++++++ crates/analytics/src/auth_events/metrics.rs | 121 +++++++++++++-- .../metrics/authentication_attempt_count.rs | 38 ++++- .../metrics/authentication_count.rs | 38 ++++- .../metrics/authentication_error_message.rs | 138 ++++++++++++++++++ .../metrics/authentication_funnel.rs | 133 +++++++++++++++++ .../metrics/authentication_success_count.rs | 38 ++++- .../metrics/challenge_attempt_count.rs | 38 ++++- .../metrics/challenge_flow_count.rs | 37 ++++- .../metrics/challenge_success_count.rs | 38 ++++- .../metrics/frictionless_flow_count.rs | 38 ++++- .../metrics/frictionless_success_count.rs | 38 ++++- crates/analytics/src/auth_events/types.rs | 58 ++++++++ crates/analytics/src/clickhouse.rs | 12 ++ crates/analytics/src/core.rs | 2 +- crates/analytics/src/lib.rs | 18 ++- crates/analytics/src/query.rs | 9 +- crates/analytics/src/sqlx.rs | 94 ++++++++++++ crates/analytics/src/utils.rs | 12 +- crates/api_models/src/analytics.rs | 39 ++++- .../api_models/src/analytics/auth_events.rs | 73 ++++++++- crates/api_models/src/events.rs | 9 ++ crates/common_enums/src/enums.rs | 1 + crates/router/src/analytics.rs | 42 +++++- 27 files changed, 1177 insertions(+), 81 deletions(-) create mode 100644 crates/analytics/src/auth_events/filters.rs create mode 100644 crates/analytics/src/auth_events/metrics/authentication_error_message.rs create mode 100644 crates/analytics/src/auth_events/metrics/authentication_funnel.rs create mode 100644 crates/analytics/src/auth_events/types.rs diff --git a/crates/analytics/src/auth_events.rs b/crates/analytics/src/auth_events.rs index e708c3c883..3aa23d0793 100644 --- a/crates/analytics/src/auth_events.rs +++ b/crates/analytics/src/auth_events.rs @@ -1,6 +1,8 @@ pub mod accumulator; mod core; +pub mod filters; pub mod metrics; +pub mod types; pub use accumulator::{AuthEventMetricAccumulator, AuthEventMetricsAccumulator}; -pub use self::core::get_metrics; +pub use self::core::{get_filters, get_metrics}; diff --git a/crates/analytics/src/auth_events/accumulator.rs b/crates/analytics/src/auth_events/accumulator.rs index 446ac6ac8c..13818d2bd4 100644 --- a/crates/analytics/src/auth_events/accumulator.rs +++ b/crates/analytics/src/auth_events/accumulator.rs @@ -6,12 +6,14 @@ use super::metrics::AuthEventMetricRow; pub struct AuthEventMetricsAccumulator { pub authentication_count: CountAccumulator, pub authentication_attempt_count: CountAccumulator, + pub authentication_error_message: AuthenticationErrorMessageAccumulator, pub authentication_success_count: CountAccumulator, pub challenge_flow_count: CountAccumulator, pub challenge_attempt_count: CountAccumulator, pub challenge_success_count: CountAccumulator, pub frictionless_flow_count: CountAccumulator, pub frictionless_success_count: CountAccumulator, + pub authentication_funnel: CountAccumulator, } #[derive(Debug, Default)] @@ -20,6 +22,11 @@ pub struct CountAccumulator { pub count: Option, } +#[derive(Debug, Default)] +pub struct AuthenticationErrorMessageAccumulator { + pub count: Option, +} + pub trait AuthEventMetricAccumulator { type MetricOutput; @@ -44,6 +51,22 @@ impl AuthEventMetricAccumulator for CountAccumulator { } } +impl AuthEventMetricAccumulator for AuthenticationErrorMessageAccumulator { + type MetricOutput = Option; + #[inline] + fn add_metrics_bucket(&mut self, metrics: &AuthEventMetricRow) { + self.count = match (self.count, metrics.count) { + (None, None) => None, + (None, i @ Some(_)) | (i @ Some(_), None) => i, + (Some(a), Some(b)) => Some(a + b), + } + } + #[inline] + fn collect(self) -> Self::MetricOutput { + self.count.and_then(|i| u64::try_from(i).ok()) + } +} + impl AuthEventMetricsAccumulator { pub fn collect(self) -> AuthEventMetricsBucketValue { AuthEventMetricsBucketValue { @@ -55,6 +78,8 @@ impl AuthEventMetricsAccumulator { challenge_success_count: self.challenge_success_count.collect(), frictionless_flow_count: self.frictionless_flow_count.collect(), frictionless_success_count: self.frictionless_success_count.collect(), + error_message_count: self.authentication_error_message.collect(), + authentication_funnel: self.authentication_funnel.collect(), } } } diff --git a/crates/analytics/src/auth_events/core.rs b/crates/analytics/src/auth_events/core.rs index 75bdf4de14..a2640be6ea 100644 --- a/crates/analytics/src/auth_events/core.rs +++ b/crates/analytics/src/auth_events/core.rs @@ -1,13 +1,20 @@ use std::collections::HashMap; use api_models::analytics::{ - auth_events::{AuthEventMetrics, AuthEventMetricsBucketIdentifier, MetricsBucketResponse}, - AnalyticsMetadata, GetAuthEventMetricRequest, MetricsResponse, + auth_events::{ + AuthEventDimensions, AuthEventMetrics, AuthEventMetricsBucketIdentifier, + MetricsBucketResponse, + }, + AuthEventFilterValue, AuthEventFiltersResponse, AuthEventMetricsResponse, + AuthEventsAnalyticsMetadata, GetAuthEventFilterRequest, GetAuthEventMetricRequest, }; -use error_stack::ResultExt; +use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; -use super::AuthEventMetricsAccumulator; +use super::{ + filters::{get_auth_events_filter_for_dimension, AuthEventFilterRow}, + AuthEventMetricsAccumulator, +}; use crate::{ auth_events::AuthEventMetricAccumulator, errors::{AnalyticsError, AnalyticsResult}, @@ -19,7 +26,7 @@ pub async fn get_metrics( pool: &AnalyticsProvider, merchant_id: &common_utils::id_type::MerchantId, req: GetAuthEventMetricRequest, -) -> AnalyticsResult> { +) -> AnalyticsResult> { let mut metrics_accumulator: HashMap< AuthEventMetricsBucketIdentifier, AuthEventMetricsAccumulator, @@ -34,7 +41,9 @@ pub async fn get_metrics( let data = pool .get_auth_event_metrics( &metric_type, + &req.group_by_names.clone(), &merchant_id_scoped, + &req.filters, req.time_series.map(|t| t.granularity), &req.time_range, ) @@ -77,22 +86,94 @@ pub async fn get_metrics( AuthEventMetrics::FrictionlessSuccessCount => metrics_builder .frictionless_success_count .add_metrics_bucket(&value), + AuthEventMetrics::AuthenticationErrorMessage => metrics_builder + .authentication_error_message + .add_metrics_bucket(&value), + AuthEventMetrics::AuthenticationFunnel => metrics_builder + .authentication_funnel + .add_metrics_bucket(&value), } } } + let mut total_error_message_count = 0; let query_data: Vec = metrics_accumulator .into_iter() - .map(|(id, val)| MetricsBucketResponse { - values: val.collect(), - dimensions: id, + .map(|(id, val)| { + let collected_values = val.collect(); + if let Some(count) = collected_values.error_message_count { + total_error_message_count += count; + } + MetricsBucketResponse { + values: collected_values, + dimensions: id, + } }) .collect(); - - Ok(MetricsResponse { + Ok(AuthEventMetricsResponse { query_data, - meta_data: [AnalyticsMetadata { - current_time_range: req.time_range, + meta_data: [AuthEventsAnalyticsMetadata { + total_error_message_count: Some(total_error_message_count), }], }) } + +pub async fn get_filters( + pool: &AnalyticsProvider, + req: GetAuthEventFilterRequest, + merchant_id: &common_utils::id_type::MerchantId, +) -> AnalyticsResult { + let mut res = AuthEventFiltersResponse::default(); + for dim in req.group_by_names { + let values = match pool { + AnalyticsProvider::Sqlx(_pool) => { + Err(report!(AnalyticsError::UnknownError)) + } + AnalyticsProvider::Clickhouse(pool) => { + get_auth_events_filter_for_dimension(dim, merchant_id, &req.time_range, pool) + .await + .map_err(|e| e.change_context(AnalyticsError::UnknownError)) + } + AnalyticsProvider::CombinedCkh(sqlx_pool, ckh_pool) | AnalyticsProvider::CombinedSqlx(sqlx_pool, ckh_pool) => { + let ckh_result = get_auth_events_filter_for_dimension( + dim, + merchant_id, + &req.time_range, + ckh_pool, + ) + .await + .map_err(|e| e.change_context(AnalyticsError::UnknownError)); + let sqlx_result = get_auth_events_filter_for_dimension( + dim, + merchant_id, + &req.time_range, + sqlx_pool, + ) + .await + .map_err(|e| e.change_context(AnalyticsError::UnknownError)); + match (&sqlx_result, &ckh_result) { + (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => { + router_env::logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres refunds analytics filters") + }, + _ => {} + }; + ckh_result + } + } + .change_context(AnalyticsError::UnknownError)? + .into_iter() + .filter_map(|fil: AuthEventFilterRow| match dim { + AuthEventDimensions::AuthenticationStatus => fil.authentication_status.map(|i| i.as_ref().to_string()), + AuthEventDimensions::TransactionStatus => fil.trans_status.map(|i| i.as_ref().to_string()), + AuthEventDimensions::ErrorMessage => fil.error_message, + AuthEventDimensions::AuthenticationConnector => fil.authentication_connector.map(|i| i.as_ref().to_string()), + AuthEventDimensions::MessageVersion => fil.message_version, + }) + .collect::>(); + res.query_data.push(AuthEventFilterValue { + dimension: dim, + values, + }) + } + Ok(res) +} diff --git a/crates/analytics/src/auth_events/filters.rs b/crates/analytics/src/auth_events/filters.rs new file mode 100644 index 0000000000..da8e0b7cfa --- /dev/null +++ b/crates/analytics/src/auth_events/filters.rs @@ -0,0 +1,60 @@ +use api_models::analytics::{auth_events::AuthEventDimensions, Granularity, TimeRange}; +use common_utils::errors::ReportSwitchExt; +use diesel_models::enums::{AuthenticationConnectors, AuthenticationStatus, TransactionStatus}; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use crate::{ + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + types::{ + AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, FiltersError, FiltersResult, + LoadRow, + }, +}; + +pub trait AuthEventFilterAnalytics: LoadRow {} + +pub async fn get_auth_events_filter_for_dimension( + dimension: AuthEventDimensions, + merchant_id: &common_utils::id_type::MerchantId, + time_range: &TimeRange, + pool: &T, +) -> FiltersResult> +where + T: AnalyticsDataSource + AuthEventFilterAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::Authentications); + + query_builder.add_select_column(dimension).switch()?; + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant_id) + .switch()?; + + query_builder.set_distinct(); + + query_builder + .execute_query::(pool) + .await + .change_context(FiltersError::QueryBuildingError)? + .change_context(FiltersError::QueryExecutionFailure) +} + +#[derive(Debug, serde::Serialize, Eq, PartialEq, serde::Deserialize)] +pub struct AuthEventFilterRow { + pub authentication_status: Option>, + pub trans_status: Option>, + pub error_message: Option, + pub authentication_connector: Option>, + pub message_version: Option, +} diff --git a/crates/analytics/src/auth_events/metrics.rs b/crates/analytics/src/auth_events/metrics.rs index f3f0354818..fd94aac614 100644 --- a/crates/analytics/src/auth_events/metrics.rs +++ b/crates/analytics/src/auth_events/metrics.rs @@ -1,18 +1,23 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::{AuthEventMetrics, AuthEventMetricsBucketIdentifier}, + auth_events::{ + AuthEventDimensions, AuthEventFilters, AuthEventMetrics, AuthEventMetricsBucketIdentifier, + }, Granularity, TimeRange, }; +use diesel_models::enums as storage_enums; use time::PrimitiveDateTime; use crate::{ query::{Aggregate, GroupByClause, ToSql, Window}, - types::{AnalyticsCollection, AnalyticsDataSource, LoadRow, MetricsResult}, + types::{AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, LoadRow, MetricsResult}, }; mod authentication_attempt_count; mod authentication_count; +mod authentication_error_message; +mod authentication_funnel; mod authentication_success_count; mod challenge_attempt_count; mod challenge_flow_count; @@ -22,6 +27,8 @@ mod frictionless_success_count; use authentication_attempt_count::AuthenticationAttemptCount; use authentication_count::AuthenticationCount; +use authentication_error_message::AuthenticationErrorMessage; +use authentication_funnel::AuthenticationFunnel; use authentication_success_count::AuthenticationSuccessCount; use challenge_attempt_count::ChallengeAttemptCount; use challenge_flow_count::ChallengeFlowCount; @@ -32,7 +39,15 @@ use frictionless_success_count::FrictionlessSuccessCount; #[derive(Debug, PartialEq, Eq, serde::Deserialize, Hash)] pub struct AuthEventMetricRow { pub count: Option, - pub time_bucket: Option, + pub authentication_status: Option>, + pub trans_status: Option>, + pub error_message: Option, + pub authentication_connector: Option>, + pub message_version: Option, + #[serde(with = "common_utils::custom_serde::iso8601::option")] + pub start_bucket: Option, + #[serde(with = "common_utils::custom_serde::iso8601::option")] + pub end_bucket: Option, } pub trait AuthEventMetricAnalytics: LoadRow {} @@ -45,6 +60,8 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, @@ -64,6 +81,8 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, @@ -71,42 +90,122 @@ where match self { Self::AuthenticationCount => { AuthenticationCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::AuthenticationAttemptCount => { AuthenticationAttemptCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::AuthenticationSuccessCount => { AuthenticationSuccessCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::ChallengeFlowCount => { ChallengeFlowCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::ChallengeAttemptCount => { ChallengeAttemptCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::ChallengeSuccessCount => { ChallengeSuccessCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::FrictionlessFlowCount => { FrictionlessFlowCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::FrictionlessSuccessCount => { FrictionlessSuccessCount - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) + .await + } + Self::AuthenticationErrorMessage => { + AuthenticationErrorMessage + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) + .await + } + Self::AuthenticationFunnel => { + AuthenticationFunnel + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } } diff --git a/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs b/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs index 2d34344905..32c7638540 100644 --- a/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs +++ b/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_enums::AuthenticationStatus; use common_utils::errors::ReportSwitchExt; @@ -10,7 +11,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -30,6 +31,8 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, @@ -37,6 +40,10 @@ where let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } + query_builder .add_select_column(Aggregate::Count { field: None, @@ -65,12 +72,19 @@ where query_builder .add_negative_filter_clause("authentication_status", AuthenticationStatus::Pending) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -86,7 +100,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/authentication_count.rs b/crates/analytics/src/auth_events/metrics/authentication_count.rs index 9f2311f563..39df41f53a 100644 --- a/crates/analytics/src/auth_events/metrics/authentication_count.rs +++ b/crates/analytics/src/auth_events/metrics/authentication_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_utils::errors::ReportSwitchExt; use error_stack::ResultExt; @@ -9,7 +10,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -29,13 +30,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -60,12 +65,19 @@ where query_builder .add_filter_clause("merchant_id", merchant_id) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -81,7 +93,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/authentication_error_message.rs b/crates/analytics/src/auth_events/metrics/authentication_error_message.rs new file mode 100644 index 0000000000..cdb89b796d --- /dev/null +++ b/crates/analytics/src/auth_events/metrics/authentication_error_message.rs @@ -0,0 +1,138 @@ +use std::collections::HashSet; + +use api_models::analytics::{ + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_enums::AuthenticationStatus; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::AuthEventMetricRow; +use crate::{ + query::{ + Aggregate, FilterTypes, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, + Window, + }, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; + +#[derive(Default)] +pub(super) struct AuthenticationErrorMessage; + +#[async_trait::async_trait] +impl super::AuthEventMetric for AuthenticationErrorMessage +where + T: AnalyticsDataSource + super::AuthEventMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, + granularity: Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> { + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::Authentications); + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } + query_builder + .add_select_column(Aggregate::Count { + field: None, + alias: Some("count"), + }) + .switch()?; + + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant_id) + .switch()?; + + query_builder + .add_filter_clause("authentication_status", AuthenticationStatus::Failed) + .switch()?; + + query_builder + .add_custom_filter_clause( + AuthEventDimensions::ErrorMessage, + "NULL", + FilterTypes::IsNotNull, + ) + .switch()?; + filters.set_filter_clause(&mut query_builder).switch()?; + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + + if let Some(granularity) = granularity { + granularity + .set_group_by_clause(&mut query_builder) + .attach_printable("Error adding granularity") + .switch()?; + } + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, + crate::query::PostProcessingError, + >>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/auth_events/metrics/authentication_funnel.rs b/crates/analytics/src/auth_events/metrics/authentication_funnel.rs new file mode 100644 index 0000000000..37f9dfd1c1 --- /dev/null +++ b/crates/analytics/src/auth_events/metrics/authentication_funnel.rs @@ -0,0 +1,133 @@ +use std::collections::HashSet; + +use api_models::analytics::{ + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::AuthEventMetricRow; +use crate::{ + query::{ + Aggregate, FilterTypes, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, + Window, + }, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; + +#[derive(Default)] +pub(super) struct AuthenticationFunnel; + +#[async_trait::async_trait] +impl super::AuthEventMetric for AuthenticationFunnel +where + T: AnalyticsDataSource + super::AuthEventMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, + granularity: Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> { + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::Authentications); + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } + query_builder + .add_select_column(Aggregate::Count { + field: None, + alias: Some("count"), + }) + .switch()?; + + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant_id) + .switch()?; + + query_builder + .add_custom_filter_clause( + AuthEventDimensions::TransactionStatus, + "NULL", + FilterTypes::IsNotNull, + ) + .switch()?; + filters.set_filter_clause(&mut query_builder).switch()?; + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + + if let Some(granularity) = granularity { + granularity + .set_group_by_clause(&mut query_builder) + .attach_printable("Error adding granularity") + .switch()?; + } + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, + crate::query::PostProcessingError, + >>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/auth_events/metrics/authentication_success_count.rs b/crates/analytics/src/auth_events/metrics/authentication_success_count.rs index e887807f41..039ef00dd6 100644 --- a/crates/analytics/src/auth_events/metrics/authentication_success_count.rs +++ b/crates/analytics/src/auth_events/metrics/authentication_success_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_enums::AuthenticationStatus; use common_utils::errors::ReportSwitchExt; @@ -10,7 +11,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -30,13 +31,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -65,12 +70,19 @@ where query_builder .add_filter_clause("authentication_status", AuthenticationStatus::Success) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -86,7 +98,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/challenge_attempt_count.rs b/crates/analytics/src/auth_events/metrics/challenge_attempt_count.rs index f1f6a39799..beccc093b1 100644 --- a/crates/analytics/src/auth_events/metrics/challenge_attempt_count.rs +++ b/crates/analytics/src/auth_events/metrics/challenge_attempt_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_utils::errors::ReportSwitchExt; use error_stack::ResultExt; @@ -9,7 +10,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -29,13 +30,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -68,12 +73,19 @@ where query_builder .add_negative_filter_clause("authentication_status", "pending") .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -89,7 +101,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs b/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs index c08618cc0d..4d07cffba9 100644 --- a/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs +++ b/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_utils::errors::ReportSwitchExt; use error_stack::ResultExt; @@ -9,7 +10,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -29,13 +30,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -64,12 +69,18 @@ where query_builder .add_filter_clause("trans_status", "C".to_string()) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -85,7 +96,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/challenge_success_count.rs b/crates/analytics/src/auth_events/metrics/challenge_success_count.rs index 3fb75efd56..310a45f053 100644 --- a/crates/analytics/src/auth_events/metrics/challenge_success_count.rs +++ b/crates/analytics/src/auth_events/metrics/challenge_success_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_enums::AuthenticationStatus; use common_utils::errors::ReportSwitchExt; @@ -10,7 +11,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -30,13 +31,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -69,12 +74,19 @@ where query_builder .add_filter_clause("trans_status", "C".to_string()) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -90,7 +102,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs b/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs index 8859c60fc3..24857bfb84 100644 --- a/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs +++ b/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_utils::errors::ReportSwitchExt; use error_stack::ResultExt; @@ -9,7 +10,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -29,13 +30,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -64,12 +69,19 @@ where query_builder .add_filter_clause("trans_status", "Y".to_string()) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -85,7 +97,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/metrics/frictionless_success_count.rs b/crates/analytics/src/auth_events/metrics/frictionless_success_count.rs index 3d5d894e7d..79fef8a16d 100644 --- a/crates/analytics/src/auth_events/metrics/frictionless_success_count.rs +++ b/crates/analytics/src/auth_events/metrics/frictionless_success_count.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use api_models::analytics::{ - auth_events::AuthEventMetricsBucketIdentifier, Granularity, TimeRange, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetricsBucketIdentifier}, + Granularity, TimeRange, }; use common_enums::AuthenticationStatus; use common_utils::errors::ReportSwitchExt; @@ -10,7 +11,7 @@ use time::PrimitiveDateTime; use super::AuthEventMetricRow; use crate::{ - query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, }; @@ -30,13 +31,17 @@ where async fn load_metrics( &self, merchant_id: &common_utils::id_type::MerchantId, + dimensions: &[AuthEventDimensions], + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, pool: &T, ) -> MetricsResult> { let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::Authentications); - + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } query_builder .add_select_column(Aggregate::Count { field: None, @@ -69,12 +74,19 @@ where query_builder .add_filter_clause("authentication_status", AuthenticationStatus::Success) .switch()?; - + filters.set_filter_clause(&mut query_builder).switch()?; time_range .set_filter_clause(&mut query_builder) .attach_printable("Error filtering time range") .switch()?; + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + if let Some(granularity) = granularity { granularity .set_group_by_clause(&mut query_builder) @@ -90,7 +102,23 @@ where .into_iter() .map(|i| { Ok(( - AuthEventMetricsBucketIdentifier::new(i.time_bucket.clone()), + AuthEventMetricsBucketIdentifier::new( + i.authentication_status.as_ref().map(|i| i.0), + i.trans_status.as_ref().map(|i| i.0.clone()), + i.error_message.clone(), + i.authentication_connector.as_ref().map(|i| i.0), + i.message_version.clone(), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), i, )) }) diff --git a/crates/analytics/src/auth_events/types.rs b/crates/analytics/src/auth_events/types.rs new file mode 100644 index 0000000000..ac7ee2462e --- /dev/null +++ b/crates/analytics/src/auth_events/types.rs @@ -0,0 +1,58 @@ +use api_models::analytics::auth_events::{AuthEventDimensions, AuthEventFilters}; +use error_stack::ResultExt; + +use crate::{ + query::{QueryBuilder, QueryFilter, QueryResult, ToSql}, + types::{AnalyticsCollection, AnalyticsDataSource}, +}; + +impl QueryFilter for AuthEventFilters +where + T: AnalyticsDataSource, + AnalyticsCollection: ToSql, +{ + fn set_filter_clause(&self, builder: &mut QueryBuilder) -> QueryResult<()> { + if !self.authentication_status.is_empty() { + builder + .add_filter_in_range_clause( + AuthEventDimensions::AuthenticationStatus, + &self.authentication_status, + ) + .attach_printable("Error adding authentication status filter")?; + } + + if !self.trans_status.is_empty() { + builder + .add_filter_in_range_clause( + AuthEventDimensions::TransactionStatus, + &self.trans_status, + ) + .attach_printable("Error adding transaction status filter")?; + } + + if !self.error_message.is_empty() { + builder + .add_filter_in_range_clause(AuthEventDimensions::ErrorMessage, &self.error_message) + .attach_printable("Error adding error message filter")?; + } + + if !self.authentication_connector.is_empty() { + builder + .add_filter_in_range_clause( + AuthEventDimensions::AuthenticationConnector, + &self.authentication_connector, + ) + .attach_printable("Error adding authentication connector filter")?; + } + + if !self.message_version.is_empty() { + builder + .add_filter_in_range_clause( + AuthEventDimensions::MessageVersion, + &self.message_version, + ) + .attach_printable("Error adding message version filter")?; + } + Ok(()) + } +} diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index 1c86e7cbcf..1d52cc9e76 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -28,6 +28,7 @@ use crate::{ filters::ApiEventFilter, metrics::{latency::LatencyAvg, ApiEventMetricRow}, }, + auth_events::filters::AuthEventFilterRow, connector_events::events::ConnectorEventsResult, disputes::{filters::DisputeFilterRow, metrics::DisputeMetricRow}, outgoing_webhook_event::events::OutgoingWebhookLogsResult, @@ -181,6 +182,7 @@ impl super::sdk_events::metrics::SdkEventMetricAnalytics for ClickhouseClient {} impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {} impl super::active_payments::metrics::ActivePaymentsMetricAnalytics for ClickhouseClient {} impl super::auth_events::metrics::AuthEventMetricAnalytics for ClickhouseClient {} +impl super::auth_events::filters::AuthEventFilterAnalytics for ClickhouseClient {} impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {} impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {} impl super::api_event::metrics::ApiEventMetricAnalytics for ClickhouseClient {} @@ -403,6 +405,16 @@ impl TryInto for serde_json::Value { } } +impl TryInto for serde_json::Value { + type Error = Report; + + fn try_into(self) -> Result { + serde_json::from_value(self).change_context(ParsingError::StructParseFailure( + "Failed to parse AuthEventFilterRow in clickhouse results", + )) + } +} + impl TryInto for serde_json::Value { type Error = Report; diff --git a/crates/analytics/src/core.rs b/crates/analytics/src/core.rs index 0e3ced7993..980e17bc90 100644 --- a/crates/analytics/src/core.rs +++ b/crates/analytics/src/core.rs @@ -34,7 +34,7 @@ pub async fn get_domain_info( AnalyticsDomain::AuthEvents => GetInfoResponse { metrics: utils::get_auth_event_metrics_info(), download_dimensions: None, - dimensions: Vec::new(), + dimensions: utils::get_auth_event_dimensions(), }, AnalyticsDomain::ApiEvents => GetInfoResponse { metrics: utils::get_api_event_metrics_info(), diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index ef7108e8ef..f698b1161a 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -41,7 +41,9 @@ use api_models::analytics::{ api_event::{ ApiEventDimensions, ApiEventFilters, ApiEventMetrics, ApiEventMetricsBucketIdentifier, }, - auth_events::{AuthEventMetrics, AuthEventMetricsBucketIdentifier}, + auth_events::{ + AuthEventDimensions, AuthEventFilters, AuthEventMetrics, AuthEventMetricsBucketIdentifier, + }, disputes::{DisputeDimensions, DisputeFilters, DisputeMetrics, DisputeMetricsBucketIdentifier}, frm::{FrmDimensions, FrmFilters, FrmMetrics, FrmMetricsBucketIdentifier}, payment_intents::{ @@ -908,7 +910,9 @@ impl AnalyticsProvider { pub async fn get_auth_event_metrics( &self, metric: &AuthEventMetrics, + dimensions: &[AuthEventDimensions], merchant_id: &common_utils::id_type::MerchantId, + filters: &AuthEventFilters, granularity: Option, time_range: &TimeRange, ) -> types::MetricsResult> { @@ -916,13 +920,22 @@ impl AnalyticsProvider { Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)), Self::Clickhouse(pool) => { metric - .load_metrics(merchant_id, granularity, time_range, pool) + .load_metrics( + merchant_id, + dimensions, + filters, + granularity, + time_range, + pool, + ) .await } Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => { metric .load_metrics( merchant_id, + dimensions, + filters, granularity, // Since API events are ckh only use ckh here time_range, @@ -1126,6 +1139,7 @@ pub enum AnalyticsFlow { GetFrmMetrics, GetSdkMetrics, GetAuthMetrics, + GetAuthEventFilters, GetActivePaymentsMetrics, GetPaymentFilters, GetPaymentIntentFilters, diff --git a/crates/analytics/src/query.rs b/crates/analytics/src/query.rs index 59cb874344..d483ce4360 100644 --- a/crates/analytics/src/query.rs +++ b/crates/analytics/src/query.rs @@ -4,7 +4,7 @@ use api_models::{ analytics::{ self as analytics_api, api_event::ApiEventDimensions, - auth_events::AuthEventFlows, + auth_events::{AuthEventDimensions, AuthEventFlows}, disputes::DisputeDimensions, frm::{FrmDimensions, FrmTransactionType}, payment_intents::PaymentIntentDimensions, @@ -19,7 +19,7 @@ use api_models::{ }, refunds::RefundStatus, }; -use common_enums::{AuthenticationStatus, TransactionStatus}; +use common_enums::{AuthenticationConnectors, AuthenticationStatus, TransactionStatus}; use common_utils::{ errors::{CustomResult, ParsingError}, id_type::{MerchantId, OrganizationId, ProfileId}, @@ -505,6 +505,7 @@ impl_to_sql_for_to_string!( FrmTransactionType, TransactionStatus, AuthenticationStatus, + AuthenticationConnectors, Flow, &String, &bool, @@ -522,7 +523,9 @@ impl_to_sql_for_to_string!( ApiEventDimensions, &DisputeDimensions, DisputeDimensions, - DisputeStage + DisputeStage, + AuthEventDimensions, + &AuthEventDimensions ); #[derive(Debug, Clone, Copy)] diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index f3143840f3..a6db92e753 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -4,6 +4,7 @@ use api_models::{ analytics::{frm::FrmTransactionType, refunds::RefundType}, enums::{DisputeStage, DisputeStatus}, }; +use common_enums::{AuthenticationConnectors, AuthenticationStatus, TransactionStatus}; use common_utils::{ errors::{CustomResult, ParsingError}, DbConnectionParams, @@ -96,6 +97,9 @@ db_type!(FraudCheckStatus); db_type!(FrmTransactionType); db_type!(DisputeStage); db_type!(DisputeStatus); +db_type!(AuthenticationStatus); +db_type!(TransactionStatus); +db_type!(AuthenticationConnectors); impl<'q, Type> Encode<'q, Postgres> for DBEnumWrapper where @@ -159,6 +163,8 @@ impl super::disputes::filters::DisputeFilterAnalytics for SqlxClient {} impl super::disputes::metrics::DisputeMetricAnalytics for SqlxClient {} impl super::frm::metrics::FrmMetricAnalytics for SqlxClient {} impl super::frm::filters::FrmFilterAnalytics for SqlxClient {} +impl super::auth_events::metrics::AuthEventMetricAnalytics for SqlxClient {} +impl super::auth_events::filters::AuthEventFilterAnalytics for SqlxClient {} #[async_trait::async_trait] impl AnalyticsDataSource for SqlxClient { @@ -190,6 +196,94 @@ impl HealthCheck for SqlxClient { } } +impl<'a> FromRow<'a, PgRow> for super::auth_events::metrics::AuthEventMetricRow { + fn from_row(row: &'a PgRow) -> sqlx::Result { + let authentication_status: Option> = + row.try_get("authentication_status").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let trans_status: Option> = + row.try_get("trans_status").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let error_message: Option = row.try_get("error_message").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let authentication_connector: Option> = row + .try_get("authentication_connector") + .or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let message_version: Option = + row.try_get("message_version").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let count: Option = row.try_get("count").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + // Removing millisecond precision to get accurate diffs against clickhouse + let start_bucket: Option = row + .try_get::, _>("start_bucket")? + .and_then(|dt| dt.replace_millisecond(0).ok()); + let end_bucket: Option = row + .try_get::, _>("end_bucket")? + .and_then(|dt| dt.replace_millisecond(0).ok()); + Ok(Self { + authentication_status, + trans_status, + error_message, + authentication_connector, + message_version, + count, + start_bucket, + end_bucket, + }) + } +} + +impl<'a> FromRow<'a, PgRow> for super::auth_events::filters::AuthEventFilterRow { + fn from_row(row: &'a PgRow) -> sqlx::Result { + let authentication_status: Option> = + row.try_get("authentication_status").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let trans_status: Option> = + row.try_get("trans_status").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let error_message: Option = row.try_get("error_message").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let authentication_connector: Option> = row + .try_get("authentication_connector") + .or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let message_version: Option = + row.try_get("message_version").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + Ok(Self { + authentication_status, + trans_status, + error_message, + authentication_connector, + message_version, + }) + } +} + impl<'a> FromRow<'a, PgRow> for super::refunds::metrics::RefundMetricRow { fn from_row(row: &'a PgRow) -> sqlx::Result { let currency: Option> = diff --git a/crates/analytics/src/utils.rs b/crates/analytics/src/utils.rs index fc21bf0981..a0ddead136 100644 --- a/crates/analytics/src/utils.rs +++ b/crates/analytics/src/utils.rs @@ -1,6 +1,6 @@ use api_models::analytics::{ api_event::{ApiEventDimensions, ApiEventMetrics}, - auth_events::AuthEventMetrics, + auth_events::{AuthEventDimensions, AuthEventMetrics}, disputes::{DisputeDimensions, DisputeMetrics}, frm::{FrmDimensions, FrmMetrics}, payment_intents::{PaymentIntentDimensions, PaymentIntentMetrics}, @@ -47,6 +47,16 @@ pub fn get_payment_intent_dimensions() -> Vec { .collect() } +pub fn get_auth_event_dimensions() -> Vec { + vec![ + AuthEventDimensions::AuthenticationConnector, + AuthEventDimensions::MessageVersion, + ] + .into_iter() + .map(Into::into) + .collect() +} + pub fn get_refund_dimensions() -> Vec { RefundDimensions::iter().map(Into::into).collect() } diff --git a/crates/api_models/src/analytics.rs b/crates/api_models/src/analytics.rs index 132272f0e4..71d7f80eb7 100644 --- a/crates/api_models/src/analytics.rs +++ b/crates/api_models/src/analytics.rs @@ -7,7 +7,7 @@ use masking::Secret; use self::{ active_payments::ActivePaymentsMetrics, api_event::{ApiEventDimensions, ApiEventMetrics}, - auth_events::AuthEventMetrics, + auth_events::{AuthEventDimensions, AuthEventFilters, AuthEventMetrics}, disputes::{DisputeDimensions, DisputeMetrics}, frm::{FrmDimensions, FrmMetrics}, payment_intents::{PaymentIntentDimensions, PaymentIntentMetrics}, @@ -226,6 +226,10 @@ pub struct GetAuthEventMetricRequest { pub time_series: Option, pub time_range: TimeRange, #[serde(default)] + pub group_by_names: Vec, + #[serde(default)] + pub filters: AuthEventFilters, + #[serde(default)] pub metrics: HashSet, #[serde(default)] pub delta: bool, @@ -509,3 +513,36 @@ pub struct SankeyResponse { pub dispute_status: Option, pub first_attempt: i64, } + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GetAuthEventFilterRequest { + pub time_range: TimeRange, + #[serde(default)] + pub group_by_names: Vec, +} + +#[derive(Debug, Default, serde::Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct AuthEventFiltersResponse { + pub query_data: Vec, +} + +#[derive(Debug, serde::Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct AuthEventFilterValue { + pub dimension: AuthEventDimensions, + pub values: Vec, +} + +#[derive(Debug, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AuthEventMetricsResponse { + pub query_data: Vec, + pub meta_data: [AuthEventsAnalyticsMetadata; 1], +} + +#[derive(Debug, serde::Serialize)] +pub struct AuthEventsAnalyticsMetadata { + pub total_error_message_count: Option, +} diff --git a/crates/api_models/src/analytics/auth_events.rs b/crates/api_models/src/analytics/auth_events.rs index 6f52755134..5c2d4ed206 100644 --- a/crates/api_models/src/analytics/auth_events.rs +++ b/crates/api_models/src/analytics/auth_events.rs @@ -3,7 +3,23 @@ use std::{ hash::{Hash, Hasher}, }; -use super::NameDescription; +use common_enums::{AuthenticationConnectors, AuthenticationStatus, TransactionStatus}; + +use super::{NameDescription, TimeRange}; + +#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)] +pub struct AuthEventFilters { + #[serde(default)] + pub authentication_status: Vec, + #[serde(default)] + pub trans_status: Vec, + #[serde(default)] + pub error_message: Vec, + #[serde(default)] + pub authentication_connector: Vec, + #[serde(default)] + pub message_version: Vec, +} #[derive( Debug, @@ -22,10 +38,13 @@ use super::NameDescription; #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum AuthEventDimensions { - #[serde(rename = "authentication_status")] AuthenticationStatus, + #[strum(serialize = "trans_status")] #[serde(rename = "trans_status")] TransactionStatus, + ErrorMessage, + AuthenticationConnector, + MessageVersion, } #[derive( @@ -51,6 +70,8 @@ pub enum AuthEventMetrics { FrictionlessSuccessCount, ChallengeAttemptCount, ChallengeSuccessCount, + AuthenticationErrorMessage, + AuthenticationFunnel, } #[derive( @@ -79,6 +100,7 @@ pub mod metric_behaviour { pub struct FrictionlessSuccessCount; pub struct ChallengeAttemptCount; pub struct ChallengeSuccessCount; + pub struct AuthenticationErrorMessage; } impl From for NameDescription { @@ -90,19 +112,58 @@ impl From for NameDescription { } } +impl From for NameDescription { + fn from(value: AuthEventDimensions) -> Self { + Self { + name: value.to_string(), + desc: String::new(), + } + } +} + #[derive(Debug, serde::Serialize, Eq)] pub struct AuthEventMetricsBucketIdentifier { - pub time_bucket: Option, + pub authentication_status: Option, + pub trans_status: Option, + pub error_message: Option, + pub authentication_connector: Option, + pub message_version: Option, + #[serde(rename = "time_range")] + pub time_bucket: TimeRange, + #[serde(rename = "time_bucket")] + #[serde(with = "common_utils::custom_serde::iso8601custom")] + pub start_time: time::PrimitiveDateTime, } impl AuthEventMetricsBucketIdentifier { - pub fn new(time_bucket: Option) -> Self { - Self { time_bucket } + #[allow(clippy::too_many_arguments)] + pub fn new( + authentication_status: Option, + trans_status: Option, + error_message: Option, + authentication_connector: Option, + message_version: Option, + normalized_time_range: TimeRange, + ) -> Self { + Self { + authentication_status, + trans_status, + error_message, + authentication_connector, + message_version, + time_bucket: normalized_time_range, + start_time: normalized_time_range.start_time, + } } } impl Hash for AuthEventMetricsBucketIdentifier { fn hash(&self, state: &mut H) { + self.authentication_status.hash(state); + self.trans_status.hash(state); + self.authentication_connector.hash(state); + self.message_version.hash(state); + self.error_message.hash(state); self.time_bucket.hash(state); } } @@ -127,6 +188,8 @@ pub struct AuthEventMetricsBucketValue { pub challenge_success_count: Option, pub frictionless_flow_count: Option, pub frictionless_success_count: Option, + pub error_message_count: Option, + pub authentication_funnel: Option, } #[derive(Debug, serde::Serialize)] diff --git a/crates/api_models/src/events.rs b/crates/api_models/src/events.rs index bf7544f7c0..31b0c1d8dc 100644 --- a/crates/api_models/src/events.rs +++ b/crates/api_models/src/events.rs @@ -113,10 +113,12 @@ impl_api_event_type!( GetActivePaymentsMetricRequest, GetSdkEventMetricRequest, GetAuthEventMetricRequest, + GetAuthEventFilterRequest, GetPaymentFiltersRequest, PaymentFiltersResponse, GetRefundFilterRequest, RefundFiltersResponse, + AuthEventFiltersResponse, GetSdkEventFiltersRequest, SdkEventFiltersResponse, ApiLogsRequest, @@ -180,6 +182,13 @@ impl ApiEventMetric for DisputesMetricsResponse { Some(ApiEventsType::Miscellaneous) } } + +impl ApiEventMetric for AuthEventMetricsResponse { + fn get_api_event_type(&self) -> Option { + Some(ApiEventsType::Miscellaneous) + } +} + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] impl ApiEventMetric for PaymentMethodIntentConfirmInternal { fn get_api_event_type(&self) -> Option { diff --git a/crates/common_enums/src/enums.rs b/crates/common_enums/src/enums.rs index dbbee9764f..296bdbd9dd 100644 --- a/crates/common_enums/src/enums.rs +++ b/crates/common_enums/src/enums.rs @@ -6686,6 +6686,7 @@ impl From for EntityType { serde::Serialize, serde::Deserialize, Eq, + Hash, PartialEq, ToSchema, strum::Display, diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index deaf84bb00..858c16d052 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -19,11 +19,11 @@ pub mod routes { GetGlobalSearchRequest, GetSearchRequest, GetSearchRequestWithIndex, SearchIndex, }, AnalyticsRequest, GenerateReportRequest, GetActivePaymentsMetricRequest, - GetApiEventFiltersRequest, GetApiEventMetricRequest, GetAuthEventMetricRequest, - GetDisputeMetricRequest, GetFrmFilterRequest, GetFrmMetricRequest, - GetPaymentFiltersRequest, GetPaymentIntentFiltersRequest, GetPaymentIntentMetricRequest, - GetPaymentMetricRequest, GetRefundFilterRequest, GetRefundMetricRequest, - GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest, + GetApiEventFiltersRequest, GetApiEventMetricRequest, GetAuthEventFilterRequest, + GetAuthEventMetricRequest, GetDisputeMetricRequest, GetFrmFilterRequest, + GetFrmMetricRequest, GetPaymentFiltersRequest, GetPaymentIntentFiltersRequest, + GetPaymentIntentMetricRequest, GetPaymentMetricRequest, GetRefundFilterRequest, + GetRefundMetricRequest, GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest, }; use common_enums::EntityType; use common_utils::types::TimeRange; @@ -106,6 +106,10 @@ pub mod routes { web::resource("metrics/auth_events") .route(web::post().to(get_auth_event_metrics)), ) + .service( + web::resource("filters/auth_events") + .route(web::post().to(get_merchant_auth_events_filters)), + ) .service( web::resource("metrics/frm").route(web::post().to(get_frm_metrics)), ) @@ -1018,6 +1022,34 @@ pub mod routes { .await } + pub async fn get_merchant_auth_events_filters( + state: web::Data, + req: actix_web::HttpRequest, + json_payload: web::Json, + ) -> impl Responder { + let flow = AnalyticsFlow::GetAuthEventFilters; + Box::pin(api::server_wrap( + flow, + state, + &req, + json_payload.into_inner(), + |state, auth: AuthenticationData, req, _| async move { + analytics::auth_events::get_filters( + &state.pool, + req, + auth.merchant_account.get_id(), + ) + .await + .map(ApplicationResponse::Json) + }, + &auth::JWTAuth { + permission: Permission::MerchantAnalyticsRead, + }, + api_locking::LockAction::NotApplicable, + )) + .await + } + pub async fn get_org_payment_filters( state: web::Data, req: actix_web::HttpRequest,