From cc88c0707fe9fa6b54546c0c73b7b46f42497f17 Mon Sep 17 00:00:00 2001 From: Sandeep Kumar <83278309+tsdk02@users.noreply.github.com> Date: Thu, 4 Jul 2024 12:22:27 +0530 Subject: [PATCH] feat(analytics): FRM Analytics (#4880) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Abhitator216 Co-authored-by: Abhishek Kanojia <89402434+Abhitator216@users.noreply.github.com> Co-authored-by: ivor-juspay <138492857+ivor-juspay@users.noreply.github.com> Co-authored-by: Sampras Lopes --- Cargo.lock | 3 + api-reference/openapi_spec.json | 1 - config/config.example.toml | 1 + config/development.toml | 1 + config/docker_compose.toml | 1 + crates/analytics/Cargo.toml | 1 + .../docs/clickhouse/scripts/fraud_check.sql | 132 ++++++++++++ crates/analytics/src/clickhouse.rs | 24 +++ crates/analytics/src/core.rs | 5 + crates/analytics/src/frm.rs | 9 + crates/analytics/src/frm/accumulator.rs | 78 +++++++ crates/analytics/src/frm/core.rs | 193 ++++++++++++++++++ crates/analytics/src/frm/filters.rs | 59 ++++++ crates/analytics/src/frm/metrics.rs | 99 +++++++++ .../src/frm/metrics/frm_blocked_rate.rs | 117 +++++++++++ .../src/frm/metrics/frm_triggered_attempts.rs | 116 +++++++++++ crates/analytics/src/frm/types.rs | 38 ++++ crates/analytics/src/lib.rs | 105 ++++++++++ crates/analytics/src/query.rs | 7 +- crates/analytics/src/sqlx.rs | 77 ++++++- crates/analytics/src/types.rs | 2 + crates/analytics/src/utils.rs | 9 + crates/api_models/Cargo.toml | 1 - crates/api_models/src/analytics.rs | 47 ++++- crates/api_models/src/analytics/frm.rs | 163 +++++++++++++++ crates/common_enums/Cargo.toml | 2 + crates/common_enums/src/enums.rs | 49 ++++- crates/diesel_models/src/enums.rs | 24 --- crates/router/src/analytics.rs | 67 +++++- crates/router/src/db.rs | 66 +++++- crates/router/src/db/fraud_check.rs | 30 --- crates/router/src/db/kafka_store.rs | 2 +- crates/router/src/events.rs | 1 + crates/router/src/services/kafka.rs | 44 +++- .../router/src/services/kafka/fraud_check.rs | 67 ++++++ .../src/services/kafka/fraud_check_event.rs | 66 ++++++ 36 files changed, 1629 insertions(+), 78 deletions(-) create mode 100644 crates/analytics/docs/clickhouse/scripts/fraud_check.sql create mode 100644 crates/analytics/src/frm.rs create mode 100644 crates/analytics/src/frm/accumulator.rs create mode 100644 crates/analytics/src/frm/core.rs create mode 100644 crates/analytics/src/frm/filters.rs create mode 100644 crates/analytics/src/frm/metrics.rs create mode 100644 crates/analytics/src/frm/metrics/frm_blocked_rate.rs create mode 100644 crates/analytics/src/frm/metrics/frm_triggered_attempts.rs create mode 100644 crates/analytics/src/frm/types.rs create mode 100644 crates/api_models/src/analytics/frm.rs create mode 100644 crates/router/src/services/kafka/fraud_check.rs create mode 100644 crates/router/src/services/kafka/fraud_check_event.rs diff --git a/Cargo.lock b/Cargo.lock index e0b6c4f623..6018680728 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,7 @@ dependencies = [ "aws-sdk-lambda", "aws-smithy-types 1.1.8", "bigdecimal", + "common_enums", "common_utils", "diesel_models", "error-stack", @@ -1943,6 +1944,8 @@ name = "common_enums" version = "0.1.0" dependencies = [ "diesel", + "frunk", + "frunk_core", "router_derive", "serde", "serde_json", diff --git a/api-reference/openapi_spec.json b/api-reference/openapi_spec.json index 5a04fee131..cbad4558da 100644 --- a/api-reference/openapi_spec.json +++ b/api-reference/openapi_spec.json @@ -7228,7 +7228,6 @@ }, 
"CaptureStatus": { "type": "string", - "description": "The status of the capture", "enum": [ "started", "charged", diff --git a/config/config.example.toml b/config/config.example.toml index ff7484e9cd..fd6e34f533 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -623,6 +623,7 @@ source = "logs" # The event sink to push events supports kafka or logs (stdout) [events.kafka] brokers = [] # Kafka broker urls for bootstrapping the client +fraud_check_analytics_topic = "topic" # Kafka topic to be used for FraudCheck events intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events refund_analytics_topic = "topic" # Kafka topic to be used for Refund events diff --git a/config/development.toml b/config/development.toml index 40fc5cf071..12ff04b012 100644 --- a/config/development.toml +++ b/config/development.toml @@ -615,6 +615,7 @@ source = "logs" [events.kafka] brokers = ["localhost:9092"] +fraud_check_analytics_topic= "hyperswitch-fraud-check-events" intent_analytics_topic = "hyperswitch-payment-intent-events" attempt_analytics_topic = "hyperswitch-payment-attempt-events" refund_analytics_topic = "hyperswitch-refund-events" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 0472204f63..28bf8fe8bb 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -435,6 +435,7 @@ delay_between_retries_in_milliseconds = 500 [events.kafka] brokers = ["localhost:9092"] +fraud_check_analytics_topic= "hyperswitch-fraud-check-events" intent_analytics_topic = "hyperswitch-payment-intent-events" attempt_analytics_topic = "hyperswitch-payment-attempt-events" refund_analytics_topic = "hyperswitch-refund-events" diff --git a/crates/analytics/Cargo.toml b/crates/analytics/Cargo.toml index 52680df5c4..8cf4b9b911 100644 --- a/crates/analytics/Cargo.toml +++ b/crates/analytics/Cargo.toml @@ -15,6 +15,7 @@ api_models = { version = "0.1.0", path = "../api_models", features = [ "errors", ] } storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false } +common_enums = { version = "0.1.0", path = "../common_enums" } common_utils = { version = "0.1.0", path = "../common_utils" } external_services = { version = "0.1.0", path = "../external_services", default-features = false } hyperswitch_interfaces = { version = "0.1.0", path = "../hyperswitch_interfaces" } diff --git a/crates/analytics/docs/clickhouse/scripts/fraud_check.sql b/crates/analytics/docs/clickhouse/scripts/fraud_check.sql new file mode 100644 index 0000000000..19e535981b --- /dev/null +++ b/crates/analytics/docs/clickhouse/scripts/fraud_check.sql @@ -0,0 +1,132 @@ +CREATE TABLE fraud_check_queue ( + `frm_id` String, + `payment_id` String, + `merchant_id` String, + `attempt_id` String, + `created_at` DateTime CODEC(T64, LZ4), + `frm_name` LowCardinality(String), + `frm_transaction_id` String, + `frm_transaction_type` LowCardinality(String), + `frm_status` LowCardinality(String), + `frm_score` Int32, + `frm_reason` LowCardinality(String), + `frm_error` Nullable(String), + `amount` UInt32, + `currency` LowCardinality(String), + `payment_method` LowCardinality(String), + `payment_method_type` LowCardinality(String), + `refund_transaction_id` Nullable(String), + `metadata` Nullable(String), + `modified_at` DateTime CODEC(T64, LZ4), + `last_step` LowCardinality(String), + `payment_capture_method` LowCardinality(String), + `sign_flag` Int8 +) ENGINE = Kafka SETTINGS 
kafka_broker_list = 'kafka0:29092', +kafka_topic_list = 'hyperswitch-fraud-check-events', +kafka_group_name = 'hyper', +kafka_format = 'JSONEachRow', +kafka_handle_error_mode = 'stream'; + +CREATE TABLE fraud_check ( + `frm_id` String, + `payment_id` String, + `merchant_id` LowCardinality(String), + `attempt_id` String, + `created_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `frm_name` LowCardinality(String), + `frm_transaction_id` String, + `frm_transaction_type` LowCardinality(String), + `frm_status` LowCardinality(String), + `frm_score` Int32, + `frm_reason` LowCardinality(String), + `frm_error` Nullable(String), + `amount` UInt32, + `currency` LowCardinality(String), + `payment_method` LowCardinality(String), + `payment_method_type` LowCardinality(String), + `refund_transaction_id` Nullable(String), + `metadata` Nullable(String), + `modified_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `last_step` LowCardinality(String), + `payment_capture_method` LowCardinality(String), + `sign_flag` Int8 + INDEX frmNameIndex frm_name TYPE bloom_filter GRANULARITY 1, + INDEX frmStatusIndex frm_status TYPE bloom_filter GRANULARITY 1, + INDEX paymentMethodIndex payment_method TYPE bloom_filter GRANULARITY 1, + INDEX paymentMethodTypeIndex payment_method_type TYPE bloom_filter GRANULARITY 1, + INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1 +) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at) +ORDER BY + (created_at, merchant_id, attempt_id, frm_id) TTL created_at + toIntervalMonth(18) SETTINGS index_granularity = 8192; + +CREATE MATERIALIZED VIEW fraud_check_mv TO fraud_check ( + `frm_id` String, + `payment_id` String, + `merchant_id` String, + `attempt_id` String, + `created_at` DateTime64(3), + `frm_name` LowCardinality(String), + `frm_transaction_id` String, + `frm_transaction_type` LowCardinality(String), + `frm_status` LowCardinality(String), + `frm_score` Int32, + `frm_reason` LowCardinality(String), + `frm_error` Nullable(String), + `amount` UInt32, + `currency` LowCardinality(String), + `payment_method` LowCardinality(String), + `payment_method_type` LowCardinality(String), + `refund_transaction_id` Nullable(String), + `metadata` Nullable(String), + `modified_at` DateTime64(3), + `last_step` LowCardinality(String), + `payment_capture_method` LowCardinality(String), + `sign_flag` Int8 +) AS +SELECT + frm_id, + payment_id, + merchant_id, + attempt_id, + created_at, + frm_name, + frm_transaction_id, + frm_transaction_type, + frm_status, + frm_score, + frm_reason, + frm_error, + amount, + currency, + payment_method, + payment_method_type, + refund_transaction_id, + metadata, + modified_at, + last_step, + payment_capture_method, + sign_flag +FROM + fraud_check_queue +WHERE + length(_error) = 0; + +CREATE MATERIALIZED VIEW fraud_check_parse_errors ( + `topic` String, + `partition` Int64, + `offset` Int64, + `raw` String, + `error` String +) ENGINE = MergeTree +ORDER BY + (topic, partition, offset) SETTINGS index_granularity = 8192 AS +SELECT + _topic AS topic, + _partition AS partition, + _offset AS offset, + _raw_message AS raw, + _error AS error +FROM + fraud_check_queue +WHERE + length(_error) > 0; \ No newline at end of file diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index b455e79b25..ffca548713 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -9,6 +9,7 @@ use time::PrimitiveDateTime; use super::{ active_payments::metrics::ActivePaymentsMetricRow, 
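The fraud_check.sql script above wires up the same three-stage pipeline used by the other analytics tables: a Kafka-engine queue table consumes JSONEachRow messages, one materialized view copies parseable rows into a CollapsingMergeTree keyed on sign_flag, and a second materialized view diverts unparseable messages into fraud_check_parse_errors. A minimal sketch of a message the queue table would accept, assuming serde_json; field names follow the fraud_check_queue DDL above, and every value here is illustrative:

// Sketch only: a JSONEachRow payload shaped like the fraud_check_queue
// columns above. Values are made up; timestamps are epoch seconds here,
// though the exact DateTime encoding depends on ClickHouse input settings.
use serde_json::json;

fn main() {
    let event = json!({
        "frm_id": "frm_123",
        "payment_id": "pay_123",
        "merchant_id": "merchant_abc",
        "attempt_id": "pay_123_1",
        "created_at": 1719993600,
        "frm_name": "signifyd",
        "frm_transaction_id": "txn_123",
        "frm_transaction_type": "pre_frm",
        "frm_status": "pending",
        "frm_score": 0,
        "frm_reason": "",
        "frm_error": null,
        "amount": 6540,
        "currency": "USD",
        "payment_method": "card",
        "payment_method_type": "credit",
        "refund_transaction_id": null,
        "metadata": null,
        "modified_at": 1719993600,
        "last_step": "processing",
        "payment_capture_method": "automatic",
        "sign_flag": 1
    });
    println!("{event}");
}

Updates are modeled as a sign_flag = -1 row for the old state plus a sign_flag = 1 row for the new one, which the CollapsingMergeTree folds away at merge time.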
auth_events::metrics::AuthEventMetricRow, + frm::{filters::FrmFilterRow, metrics::FrmMetricRow}, health_check::HealthCheck, payment_intents::{filters::PaymentIntentFilterRow, metrics::PaymentIntentMetricRow}, payments::{ @@ -130,6 +131,7 @@ impl AnalyticsDataSource for ClickhouseClient { match table { AnalyticsCollection::Payment | AnalyticsCollection::Refund + | AnalyticsCollection::FraudCheck | AnalyticsCollection::PaymentIntent | AnalyticsCollection::Dispute => { TableEngine::CollapsingMergeTree { sign: "sign_flag" } @@ -162,6 +164,8 @@ impl super::payment_intents::filters::PaymentIntentFilterAnalytics for Clickhous impl super::payment_intents::metrics::PaymentIntentMetricAnalytics for ClickhouseClient {} impl super::refunds::metrics::RefundMetricAnalytics for ClickhouseClient {} impl super::refunds::filters::RefundFilterAnalytics for ClickhouseClient {} +impl super::frm::metrics::FrmMetricAnalytics for ClickhouseClient {} +impl super::frm::filters::FrmFilterAnalytics for ClickhouseClient {} impl super::sdk_events::filters::SdkEventFilterAnalytics for ClickhouseClient {} impl super::sdk_events::metrics::SdkEventMetricAnalytics for ClickhouseClient {} impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {} @@ -290,6 +294,25 @@ impl TryInto for serde_json::Value { } } +impl TryInto for serde_json::Value { + type Error = Report; + + fn try_into(self) -> Result { + serde_json::from_value(self).change_context(ParsingError::StructParseFailure( + "Failed to parse FrmMetricRow in clickhouse results", + )) + } +} + +impl TryInto for serde_json::Value { + type Error = Report; + + fn try_into(self) -> Result { + serde_json::from_value(self).change_context(ParsingError::StructParseFailure( + "Failed to parse FrmFilterRow in clickhouse results", + )) + } +} impl TryInto for serde_json::Value { type Error = Report; @@ -409,6 +432,7 @@ impl ToSql for AnalyticsCollection { match self { Self::Payment => Ok("payment_attempts".to_string()), Self::Refund => Ok("refunds".to_string()), + Self::FraudCheck => Ok("fraud_check".to_string()), Self::SdkEvents => Ok("sdk_events_audit".to_string()), Self::SdkEventsAnalytics => Ok("sdk_events".to_string()), Self::ApiEvents => Ok("api_events_audit".to_string()), diff --git a/crates/analytics/src/core.rs b/crates/analytics/src/core.rs index 2c5945f75b..0e3ced7993 100644 --- a/crates/analytics/src/core.rs +++ b/crates/analytics/src/core.rs @@ -21,6 +21,11 @@ pub async fn get_domain_info( download_dimensions: None, dimensions: utils::get_refund_dimensions(), }, + AnalyticsDomain::Frm => GetInfoResponse { + metrics: utils::get_frm_metrics_info(), + download_dimensions: None, + dimensions: utils::get_frm_dimensions(), + }, AnalyticsDomain::SdkEvents => GetInfoResponse { metrics: utils::get_sdk_event_metrics_info(), download_dimensions: None, diff --git a/crates/analytics/src/frm.rs b/crates/analytics/src/frm.rs new file mode 100644 index 0000000000..7598bddaae --- /dev/null +++ b/crates/analytics/src/frm.rs @@ -0,0 +1,9 @@ +pub mod accumulator; +mod core; + +pub mod filters; +pub mod metrics; +pub mod types; +pub use accumulator::{FrmMetricAccumulator, FrmMetricsAccumulator}; + +pub use self::core::{get_filters, get_metrics}; diff --git a/crates/analytics/src/frm/accumulator.rs b/crates/analytics/src/frm/accumulator.rs new file mode 100644 index 0000000000..04b60beb98 --- /dev/null +++ b/crates/analytics/src/frm/accumulator.rs @@ -0,0 +1,78 @@ +use api_models::analytics::frm::FrmMetricsBucketValue; +use common_enums::enums as storage_enums; + +use 
super::metrics::FrmMetricRow; +#[derive(Debug, Default)] +pub struct FrmMetricsAccumulator { + pub frm_triggered_attempts: TriggeredAttemptsAccumulator, + pub frm_blocked_rate: BlockedRateAccumulator, +} + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct TriggeredAttemptsAccumulator { + pub count: Option, +} + +#[derive(Debug, Default)] +pub struct BlockedRateAccumulator { + pub fraud: i64, + pub total: i64, +} + +pub trait FrmMetricAccumulator { + type MetricOutput; + + fn add_metrics_bucket(&mut self, metrics: &FrmMetricRow); + + fn collect(self) -> Self::MetricOutput; +} + +impl FrmMetricAccumulator for TriggeredAttemptsAccumulator { + type MetricOutput = Option; + #[inline] + fn add_metrics_bucket(&mut self, metrics: &FrmMetricRow) { + self.count = match (self.count, metrics.count) { + (None, None) => None, + (None, i @ Some(_)) | (i @ Some(_), None) => i, + (Some(a), Some(b)) => Some(a + b), + } + } + #[inline] + fn collect(self) -> Self::MetricOutput { + self.count.and_then(|i| u64::try_from(i).ok()) + } +} + +impl FrmMetricAccumulator for BlockedRateAccumulator { + type MetricOutput = Option; + + fn add_metrics_bucket(&mut self, metrics: &FrmMetricRow) { + if let Some(ref frm_status) = metrics.frm_status { + if frm_status.as_ref() == &storage_enums::FraudCheckStatus::Fraud { + self.fraud += metrics.count.unwrap_or_default(); + } + }; + self.total += metrics.count.unwrap_or_default(); + } + + fn collect(self) -> Self::MetricOutput { + if self.total <= 0 { + None + } else { + Some( + f64::from(u32::try_from(self.fraud).ok()?) * 100.0 + / f64::from(u32::try_from(self.total).ok()?), + ) + } + } +} + +impl FrmMetricsAccumulator { + pub fn collect(self) -> FrmMetricsBucketValue { + FrmMetricsBucketValue { + frm_blocked_rate: self.frm_blocked_rate.collect(), + frm_triggered_attempts: self.frm_triggered_attempts.collect(), + } + } +} diff --git a/crates/analytics/src/frm/core.rs b/crates/analytics/src/frm/core.rs new file mode 100644 index 0000000000..9c9e73b49a --- /dev/null +++ b/crates/analytics/src/frm/core.rs @@ -0,0 +1,193 @@ +#![allow(dead_code)] +use std::collections::HashMap; + +use api_models::analytics::{ + frm::{FrmDimensions, FrmMetrics, FrmMetricsBucketIdentifier, FrmMetricsBucketResponse}, + AnalyticsMetadata, FrmFilterValue, FrmFiltersResponse, GetFrmFilterRequest, + GetFrmMetricRequest, MetricsResponse, +}; +use error_stack::ResultExt; +use router_env::{ + logger, + metrics::add_attributes, + tracing::{self, Instrument}, +}; + +use super::{ + filters::{get_frm_filter_for_dimension, FrmFilterRow}, + FrmMetricsAccumulator, +}; +use crate::{ + errors::{AnalyticsError, AnalyticsResult}, + frm::FrmMetricAccumulator, + metrics, AnalyticsProvider, +}; + +pub async fn get_metrics( + pool: &AnalyticsProvider, + merchant_id: &String, + req: GetFrmMetricRequest, +) -> AnalyticsResult> { + let mut metrics_accumulator: HashMap = + HashMap::new(); + let mut set = tokio::task::JoinSet::new(); + for metric_type in req.metrics.iter().cloned() { + let req = req.clone(); + let pool = pool.clone(); + let task_span = + tracing::debug_span!("analytics_frm_query", frm_metric = metric_type.as_ref()); + // Currently JoinSet works with only static lifetime references even if the task pool does not outlive the given reference + // We can optimize away this clone once that is fixed + let merchant_id_scoped = merchant_id.to_owned(); + set.spawn( + async move { + let data = pool + .get_frm_metrics( + &metric_type, + &req.group_by_names.clone(), + &merchant_id_scoped, + &req.filters, + 
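Each FRM metric gets an accumulator: TriggeredAttemptsAccumulator sums per-bucket counts, while BlockedRateAccumulator tracks fraud and total counts and collapses them into a percentage in collect(). A self-contained sketch of that fold, where Row is a simplified stand-in for the FrmMetricRow defined later in frm/metrics.rs:

// Sketch of how the accumulators above fold per-dimension rows into one
// bucket value. The real rows carry DBEnumWrapper<FraudCheckStatus> and
// time buckets as well.
struct Row {
    is_fraud: bool,
    count: Option<i64>,
}

#[derive(Default)]
struct BlockedRate {
    fraud: i64,
    total: i64,
}

impl BlockedRate {
    fn add(&mut self, row: &Row) {
        if row.is_fraud {
            self.fraud += row.count.unwrap_or_default();
        }
        self.total += row.count.unwrap_or_default();
    }
    fn collect(self) -> Option<f64> {
        // No rows at all yields None rather than a misleading 0% rate.
        (self.total > 0).then(|| self.fraud as f64 * 100.0 / self.total as f64)
    }
}

fn main() {
    let rows = [
        Row { is_fraud: true, count: Some(2) },
        Row { is_fraud: false, count: Some(8) },
    ];
    let mut acc = BlockedRate::default();
    for r in &rows {
        acc.add(r);
    }
    assert_eq!(acc.collect(), Some(20.0)); // 2 fraud out of 10 => 20%
}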
&req.time_series.map(|t| t.granularity), + &req.time_range, + ) + .await + .change_context(AnalyticsError::UnknownError); + (metric_type, data) + } + .instrument(task_span), + ); + } + + while let Some((metric, data)) = set + .join_next() + .await + .transpose() + .change_context(AnalyticsError::UnknownError)? + { + let data = data?; + + let attributes = &add_attributes([ + ("metric_type", metric.to_string()), + ("source", pool.to_string()), + ]); + let value = u64::try_from(data.len()); + if let Ok(val) = value { + metrics::BUCKETS_FETCHED.record(&metrics::CONTEXT, val, attributes); + logger::debug!("Attributes: {:?}, Buckets fetched: {}", attributes, val); + } + + for (id, value) in data { + logger::debug!(bucket_id=?id, bucket_value=?value, "Bucket row for metric {metric}"); + let metrics_builder = metrics_accumulator.entry(id).or_default(); + match metric { + FrmMetrics::FrmBlockedRate => { + metrics_builder.frm_blocked_rate.add_metrics_bucket(&value) + } + FrmMetrics::FrmTriggeredAttempts => metrics_builder + .frm_triggered_attempts + .add_metrics_bucket(&value), + } + } + + logger::debug!( + "Analytics Accumulated Results: metric: {}, results: {:#?}", + metric, + metrics_accumulator + ); + } + let query_data: Vec = metrics_accumulator + .into_iter() + .map(|(id, val)| FrmMetricsBucketResponse { + values: val.collect(), + dimensions: id, + }) + .collect(); + + Ok(MetricsResponse { + query_data, + meta_data: [AnalyticsMetadata { + current_time_range: req.time_range, + }], + }) +} + +pub async fn get_filters( + pool: &AnalyticsProvider, + req: GetFrmFilterRequest, + merchant_id: &String, +) -> AnalyticsResult { + let mut res = FrmFiltersResponse::default(); + for dim in req.group_by_names { + let values = match pool { + AnalyticsProvider::Sqlx(pool) => { + get_frm_filter_for_dimension(dim, merchant_id, &req.time_range, pool) + .await +} + AnalyticsProvider::Clickhouse(pool) => { + get_frm_filter_for_dimension(dim, merchant_id, &req.time_range, pool) + .await +} + AnalyticsProvider::CombinedCkh(sqlx_pool, ckh_pool) => { + let ckh_result = get_frm_filter_for_dimension( + dim, + merchant_id, + &req.time_range, + ckh_pool, + ) + .await; + let sqlx_result = get_frm_filter_for_dimension( + dim, + merchant_id, + &req.time_range, + sqlx_pool, + ) + .await; + match (&sqlx_result, &ckh_result) { + (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => { + logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres frm analytics filters") + }, + _ => {} + }; + ckh_result +} + AnalyticsProvider::CombinedSqlx(sqlx_pool, ckh_pool) => { + let ckh_result = get_frm_filter_for_dimension( + dim, + merchant_id, + &req.time_range, + ckh_pool, + ) + .await; + let sqlx_result = get_frm_filter_for_dimension( + dim, + merchant_id, + &req.time_range, + sqlx_pool, + ) + .await; + match (&sqlx_result, &ckh_result) { + (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => { + logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres frm analytics filters") + }, + _ => {} + }; + sqlx_result +} +} + .change_context(AnalyticsError::UnknownError)? 
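The CombinedCkh and CombinedSqlx arms above run the identical query against both ClickHouse and Postgres, log any disagreement, and then return one side's result (ClickHouse for CombinedCkh, Postgres for CombinedSqlx). Reduced to its essentials, with the async plumbing and real pool types omitted and compare_and_pick as an illustrative name:

// Sketch of the dual-read comparison used above; in the real code both
// sides call get_frm_filter_for_dimension against their own backend.
fn compare_and_pick<T: PartialEq + std::fmt::Debug>(
    ckh: Result<T, String>,
    sqlx: Result<T, String>,
    prefer_ckh: bool,
) -> Result<T, String> {
    if let (Ok(a), Ok(b)) = (&sqlx, &ckh) {
        if a != b {
            // Mismatches are logged, never surfaced as errors.
            eprintln!("Mismatch between clickhouse ({b:?}) & postgres ({a:?})");
        }
    }
    if prefer_ckh { ckh } else { sqlx }
}

fn main() {
    let out = compare_and_pick(Ok(vec!["fraud"]), Ok(vec!["fraud", "legit"]), true);
    assert_eq!(out, Ok(vec!["fraud"]));
}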
+ .into_iter() + .filter_map(|fil: FrmFilterRow| match dim { + FrmDimensions::FrmStatus => fil.frm_status.map(|i| i.as_ref().to_string()), + FrmDimensions::FrmName => fil.frm_name, + FrmDimensions::FrmTransactionType => { + fil.frm_transaction_type.map(|i| i.as_ref().to_string()) + } + }) + .collect::>(); + res.query_data.push(FrmFilterValue { + dimension: dim, + values, + }) + } + Ok(res) +} diff --git a/crates/analytics/src/frm/filters.rs b/crates/analytics/src/frm/filters.rs new file mode 100644 index 0000000000..c019f61d42 --- /dev/null +++ b/crates/analytics/src/frm/filters.rs @@ -0,0 +1,59 @@ +use api_models::analytics::{ + frm::{FrmDimensions, FrmTransactionType}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use diesel_models::enums::FraudCheckStatus; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use crate::{ + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window}, + types::{ + AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, FiltersError, FiltersResult, + LoadRow, + }, +}; +pub trait FrmFilterAnalytics: LoadRow {} + +pub async fn get_frm_filter_for_dimension( + dimension: FrmDimensions, + merchant: &String, + time_range: &TimeRange, + pool: &T, +) -> FiltersResult> +where + T: AnalyticsDataSource + FrmFilterAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::FraudCheck); + + query_builder.add_select_column(dimension).switch()?; + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant) + .switch()?; + + query_builder.set_distinct(); + + query_builder + .execute_query::(pool) + .await + .change_context(FiltersError::QueryBuildingError)? 
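Both FrmFilterRow here and FrmMetricRow in frm/metrics.rs wrap their enum columns in DBEnumWrapper so the textual column values coming back from the database deserialize into typed FraudCheckStatus and FrmTransactionType values. A stand-in sketch of that idea; the crate's real DBEnumWrapper lives in analytics::types and differs in detail:

// Sketch only: a newtype that parses a DB string into an enum, in the
// spirit of DBEnumWrapper<FraudCheckStatus> used by the rows above.
use std::str::FromStr;

#[derive(Debug, PartialEq)]
enum FraudCheckStatus {
    Fraud,
    ManualReview,
    Pending,
    Legit,
    TransactionFailure,
}

impl FromStr for FraudCheckStatus {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "fraud" => Ok(Self::Fraud),
            "manual_review" => Ok(Self::ManualReview),
            "pending" => Ok(Self::Pending),
            "legit" => Ok(Self::Legit),
            "transaction_failure" => Ok(Self::TransactionFailure),
            other => Err(format!("unknown frm_status: {other}")),
        }
    }
}

#[derive(Debug, PartialEq)]
struct DbEnum<T>(T);

impl<T: FromStr> FromStr for DbEnum<T> {
    type Err = T::Err;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        s.parse().map(DbEnum)
    }
}

fn main() {
    let status: DbEnum<FraudCheckStatus> = "fraud".parse().unwrap();
    assert_eq!(status, DbEnum(FraudCheckStatus::Fraud));
}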
+ .change_context(FiltersError::QueryExecutionFailure) +} + +#[derive(Debug, serde::Serialize, Eq, PartialEq, serde::Deserialize)] +pub struct FrmFilterRow { + pub frm_status: Option>, + pub frm_transaction_type: Option>, + pub frm_name: Option, +} diff --git a/crates/analytics/src/frm/metrics.rs b/crates/analytics/src/frm/metrics.rs new file mode 100644 index 0000000000..8f90409098 --- /dev/null +++ b/crates/analytics/src/frm/metrics.rs @@ -0,0 +1,99 @@ +use api_models::analytics::{ + frm::{FrmDimensions, FrmFilters, FrmMetrics, FrmMetricsBucketIdentifier, FrmTransactionType}, + Granularity, TimeRange, +}; +use diesel_models::enums as storage_enums; +use time::PrimitiveDateTime; +mod frm_blocked_rate; +mod frm_triggered_attempts; + +use frm_blocked_rate::FrmBlockedRate; +use frm_triggered_attempts::FrmTriggeredAttempts; + +use crate::{ + query::{Aggregate, GroupByClause, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, DBEnumWrapper, LoadRow, MetricsResult}, +}; +#[derive(Debug, Eq, PartialEq, serde::Deserialize)] +pub struct FrmMetricRow { + pub frm_name: Option, + pub frm_status: Option>, + pub frm_transaction_type: Option>, + pub total: Option, + pub count: Option, + #[serde(with = "common_utils::custom_serde::iso8601::option")] + pub start_bucket: Option, + #[serde(with = "common_utils::custom_serde::iso8601::option")] + pub end_bucket: Option, +} + +pub trait FrmMetricAnalytics: LoadRow {} + +#[async_trait::async_trait] +pub trait FrmMetric +where + T: AnalyticsDataSource + FrmMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[FrmDimensions], + merchant_id: &str, + filters: &FrmFilters, + granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult>; +} + +#[async_trait::async_trait] +impl FrmMetric for FrmMetrics +where + T: AnalyticsDataSource + FrmMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[FrmDimensions], + merchant_id: &str, + filters: &FrmFilters, + granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> { + match self { + Self::FrmTriggeredAttempts => { + FrmTriggeredAttempts::default() + .load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + pool, + ) + .await + } + Self::FrmBlockedRate => { + FrmBlockedRate::default() + .load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + pool, + ) + .await + } + } + } +} diff --git a/crates/analytics/src/frm/metrics/frm_blocked_rate.rs b/crates/analytics/src/frm/metrics/frm_blocked_rate.rs new file mode 100644 index 0000000000..5f331feab6 --- /dev/null +++ b/crates/analytics/src/frm/metrics/frm_blocked_rate.rs @@ -0,0 +1,117 @@ +use api_models::analytics::{ + frm::{FrmDimensions, FrmFilters, FrmMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::FrmMetricRow; +use crate::{ + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; +#[derive(Default)] +pub(super) struct FrmBlockedRate {} + +#[async_trait::async_trait] +impl 
super::FrmMetric for FrmBlockedRate +where + T: AnalyticsDataSource + super::FrmMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[FrmDimensions], + merchant_id: &str, + filters: &FrmFilters, + granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> + where + T: AnalyticsDataSource + super::FrmMetricAnalytics, + { + let mut query_builder = QueryBuilder::new(AnalyticsCollection::FraudCheck); + let mut dimensions = dimensions.to_vec(); + + dimensions.push(FrmDimensions::FrmStatus); + + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } + + query_builder + .add_select_column(Aggregate::Count { + field: None, + alias: Some("count"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + filters.set_filter_clause(&mut query_builder).switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant_id) + .switch()?; + + time_range.set_filter_clause(&mut query_builder).switch()?; + + for dim in dimensions.iter() { + query_builder.add_group_by_clause(dim).switch()?; + } + + if let Some(granularity) = granularity.as_ref() { + granularity + .set_group_by_clause(&mut query_builder) + .switch()?; + } + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + FrmMetricsBucketIdentifier::new( + i.frm_name.as_ref().map(|i| i.to_string()), + None, + i.frm_transaction_type.as_ref().map(|i| i.0.to_string()), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, + crate::query::PostProcessingError, + >>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/frm/metrics/frm_triggered_attempts.rs b/crates/analytics/src/frm/metrics/frm_triggered_attempts.rs new file mode 100644 index 0000000000..b72345c4e3 --- /dev/null +++ b/crates/analytics/src/frm/metrics/frm_triggered_attempts.rs @@ -0,0 +1,116 @@ +use api_models::analytics::{ + frm::{FrmDimensions, FrmFilters, FrmMetricsBucketIdentifier}, + Granularity, TimeRange, +}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::FrmMetricRow; +use crate::{ + query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; + +#[derive(Default)] +pub(super) struct FrmTriggeredAttempts {} + +#[async_trait::async_trait] +impl super::FrmMetric for FrmTriggeredAttempts +where + T: AnalyticsDataSource + super::FrmMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + dimensions: &[FrmDimensions], + merchant_id: &str, + filters: &FrmFilters, + 
granularity: &Option, + time_range: &TimeRange, + pool: &T, + ) -> MetricsResult> { + let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::FraudCheck); + + for dim in dimensions.iter() { + query_builder.add_select_column(dim).switch()?; + } + + query_builder + .add_select_column(Aggregate::Count { + field: None, + alias: Some("count"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Min { + field: "created_at", + alias: Some("start_bucket"), + }) + .switch()?; + query_builder + .add_select_column(Aggregate::Max { + field: "created_at", + alias: Some("end_bucket"), + }) + .switch()?; + + filters.set_filter_clause(&mut query_builder).switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant_id) + .switch()?; + + time_range + .set_filter_clause(&mut query_builder) + .attach_printable("Error filtering time range") + .switch()?; + + for dim in dimensions.iter() { + query_builder + .add_group_by_clause(dim) + .attach_printable("Error grouping by dimensions") + .switch()?; + } + + if let Some(granularity) = granularity.as_ref() { + granularity + .set_group_by_clause(&mut query_builder) + .attach_printable("Error adding granularity") + .switch()?; + } + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| { + Ok(( + FrmMetricsBucketIdentifier::new( + i.frm_name.as_ref().map(|i| i.to_string()), + i.frm_status.as_ref().map(|i| i.0.to_string()), + i.frm_transaction_type.as_ref().map(|i| i.0.to_string()), + TimeRange { + start_time: match (granularity, i.start_bucket) { + (Some(g), Some(st)) => g.clip_to_start(st)?, + _ => time_range.start_time, + }, + end_time: granularity.as_ref().map_or_else( + || Ok(time_range.end_time), + |g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(), + )?, + }, + ), + i, + )) + }) + .collect::, crate::query::PostProcessingError>>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/frm/types.rs b/crates/analytics/src/frm/types.rs new file mode 100644 index 0000000000..72fc65098b --- /dev/null +++ b/crates/analytics/src/frm/types.rs @@ -0,0 +1,38 @@ +use api_models::analytics::frm::{FrmDimensions, FrmFilters}; +use error_stack::ResultExt; + +use crate::{ + query::{QueryBuilder, QueryFilter, QueryResult, ToSql}, + types::{AnalyticsCollection, AnalyticsDataSource}, +}; + +impl QueryFilter for FrmFilters +where + T: AnalyticsDataSource, + AnalyticsCollection: ToSql, +{ + fn set_filter_clause(&self, builder: &mut QueryBuilder) -> QueryResult<()> { + if !self.frm_status.is_empty() { + builder + .add_filter_in_range_clause(FrmDimensions::FrmStatus, &self.frm_status) + .attach_printable("Error adding frm status filter")?; + } + + if !self.frm_name.is_empty() { + builder + .add_filter_in_range_clause(FrmDimensions::FrmName, &self.frm_name) + .attach_printable("Error adding frm name filter")?; + } + + if !self.frm_transaction_type.is_empty() { + builder + .add_filter_in_range_clause( + FrmDimensions::FrmTransactionType, + &self.frm_transaction_type, + ) + .attach_printable("Error adding frm transaction type filter")?; + } + + Ok(()) + } +} diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index 943ac9a531..ca9802b7d3 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -2,6 +2,7 @@ mod clickhouse; pub mod core; pub mod disputes; pub mod errors; +pub mod frm; pub mod metrics; pub mod payment_intents; 
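The query that FrmTriggeredAttempts assembles above selects the requested dimensions plus a row count and the min/max created_at per bucket, filters by merchant and time range, and groups by the same dimensions (plus the granularity bucket when a time series is requested). Rendered by hand for a single frm_name dimension, so only an approximation of the builder's exact output:

// Approximate shape of the generated query; the real QueryBuilder output
// may differ in quoting, parameter binding, and granularity bucketing.
const TRIGGERED_ATTEMPTS_SQL: &str = r#"
SELECT
    frm_name,
    count(*) AS count,
    min(created_at) AS start_bucket,
    max(created_at) AS end_bucket
FROM fraud_check
WHERE merchant_id = 'merchant_abc'
  AND created_at >= '2024-07-01 00:00:00'
  AND created_at < '2024-07-02 00:00:00'
GROUP BY frm_name
"#;

fn main() {
    println!("{TRIGGERED_ATTEMPTS_SQL}");
}

FrmBlockedRate builds the same query but first pushes FrmStatus into the dimension list, so the accumulator can tell fraud rows apart from the rest.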
pub mod payments; @@ -40,6 +41,7 @@ use api_models::analytics::{ }, auth_events::{AuthEventMetrics, AuthEventMetricsBucketIdentifier}, disputes::{DisputeDimensions, DisputeFilters, DisputeMetrics, DisputeMetricsBucketIdentifier}, + frm::{FrmDimensions, FrmFilters, FrmMetrics, FrmMetricsBucketIdentifier}, payment_intents::{ PaymentIntentDimensions, PaymentIntentFilters, PaymentIntentMetrics, PaymentIntentMetricsBucketIdentifier, @@ -65,6 +67,7 @@ use strum::Display; use self::{ active_payments::metrics::{ActivePaymentsMetric, ActivePaymentsMetricRow}, auth_events::metrics::{AuthEventMetric, AuthEventMetricRow}, + frm::metrics::{FrmMetric, FrmMetricRow}, payment_intents::metrics::{PaymentIntentMetric, PaymentIntentMetricRow}, payments::{ distribution::{PaymentDistribution, PaymentDistributionRow}, @@ -524,6 +527,106 @@ impl AnalyticsProvider { .await } + pub async fn get_frm_metrics( + &self, + metric: &FrmMetrics, + dimensions: &[FrmDimensions], + merchant_id: &str, + filters: &FrmFilters, + granularity: &Option, + time_range: &TimeRange, + ) -> types::MetricsResult> { + // Metrics to get the fetch time for each refund metric + metrics::request::record_operation_time( + async { + match self { + Self::Sqlx(pool) => { + metric + .load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + pool, + ) + .await + } + Self::Clickhouse(pool) => { + metric + .load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + pool, + ) + .await + } + Self::CombinedCkh(sqlx_pool, ckh_pool) => { + let (ckh_result, sqlx_result) = tokio::join!( + metric.load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + ckh_pool, + ), + metric.load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + sqlx_pool, + ) + ); + match (&sqlx_result, &ckh_result) { + (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => { + logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres frm analytics metrics") + } + _ => {} + }; + ckh_result + } + Self::CombinedSqlx(sqlx_pool, ckh_pool) => { + let (ckh_result, sqlx_result) = tokio::join!( + metric.load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + ckh_pool, + ), + metric.load_metrics( + dimensions, + merchant_id, + filters, + granularity, + time_range, + sqlx_pool, + ) + ); + match (&sqlx_result, &ckh_result) { + (Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => { + logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres frm analytics metrics") + } + _ => {} + }; + sqlx_result + } + } + }, + &metrics::METRIC_FETCH_TIME, + metric, + self, + ) + .await + } + pub async fn get_dispute_metrics( &self, metric: &DisputeMetrics, @@ -870,12 +973,14 @@ pub enum AnalyticsFlow { GetPaymentMetrics, GetPaymentIntentMetrics, GetRefundsMetrics, + GetFrmMetrics, GetSdkMetrics, GetAuthMetrics, GetActivePaymentsMetrics, GetPaymentFilters, GetPaymentIntentFilters, GetRefundFilters, + GetFrmFilters, GetSdkEventFilters, GetApiEvents, GetSdkEvents, diff --git a/crates/analytics/src/query.rs b/crates/analytics/src/query.rs index a257fedc09..381deb60b8 100644 --- a/crates/analytics/src/query.rs +++ b/crates/analytics/src/query.rs @@ -6,6 +6,7 @@ use api_models::{ api_event::ApiEventDimensions, auth_events::AuthEventFlows, disputes::DisputeDimensions, + frm::{FrmDimensions, FrmTransactionType}, payment_intents::PaymentIntentDimensions, 
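The query.rs change below simply extends the impl_to_sql_for_to_string! list so FrmDimensions, FraudCheckStatus, and FrmTransactionType can be spliced into generated SQL; each renders through its snake_case strum Display impl. The idea in isolation, with a hand-rolled Display standing in for the derive:

// Sketch: dimensions become SQL identifiers via Display, which is what
// the impl_to_sql_for_to_string! macro relies on.
use std::fmt;

enum FrmDimension {
    FrmStatus,
    FrmName,
    FrmTransactionType,
}

impl fmt::Display for FrmDimension {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            Self::FrmStatus => "frm_status",
            Self::FrmName => "frm_name",
            Self::FrmTransactionType => "frm_transaction_type",
        };
        f.write_str(s)
    }
}

fn main() {
    let select: Vec<String> = [
        FrmDimension::FrmStatus,
        FrmDimension::FrmName,
        FrmDimension::FrmTransactionType,
    ]
    .iter()
    .map(ToString::to_string)
    .collect();
    assert_eq!(select.join(", "), "frm_status, frm_name, frm_transaction_type");
}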
payments::{PaymentDimensions, PaymentDistributions}, refunds::{RefundDimensions, RefundType}, @@ -19,7 +20,7 @@ use api_models::{ refunds::RefundStatus, }; use common_utils::errors::{CustomResult, ParsingError}; -use diesel_models::enums as storage_enums; +use diesel_models::{enums as storage_enums, enums::FraudCheckStatus}; use error_stack::ResultExt; use router_env::{logger, Flow}; @@ -372,10 +373,12 @@ impl_to_sql_for_to_string!( &PaymentDimensions, &PaymentIntentDimensions, &RefundDimensions, + &FrmDimensions, PaymentDimensions, PaymentIntentDimensions, &PaymentDistributions, RefundDimensions, + FrmDimensions, PaymentMethod, PaymentMethodType, AuthenticationType, @@ -383,9 +386,11 @@ impl_to_sql_for_to_string!( AttemptStatus, IntentStatus, RefundStatus, + FraudCheckStatus, storage_enums::RefundStatus, Currency, RefundType, + FrmTransactionType, Flow, &String, &bool, diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index 6a4faf50eb..656a2448a4 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -1,7 +1,7 @@ use std::{fmt::Display, str::FromStr}; use api_models::{ - analytics::refunds::RefundType, + analytics::{frm::FrmTransactionType, refunds::RefundType}, enums::{DisputeStage, DisputeStatus}, }; use common_utils::{ @@ -9,7 +9,8 @@ use common_utils::{ DbConnectionParams, }; use diesel_models::enums::{ - AttemptStatus, AuthenticationType, Currency, IntentStatus, PaymentMethod, RefundStatus, + AttemptStatus, AuthenticationType, Currency, FraudCheckStatus, IntentStatus, PaymentMethod, + RefundStatus, }; use error_stack::ResultExt; use sqlx::{ @@ -91,6 +92,8 @@ db_type!(IntentStatus); db_type!(PaymentMethod, TEXT); db_type!(RefundStatus); db_type!(RefundType); +db_type!(FraudCheckStatus); +db_type!(FrmTransactionType); db_type!(DisputeStage); db_type!(DisputeStatus); @@ -150,6 +153,8 @@ impl super::refunds::metrics::RefundMetricAnalytics for SqlxClient {} impl super::refunds::filters::RefundFilterAnalytics for SqlxClient {} impl super::disputes::filters::DisputeFilterAnalytics for SqlxClient {} impl super::disputes::metrics::DisputeMetricAnalytics for SqlxClient {} +impl super::frm::metrics::FrmMetricAnalytics for SqlxClient {} +impl super::frm::filters::FrmFilterAnalytics for SqlxClient {} #[async_trait::async_trait] impl AnalyticsDataSource for SqlxClient { @@ -230,6 +235,49 @@ impl<'a> FromRow<'a, PgRow> for super::refunds::metrics::RefundMetricRow { } } +impl<'a> FromRow<'a, PgRow> for super::frm::metrics::FrmMetricRow { + fn from_row(row: &'a PgRow) -> sqlx::Result { + let frm_name: Option = row.try_get("frm_name").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let frm_status: Option> = + row.try_get("frm_status").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let frm_transaction_type: Option> = + row.try_get("frm_transaction_type").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let total: Option = row.try_get("total").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let count: Option = row.try_get("count").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + // Removing millisecond precision to get accurate diffs against clickhouse + let start_bucket: Option = row + .try_get::, _>("start_bucket")? + .and_then(|dt| dt.replace_millisecond(0).ok()); + let end_bucket: Option = row + .try_get::, _>("end_bucket")? 
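The FromRow impls for the FRM rows map sqlx's ColumnNotFound to None, so a single row type can back queries that select different dimension subsets, and they strip sub-second precision from the time buckets so Postgres results diff cleanly against ClickHouse. The error-tolerant lookup in isolation, with Row and DbError as simplified stand-ins for the sqlx types:

// Sketch of the "missing column means None" convention used in the
// FromRow impls; real code matches on sqlx::Error::ColumnNotFound.
#[derive(Debug, PartialEq)]
enum DbError {
    ColumnNotFound(String),
    Other(String),
}

struct Row;

impl Row {
    fn try_get(&self, col: &str) -> Result<Option<String>, DbError> {
        // Stand-in behavior: pretend the column was not selected.
        Err(DbError::ColumnNotFound(col.to_string()))
    }
}

fn optional_column(row: &Row, col: &str) -> Result<Option<String>, DbError> {
    row.try_get(col).or_else(|e| match e {
        DbError::ColumnNotFound(_) => Ok(None), // column absent from this query
        e => Err(e),                            // real errors still propagate
    })
}

fn main() {
    assert_eq!(optional_column(&Row, "frm_name"), Ok(None));
}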
+ .and_then(|dt| dt.replace_millisecond(0).ok()); + Ok(Self { + frm_name, + frm_status, + frm_transaction_type, + total, + count, + start_bucket, + end_bucket, + }) + } +} + impl<'a> FromRow<'a, PgRow> for super::payments::metrics::PaymentMetricRow { fn from_row(row: &'a PgRow) -> sqlx::Result { let currency: Option> = @@ -516,6 +564,30 @@ impl<'a> FromRow<'a, PgRow> for super::refunds::filters::RefundFilterRow { } } +impl<'a> FromRow<'a, PgRow> for super::frm::filters::FrmFilterRow { + fn from_row(row: &'a PgRow) -> sqlx::Result { + let frm_name: Option = row.try_get("frm_name").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let frm_status: Option> = + row.try_get("frm_status").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + let frm_transaction_type: Option> = + row.try_get("frm_transaction_type").or_else(|e| match e { + ColumnNotFound(_) => Ok(Default::default()), + e => Err(e), + })?; + Ok(Self { + frm_name, + frm_status, + frm_transaction_type, + }) + } +} + impl<'a> FromRow<'a, PgRow> for super::disputes::filters::DisputeFilterRow { fn from_row(row: &'a PgRow) -> sqlx::Result { let dispute_stage: Option = row.try_get("dispute_stage").or_else(|e| match e { @@ -604,6 +676,7 @@ impl ToSql for AnalyticsCollection { .attach_printable("SdkEvents table is not implemented for Sqlx"))?, Self::ApiEvents => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("ApiEvents table is not implemented for Sqlx"))?, + Self::FraudCheck => Ok("fraud_check".to_string()), Self::PaymentIntent => Ok("payment_intent".to_string()), Self::ConnectorEvents => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("ConnectorEvents table is not implemented for Sqlx"))?, diff --git a/crates/analytics/src/types.rs b/crates/analytics/src/types.rs index 816c77fd30..12a82801d5 100644 --- a/crates/analytics/src/types.rs +++ b/crates/analytics/src/types.rs @@ -15,6 +15,7 @@ use crate::errors::AnalyticsError; pub enum AnalyticsDomain { Payments, Refunds, + Frm, PaymentIntents, AuthEvents, SdkEvents, @@ -26,6 +27,7 @@ pub enum AnalyticsDomain { pub enum AnalyticsCollection { Payment, Refund, + FraudCheck, SdkEvents, SdkEventsAnalytics, ApiEvents, diff --git a/crates/analytics/src/utils.rs b/crates/analytics/src/utils.rs index 3955a8c1df..7b73f5a1c1 100644 --- a/crates/analytics/src/utils.rs +++ b/crates/analytics/src/utils.rs @@ -2,6 +2,7 @@ use api_models::analytics::{ api_event::{ApiEventDimensions, ApiEventMetrics}, auth_events::AuthEventMetrics, disputes::{DisputeDimensions, DisputeMetrics}, + frm::{FrmDimensions, FrmMetrics}, payment_intents::{PaymentIntentDimensions, PaymentIntentMetrics}, payments::{PaymentDimensions, PaymentMetrics}, refunds::{RefundDimensions, RefundMetrics}, @@ -22,6 +23,10 @@ pub fn get_refund_dimensions() -> Vec { RefundDimensions::iter().map(Into::into).collect() } +pub fn get_frm_dimensions() -> Vec { + FrmDimensions::iter().map(Into::into).collect() +} + pub fn get_sdk_event_dimensions() -> Vec { SdkEventDimensions::iter().map(Into::into).collect() } @@ -42,6 +47,10 @@ pub fn get_refund_metrics_info() -> Vec { RefundMetrics::iter().map(Into::into).collect() } +pub fn get_frm_metrics_info() -> Vec { + FrmMetrics::iter().map(Into::into).collect() +} + pub fn get_sdk_event_metrics_info() -> Vec { SdkEventMetrics::iter().map(Into::into).collect() } diff --git a/crates/api_models/Cargo.toml b/crates/api_models/Cargo.toml index 0bd0b01a27..80737cfbc4 100644 --- 
a/crates/api_models/Cargo.toml +++ b/crates/api_models/Cargo.toml @@ -35,7 +35,6 @@ url = { version = "2.5.0", features = ["serde"] } utoipa = { version = "4.2.0", features = ["preserve_order", "preserve_path_order"] } frunk = "0.4.2" frunk_core = "0.4.2" - # First party crates cards = { version = "0.1.0", path = "../cards" } common_enums = { version = "0.1.0", path = "../common_enums" } diff --git a/crates/api_models/src/analytics.rs b/crates/api_models/src/analytics.rs index 85a9c3ded0..ca10d7ce59 100644 --- a/crates/api_models/src/analytics.rs +++ b/crates/api_models/src/analytics.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use common_utils::pii::EmailStrategy; +use common_utils::{events::ApiEventMetric, pii::EmailStrategy}; use masking::Secret; use self::{ @@ -8,6 +8,7 @@ use self::{ api_event::{ApiEventDimensions, ApiEventMetrics}, auth_events::AuthEventMetrics, disputes::{DisputeDimensions, DisputeMetrics}, + frm::{FrmDimensions, FrmMetrics}, payment_intents::{PaymentIntentDimensions, PaymentIntentMetrics}, payments::{PaymentDimensions, PaymentDistributions, PaymentMetrics}, refunds::{RefundDimensions, RefundMetrics}, @@ -20,6 +21,7 @@ pub mod api_event; pub mod auth_events; pub mod connector_events; pub mod disputes; +pub mod frm; pub mod outgoing_webhook_event; pub mod payment_intents; pub mod payments; @@ -144,6 +146,22 @@ pub struct GetRefundMetricRequest { pub delta: bool, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GetFrmMetricRequest { + pub time_series: Option, + pub time_range: TimeRange, + #[serde(default)] + pub group_by_names: Vec, + #[serde(default)] + pub filters: frm::FrmFilters, + pub metrics: HashSet, + #[serde(default)] + pub delta: bool, +} + +impl ApiEventMetric for GetFrmMetricRequest {} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct GetSdkEventMetricRequest { @@ -247,6 +265,33 @@ pub struct RefundFilterValue { pub values: Vec, } +#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GetFrmFilterRequest { + pub time_range: TimeRange, + #[serde(default)] + pub group_by_names: Vec, +} + +impl ApiEventMetric for GetFrmFilterRequest {} + +#[derive(Debug, Default, serde::Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct FrmFiltersResponse { + pub query_data: Vec, +} + +impl ApiEventMetric for FrmFiltersResponse {} + +#[derive(Debug, serde::Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct FrmFilterValue { + pub dimension: FrmDimensions, + pub values: Vec, +} + +impl ApiEventMetric for FrmFilterValue {} + #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct GetSdkEventFiltersRequest { diff --git a/crates/api_models/src/analytics/frm.rs b/crates/api_models/src/analytics/frm.rs new file mode 100644 index 0000000000..3ac45e7589 --- /dev/null +++ b/crates/api_models/src/analytics/frm.rs @@ -0,0 +1,163 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, +}; + +use common_enums::enums::FraudCheckStatus; + +#[derive( + Clone, + Copy, + Debug, + Default, + Eq, + PartialEq, + serde::Serialize, + serde::Deserialize, + strum::Display, + strum::EnumString, +)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum FrmTransactionType { + #[default] + PreFrm, + PostFrm, +} + +use super::{NameDescription, TimeRange}; +#[derive(Clone, Debug, 
Default, serde::Deserialize, serde::Serialize)] +pub struct FrmFilters { + #[serde(default)] + pub frm_status: Vec, + #[serde(default)] + pub frm_name: Vec, + #[serde(default)] + pub frm_transaction_type: Vec, +} + +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + strum::AsRefStr, + PartialEq, + PartialOrd, + Eq, + Ord, + strum::Display, + strum::EnumIter, + Clone, + Copy, +)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum FrmDimensions { + FrmStatus, + FrmName, + FrmTransactionType, +} + +#[derive( + Clone, + Debug, + Hash, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, + strum::Display, + strum::EnumIter, + strum::AsRefStr, +)] +#[strum(serialize_all = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum FrmMetrics { + FrmTriggeredAttempts, + FrmBlockedRate, +} + +pub mod metric_behaviour { + pub struct FrmTriggeredAttempts; + pub struct FrmBlockRate; +} + +impl From for NameDescription { + fn from(value: FrmMetrics) -> Self { + Self { + name: value.to_string(), + desc: String::new(), + } + } +} + +impl From for NameDescription { + fn from(value: FrmDimensions) -> Self { + Self { + name: value.to_string(), + desc: String::new(), + } + } +} + +#[derive(Debug, serde::Serialize, Eq)] +pub struct FrmMetricsBucketIdentifier { + pub frm_status: Option, + pub frm_name: Option, + pub frm_transaction_type: Option, + #[serde(rename = "time_range")] + pub time_bucket: TimeRange, + #[serde(rename = "time_bucket")] + #[serde(with = "common_utils::custom_serde::iso8601custom")] + pub start_time: time::PrimitiveDateTime, +} + +impl Hash for FrmMetricsBucketIdentifier { + fn hash(&self, state: &mut H) { + self.frm_status.hash(state); + self.frm_name.hash(state); + self.frm_transaction_type.hash(state); + self.time_bucket.hash(state); + } +} + +impl PartialEq for FrmMetricsBucketIdentifier { + fn eq(&self, other: &Self) -> bool { + let mut left = DefaultHasher::new(); + self.hash(&mut left); + let mut right = DefaultHasher::new(); + other.hash(&mut right); + left.finish() == right.finish() + } +} + +impl FrmMetricsBucketIdentifier { + pub fn new( + frm_status: Option, + frm_name: Option, + frm_transaction_type: Option, + normalized_time_range: TimeRange, + ) -> Self { + Self { + frm_status, + frm_name, + frm_transaction_type, + time_bucket: normalized_time_range, + start_time: normalized_time_range.start_time, + } + } +} + +#[derive(Debug, serde::Serialize)] +pub struct FrmMetricsBucketValue { + pub frm_triggered_attempts: Option, + pub frm_blocked_rate: Option, +} + +#[derive(Debug, serde::Serialize)] +pub struct FrmMetricsBucketResponse { + #[serde(flatten)] + pub values: FrmMetricsBucketValue, + #[serde(flatten)] + pub dimensions: FrmMetricsBucketIdentifier, +} diff --git a/crates/common_enums/Cargo.toml b/crates/common_enums/Cargo.toml index 5c88236b8a..e364af0407 100644 --- a/crates/common_enums/Cargo.toml +++ b/crates/common_enums/Cargo.toml @@ -18,6 +18,8 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.115" strum = { version = "0.26", features = ["derive"] } utoipa = { version = "4.2.0", features = ["preserve_order", "preserve_path_order"] } +frunk = "0.4.2" +frunk_core = "0.4.2" # First party crates router_derive = { version = "0.1.0", path = "../router_derive" } diff --git a/crates/common_enums/src/enums.rs b/crates/common_enums/src/enums.rs index 2ad0ad35cb..492aed3579 100644 --- a/crates/common_enums/src/enums.rs +++ b/crates/common_enums/src/enums.rs @@ -10,7 +10,8 @@ pub mod 
diesel_exports { DbBlocklistDataKind as BlocklistDataKind, DbCaptureMethod as CaptureMethod, DbCaptureStatus as CaptureStatus, DbConnectorType as ConnectorType, DbCountryAlpha2 as CountryAlpha2, DbCurrency as Currency, DbDisputeStage as DisputeStage, - DbDisputeStatus as DisputeStatus, DbEventType as EventType, DbFutureUsage as FutureUsage, + DbDisputeStatus as DisputeStatus, DbEventType as EventType, + DbFraudCheckStatus as FraudCheckStatus, DbFutureUsage as FutureUsage, DbIntentStatus as IntentStatus, DbMandateStatus as MandateStatus, DbPaymentMethodIssuerCode as PaymentMethodIssuerCode, DbPaymentType as PaymentType, DbRefundStatus as RefundStatus, @@ -236,6 +237,30 @@ pub enum AuthenticationType { } /// The status of the capture +#[derive( + Clone, + Copy, + Debug, + Default, + Eq, + PartialEq, + serde::Serialize, + serde::Deserialize, + strum::Display, + strum::EnumString, + frunk::LabelledGeneric, +)] +#[router_derive::diesel_enum(storage_type = "db_enum")] +#[strum(serialize_all = "snake_case")] +pub enum FraudCheckStatus { + Fraud, + ManualReview, + #[default] + Pending, + Legit, + TransactionFailure, +} + #[derive( Clone, Copy, @@ -1556,6 +1581,28 @@ pub enum RefundStatus { TransactionFailure, } +#[derive( + Clone, + Copy, + Debug, + Default, + Eq, + Hash, + PartialEq, + strum::Display, + strum::EnumString, + strum::EnumIter, + serde::Serialize, + serde::Deserialize, +)] +#[router_derive::diesel_enum(storage_type = "db_enum")] +#[strum(serialize_all = "snake_case")] +pub enum FrmTransactionType { + #[default] + PreFrm, + PostFrm, +} + /// The status of the mandate, which indicates whether it can be used to initiate a payment. #[derive( Clone, diff --git a/crates/diesel_models/src/enums.rs b/crates/diesel_models/src/enums.rs index e95116f99a..5eeed2990a 100644 --- a/crates/diesel_models/src/enums.rs +++ b/crates/diesel_models/src/enums.rs @@ -194,30 +194,6 @@ pub enum FraudCheckType { PostFrm, } -#[derive( - Clone, - Copy, - Debug, - Default, - Eq, - PartialEq, - serde::Serialize, - serde::Deserialize, - strum::Display, - strum::EnumString, - frunk::LabelledGeneric, -)] -#[diesel_enum(storage_type = "db_enum")] -#[strum(serialize_all = "snake_case")] -pub enum FraudCheckStatus { - Fraud, - ManualReview, - #[default] - Pending, - Legit, - TransactionFailure, -} - #[derive( Clone, Copy, diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index 64f62f4876..cf98748048 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -14,9 +14,10 @@ pub mod routes { }, GenerateReportRequest, GetActivePaymentsMetricRequest, GetApiEventFiltersRequest, GetApiEventMetricRequest, GetAuthEventMetricRequest, GetDisputeMetricRequest, - GetPaymentFiltersRequest, GetPaymentIntentFiltersRequest, GetPaymentIntentMetricRequest, - GetPaymentMetricRequest, GetRefundFilterRequest, GetRefundMetricRequest, - GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest, + GetFrmFilterRequest, GetFrmMetricRequest, GetPaymentFiltersRequest, + GetPaymentIntentFiltersRequest, GetPaymentIntentMetricRequest, GetPaymentMetricRequest, + GetRefundFilterRequest, GetRefundMetricRequest, GetSdkEventFiltersRequest, + GetSdkEventMetricRequest, ReportRequest, }; use error_stack::ResultExt; @@ -54,6 +55,9 @@ pub mod routes { web::resource("filters/payments") .route(web::post().to(get_payment_filters)), ) + .service( + web::resource("filters/frm").route(web::post().to(get_frm_filters)), + ) .service( web::resource("filters/refunds") 
.route(web::post().to(get_refund_filters)), @@ -87,6 +91,9 @@ pub mod routes { web::resource("metrics/auth_events") .route(web::post().to(get_auth_event_metrics)), ) + .service( + web::resource("metrics/frm").route(web::post().to(get_frm_metrics)), + ) .service( web::resource("api_event_logs").route(web::get().to(get_api_events)), ) @@ -270,6 +277,38 @@ pub mod routes { .await } + /// # Panics + /// + /// Panics if `json_payload` array does not contain one `GetFrmMetricRequest` element. + pub async fn get_frm_metrics( + state: web::Data, + req: actix_web::HttpRequest, + json_payload: web::Json<[GetFrmMetricRequest; 1]>, + ) -> impl Responder { + #[allow(clippy::expect_used)] + // safety: This shouldn't panic owing to the data type + let payload = json_payload + .into_inner() + .to_vec() + .pop() + .expect("Couldn't get GetFrmMetricRequest"); + let flow = AnalyticsFlow::GetFrmMetrics; + Box::pin(api::server_wrap( + flow, + state, + &req, + payload, + |state, auth: AuthenticationData, req, _| async move { + analytics::frm::get_metrics(&state.pool, &auth.merchant_account.merchant_id, req) + .await + .map(ApplicationResponse::Json) + }, + &auth::JWTAuth(Permission::Analytics), + api_locking::LockAction::NotApplicable, + )) + .await + } + /// # Panics /// /// Panics if `json_payload` array does not contain one `GetSdkEventMetricRequest` element. @@ -458,6 +497,28 @@ pub mod routes { .await } + pub async fn get_frm_filters( + state: web::Data, + req: actix_web::HttpRequest, + json_payload: web::Json, + ) -> impl Responder { + let flow = AnalyticsFlow::GetFrmFilters; + Box::pin(api::server_wrap( + flow, + state, + &req, + json_payload.into_inner(), + |state, auth: AuthenticationData, req: GetFrmFilterRequest, _| async move { + analytics::frm::get_filters(&state.pool, req, &auth.merchant_account.merchant_id) + .await + .map(ApplicationResponse::Json) + }, + &auth::JWTAuth(Permission::Analytics), + api_locking::LockAction::NotApplicable, + )) + .await + } + pub async fn get_sdk_event_filters( state: web::Data, req: actix_web::HttpRequest, diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 2a00e7adbb..34210cf6de 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -36,9 +36,8 @@ pub mod user; pub mod user_authentication_method; pub mod user_key_store; pub mod user_role; - use diesel_models::{ - fraud_check::{FraudCheck, FraudCheckNew, FraudCheckUpdate}, + fraud_check::{FraudCheck, FraudCheckUpdate}, organization::{Organization, OrganizationNew, OrganizationUpdate}, }; use error_stack::ResultExt; @@ -53,16 +52,23 @@ use hyperswitch_domain_models::payouts::{ use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface}; use masking::PeekInterface; use redis_interface::errors::RedisError; +use router_env::logger; use storage_impl::{errors::StorageError, redis::kv_store::RedisConnInterface, MockDb}; pub use self::kafka_store::KafkaStore; use self::{fraud_check::FraudCheckInterface, organization::OrganizationInterface}; pub use crate::{ + core::errors::{self, ProcessTrackerError}, errors::CustomResult, services::{ kafka::{KafkaError, KafkaProducer, MQResult}, Store, }, + types::{ + domain, + storage::{self}, + AccessToken, + }, }; #[derive(PartialEq, Eq)] @@ -259,36 +265,74 @@ impl RequestIdStore for KafkaStore { impl FraudCheckInterface for KafkaStore { async fn insert_fraud_check_response( &self, - new: FraudCheckNew, + new: storage::FraudCheckNew, ) -> CustomResult { - self.diesel_store.insert_fraud_check_response(new).await + let frm = 
diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs
index 2a00e7adbb..34210cf6de 100644
--- a/crates/router/src/db.rs
+++ b/crates/router/src/db.rs
@@ -36,9 +36,8 @@ pub mod user;
 pub mod user_authentication_method;
 pub mod user_key_store;
 pub mod user_role;
-
 use diesel_models::{
-    fraud_check::{FraudCheck, FraudCheckNew, FraudCheckUpdate},
+    fraud_check::{FraudCheck, FraudCheckUpdate},
     organization::{Organization, OrganizationNew, OrganizationUpdate},
 };
 use error_stack::ResultExt;
@@ -53,16 +52,23 @@ use hyperswitch_domain_models::payouts::{
 use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface};
 use masking::PeekInterface;
 use redis_interface::errors::RedisError;
+use router_env::logger;
 use storage_impl::{errors::StorageError, redis::kv_store::RedisConnInterface, MockDb};
 
 pub use self::kafka_store::KafkaStore;
 use self::{fraud_check::FraudCheckInterface, organization::OrganizationInterface};
 pub use crate::{
+    core::errors::{self, ProcessTrackerError},
     errors::CustomResult,
     services::{
         kafka::{KafkaError, KafkaProducer, MQResult},
         Store,
     },
+    types::{
+        domain,
+        storage::{self},
+        AccessToken,
+    },
 };
 
 #[derive(PartialEq, Eq)]
@@ -259,36 +265,74 @@ impl RequestIdStore for KafkaStore {
 impl FraudCheckInterface for KafkaStore {
     async fn insert_fraud_check_response(
         &self,
-        new: FraudCheckNew,
+        new: storage::FraudCheckNew,
     ) -> CustomResult<FraudCheck, StorageError> {
-        self.diesel_store.insert_fraud_check_response(new).await
+        let frm = self.diesel_store.insert_fraud_check_response(new).await?;
+        if let Err(er) = self
+            .kafka_producer
+            .log_fraud_check(&frm, None, self.tenant_id.clone())
+            .await
+        {
+            logger::error!(message = "Failed to log analytics event for fraud check", error_message = ?er);
+        }
+        Ok(frm)
     }
 
     async fn update_fraud_check_response_with_attempt_id(
         &self,
-        fraud_check: FraudCheck,
-        fraud_check_update: FraudCheckUpdate,
+        this: FraudCheck,
+        fraud_check: FraudCheckUpdate,
     ) -> CustomResult<FraudCheck, StorageError> {
-        self.diesel_store
-            .update_fraud_check_response_with_attempt_id(fraud_check, fraud_check_update)
+        let frm = self
+            .diesel_store
+            .update_fraud_check_response_with_attempt_id(this, fraud_check)
+            .await?;
+        if let Err(er) = self
+            .kafka_producer
+            .log_fraud_check(&frm, None, self.tenant_id.clone())
             .await
+        {
+            logger::error!(message = "Failed to log analytics event for fraud check", fraud_check = ?frm, error_message = ?er);
+        }
+        Ok(frm)
     }
 
     async fn find_fraud_check_by_payment_id(
         &self,
         payment_id: String,
         merchant_id: String,
     ) -> CustomResult<FraudCheck, StorageError> {
-        self.diesel_store
+        let frm = self
+            .diesel_store
             .find_fraud_check_by_payment_id(payment_id, merchant_id)
+            .await?;
+        if let Err(er) = self
+            .kafka_producer
+            .log_fraud_check(&frm, None, self.tenant_id.clone())
             .await
+        {
+            logger::error!(message = "Failed to log analytics event for fraud check", fraud_check = ?frm, error_message = ?er);
+        }
+        Ok(frm)
     }
 
     async fn find_fraud_check_by_payment_id_if_present(
         &self,
         payment_id: String,
         merchant_id: String,
     ) -> CustomResult<Option<FraudCheck>, StorageError> {
-        self.diesel_store
+        let frm = self
+            .diesel_store
             .find_fraud_check_by_payment_id_if_present(payment_id, merchant_id)
-            .await
+            .await?;
+
+        if let Some(fraud_check) = frm.clone() {
+            if let Err(er) = self
+                .kafka_producer
+                .log_fraud_check(&fraud_check, None, self.tenant_id.clone())
+                .await
+            {
+                logger::error!(message = "Failed to log analytics event for fraud check", fraud_check = ?frm, error_message = ?er);
+            }
+        }
+        Ok(frm)
     }
 }
diff --git a/crates/router/src/db/fraud_check.rs b/crates/router/src/db/fraud_check.rs
index 3481826036..9e27f45101 100644
--- a/crates/router/src/db/fraud_check.rs
+++ b/crates/router/src/db/fraud_check.rs
@@ -116,33 +116,3 @@ impl FraudCheckInterface for MockDb {
         Err(errors::StorageError::MockDbError)?
     }
 }
-
-#[cfg(feature = "kafka_events")]
-#[async_trait::async_trait]
-impl FraudCheckInterface for super::KafkaStore {
-    #[instrument(skip_all)]
-    async fn insert_fraud_check_response(
-        &self,
-        _new: storage::FraudCheckNew,
-    ) -> CustomResult<FraudCheck, StorageError> {
-        Err(errors::StorageError::MockDbError)?
-    }
-
-    #[instrument(skip_all)]
-    async fn update_fraud_check_response_with_attempt_id(
-        &self,
-        _this: FraudCheck,
-        _fraud_check: FraudCheckUpdate,
-    ) -> CustomResult<FraudCheck, StorageError> {
-        Err(errors::StorageError::MockDbError)?
-    }
-
-    #[instrument(skip_all)]
-    async fn find_fraud_check_by_payment_id(
-        &self,
-        _payment_id: String,
-        _merchant_id: String,
-    ) -> CustomResult<FraudCheck, StorageError> {
-        Err(errors::StorageError::MockDbError)?
-    }
-}
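Every `FraudCheckInterface` method on `KafkaStore` now follows the same decorator shape: the Diesel call is `?`-propagated, while the Kafka publish is only logged on failure, so an analytics outage can never fail the payment flow itself. A standalone sketch of that shape (the helper name is invented here for illustration, not part of the patch):

```rust
use std::{fmt::Debug, future::Future};

// Sketch: run the best-effort analytics publish after the primary write has
// already succeeded, swallowing (but recording) any Kafka failure.
async fn publish_best_effort<F, E>(publish: F)
where
    F: Future<Output = Result<(), E>>,
    E: Debug,
{
    if let Err(err) = publish.await {
        // Analytics is observability, not the source of truth.
        tracing::error!(?err, "failed to publish fraud check analytics event");
    }
}
```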
diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs
index e5686d616d..edf8a53346 100644
--- a/crates/router/src/db/kafka_store.rs
+++ b/crates/router/src/db/kafka_store.rs
@@ -83,7 +83,7 @@ pub struct TenantID(pub String);
 
 #[derive(Clone)]
 pub struct KafkaStore {
-    kafka_producer: KafkaProducer,
+    pub kafka_producer: KafkaProducer,
     pub diesel_store: Store,
     pub tenant_id: TenantID,
 }
diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs
index 1bdd9016c4..b7ab4ebaa1 100644
--- a/crates/router/src/events.rs
+++ b/crates/router/src/events.rs
@@ -23,6 +23,7 @@ pub mod outgoing_webhook_logs;
 #[serde(rename_all = "snake_case")]
 pub enum EventType {
     PaymentIntent,
+    FraudCheck,
     PaymentAttempt,
     Refund,
     ApiLogs,
diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs
index b245eb3fe7..d1abda2105 100644
--- a/crates/router/src/services/kafka.rs
+++ b/crates/router/src/services/kafka.rs
@@ -11,11 +11,15 @@ use rdkafka::{
 };
 #[cfg(feature = "payouts")]
 pub mod payout;
-use crate::events::EventType;
+use diesel_models::fraud_check::FraudCheck;
+
+use crate::{events::EventType, services::kafka::fraud_check_event::KafkaFraudCheckEvent};
 mod authentication;
 mod authentication_event;
 mod dispute;
 mod dispute_event;
+mod fraud_check;
+mod fraud_check_event;
 mod payment_attempt;
 mod payment_attempt_event;
 mod payment_intent;
@@ -36,7 +40,7 @@ use self::{
     payment_intent_event::KafkaPaymentIntentEvent, refund::KafkaRefund,
     refund_event::KafkaRefundEvent,
 };
-use crate::types::storage::Dispute;
+use crate::{services::kafka::fraud_check::KafkaFraudCheck, types::storage::Dispute};
 
 // Using message queue result here to avoid confusion with Kafka result provided by library
 pub type MQResult<T> = CustomResult<T, KafkaError>;
@@ -139,6 +143,7 @@ impl<'a, T: KafkaMessage> KafkaMessage for KafkaConsolidatedEvent<'a, T> {
 #[serde(default)]
 pub struct KafkaSettings {
     brokers: Vec<String>,
+    fraud_check_analytics_topic: String,
     intent_analytics_topic: String,
     attempt_analytics_topic: String,
     refund_analytics_topic: String,
@@ -246,6 +251,7 @@ impl KafkaSettings {
 pub struct KafkaProducer {
     producer: Arc<RdKafkaProducer>,
     intent_analytics_topic: String,
+    fraud_check_analytics_topic: String,
     attempt_analytics_topic: String,
     refund_analytics_topic: String,
     api_logs_topic: String,
@@ -288,6 +294,7 @@ impl KafkaProducer {
                 .change_context(KafkaError::InitializationError)?,
             )),
+            fraud_check_analytics_topic: conf.fraud_check_analytics_topic.clone(),
             intent_analytics_topic: conf.intent_analytics_topic.clone(),
             attempt_analytics_topic: conf.attempt_analytics_topic.clone(),
             refund_analytics_topic: conf.refund_analytics_topic.clone(),
@@ -321,6 +328,38 @@ impl KafkaProducer {
             .map_err(|(error, record)| report!(error).attach_printable(format!("{record:?}")))
             .change_context(KafkaError::GenericError)
     }
+    pub async fn log_fraud_check(
+        &self,
+        attempt: &FraudCheck,
+        old_attempt: Option<FraudCheck>,
+        tenant_id: TenantID,
+    ) -> MQResult<()> {
+        if let Some(negative_event) = old_attempt {
+            self.log_event(&KafkaEvent::old(
+                &KafkaFraudCheck::from_storage(&negative_event),
+                tenant_id.clone(),
+            ))
+            .attach_printable_lazy(|| {
+                format!("Failed to add negative fraud check event {negative_event:?}")
+            })?;
+        };
+
+        self.log_event(&KafkaEvent::new(
+            &KafkaFraudCheck::from_storage(attempt),
+            tenant_id.clone(),
+        ))
+        .attach_printable_lazy(|| {
+            format!("Failed to add positive fraud check event {attempt:?}")
+        })?;
+
+        self.log_event(&KafkaConsolidatedEvent::new(
+            &KafkaFraudCheckEvent::from_storage(attempt),
+            tenant_id.clone(),
+        ))
+        .attach_printable_lazy(|| {
+            format!("Failed to add consolidated fraud check event {attempt:?}")
+        })
+    }
 
     pub async fn log_payment_attempt(
         &self,
@@ -544,6 +583,7 @@ impl KafkaProducer {
 
     pub fn get_topic(&self, event: EventType) -> &str {
         match event {
+            EventType::FraudCheck => &self.fraud_check_analytics_topic,
            EventType::ApiLogs => &self.api_logs_topic,
             EventType::PaymentAttempt => &self.attempt_analytics_topic,
             EventType::PaymentIntent => &self.intent_analytics_topic,
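`log_fraud_check` accepts an optional `old_attempt` so an update can emit a retraction/assertion pair: `KafkaEvent::old` publishes the superseded row as a negative event and `KafkaEvent::new` the current row as a positive one, which is what lets a collapsing-merge style sink net out stale versions. A rough sketch of that envelope idea (the type and field names here are illustrative, not the actual `KafkaEvent` definition):

```rust
// Illustrative only: a +1/-1 sign flag lets downstream storage collapse an
// updated fraud check into a single logical row.
struct SignedEvent<T> {
    payload: T,
    sign_flag: i8, // +1 asserts this version, -1 retracts the previous one
}

impl<T> SignedEvent<T> {
    fn new(payload: T) -> Self {
        Self { payload, sign_flag: 1 }
    }

    fn old(payload: T) -> Self {
        Self { payload, sign_flag: -1 }
    }
}
```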
diff --git a/crates/router/src/services/kafka/fraud_check.rs b/crates/router/src/services/kafka/fraud_check.rs
new file mode 100644
index 0000000000..e52fbbbc9a
--- /dev/null
+++ b/crates/router/src/services/kafka/fraud_check.rs
@@ -0,0 +1,67 @@
+use diesel_models::{
+    enums as storage_enums,
+    enums::{FraudCheckLastStep, FraudCheckStatus, FraudCheckType},
+    fraud_check::FraudCheck,
+};
+use time::OffsetDateTime;
+
+#[derive(serde::Serialize, Debug)]
+pub struct KafkaFraudCheck<'a> {
+    pub frm_id: &'a String,
+    pub payment_id: &'a String,
+    pub merchant_id: &'a String,
+    pub attempt_id: &'a String,
+    #[serde(with = "time::serde::timestamp")]
+    pub created_at: OffsetDateTime,
+    pub frm_name: &'a String,
+    pub frm_transaction_id: Option<&'a String>,
+    pub frm_transaction_type: FraudCheckType,
+    pub frm_status: FraudCheckStatus,
+    pub frm_score: Option<i32>,
+    pub frm_reason: Option<serde_json::Value>,
+    pub frm_error: Option<&'a String>,
+    pub payment_details: Option<serde_json::Value>,
+    pub metadata: Option<serde_json::Value>,
+    #[serde(with = "time::serde::timestamp")]
+    pub modified_at: OffsetDateTime,
+    pub last_step: FraudCheckLastStep,
+    // The post-FRM flow updates the payment's capture method from automatic to
+    // manual, so this field preserves the merchant's original capture method;
+    // it is needed when approving the FRM decision.
+    pub payment_capture_method: Option<storage_enums::CaptureMethod>,
+}
+
+impl<'a> KafkaFraudCheck<'a> {
+    pub fn from_storage(check: &'a FraudCheck) -> Self {
+        Self {
+            frm_id: &check.frm_id,
+            payment_id: &check.payment_id,
+            merchant_id: &check.merchant_id,
+            attempt_id: &check.attempt_id,
+            created_at: check.created_at.assume_utc(),
+            frm_name: &check.frm_name,
+            frm_transaction_id: check.frm_transaction_id.as_ref(),
+            frm_transaction_type: check.frm_transaction_type,
+            frm_status: check.frm_status,
+            frm_score: check.frm_score,
+            frm_reason: check.frm_reason.clone(),
+            frm_error: check.frm_error.as_ref(),
+            payment_details: check.payment_details.clone(),
+            metadata: check.metadata.clone(),
+            modified_at: check.modified_at.assume_utc(),
+            last_step: check.last_step,
+            payment_capture_method: check.payment_capture_method,
+        }
+    }
+}
+
+impl<'a> super::KafkaMessage for KafkaFraudCheck<'a> {
+    fn key(&self) -> String {
+        format!(
+            "{}_{}_{}_{}",
+            self.merchant_id, self.payment_id, self.attempt_id, self.frm_id
+        )
+    }
+
+    fn event_type(&self) -> crate::events::EventType {
+        crate::events::EventType::FraudCheck
+    }
+}
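The `key()` implementation concatenates the merchant, payment, attempt, and frm identifiers, so every version of a given fraud check hashes to the same Kafka partition and is consumed in publish order. A quick sketch with made-up values:

```rust
fn main() {
    // Hypothetical identifiers, showing the partition key `key()` produces.
    let (merchant_id, payment_id, attempt_id, frm_id) =
        ("merchant_a", "pay_123", "pay_123_1", "frm_456");
    let key = format!("{merchant_id}_{payment_id}_{attempt_id}_{frm_id}");
    // Same fraud check => same key => same partition => ordered updates.
    assert_eq!(key, "merchant_a_pay_123_pay_123_1_frm_456");
}
```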
diff --git a/crates/router/src/services/kafka/fraud_check_event.rs b/crates/router/src/services/kafka/fraud_check_event.rs
new file mode 100644
index 0000000000..57dc4d2087
--- /dev/null
+++ b/crates/router/src/services/kafka/fraud_check_event.rs
@@ -0,0 +1,66 @@
+use diesel_models::{
+    enums as storage_enums,
+    enums::{FraudCheckLastStep, FraudCheckStatus, FraudCheckType},
+    fraud_check::FraudCheck,
+};
+use time::OffsetDateTime;
+
+#[derive(serde::Serialize, Debug)]
+pub struct KafkaFraudCheckEvent<'a> {
+    pub frm_id: &'a String,
+    pub payment_id: &'a String,
+    pub merchant_id: &'a String,
+    pub attempt_id: &'a String,
+    #[serde(default, with = "time::serde::timestamp::milliseconds")]
+    pub created_at: OffsetDateTime,
+    pub frm_name: &'a String,
+    pub frm_transaction_id: Option<&'a String>,
+    pub frm_transaction_type: FraudCheckType,
+    pub frm_status: FraudCheckStatus,
+    pub frm_score: Option<i32>,
+    pub frm_reason: Option<serde_json::Value>,
+    pub frm_error: Option<&'a String>,
+    pub payment_details: Option<serde_json::Value>,
+    pub metadata: Option<serde_json::Value>,
+    #[serde(default, with = "time::serde::timestamp::milliseconds")]
+    pub modified_at: OffsetDateTime,
+    pub last_step: FraudCheckLastStep,
+    // The post-FRM flow updates the payment's capture method from automatic to
+    // manual, so this field preserves the merchant's original capture method;
+    // it is needed when approving the FRM decision.
+    pub payment_capture_method: Option<storage_enums::CaptureMethod>,
+}
+
+impl<'a> KafkaFraudCheckEvent<'a> {
+    pub fn from_storage(check: &'a FraudCheck) -> Self {
+        Self {
+            frm_id: &check.frm_id,
+            payment_id: &check.payment_id,
+            merchant_id: &check.merchant_id,
+            attempt_id: &check.attempt_id,
+            created_at: check.created_at.assume_utc(),
+            frm_name: &check.frm_name,
+            frm_transaction_id: check.frm_transaction_id.as_ref(),
+            frm_transaction_type: check.frm_transaction_type,
+            frm_status: check.frm_status,
+            frm_score: check.frm_score,
+            frm_reason: check.frm_reason.clone(),
+            frm_error: check.frm_error.as_ref(),
+            payment_details: check.payment_details.clone(),
+            metadata: check.metadata.clone(),
+            modified_at: check.modified_at.assume_utc(),
+            last_step: check.last_step,
+            payment_capture_method: check.payment_capture_method,
+        }
+    }
+}
+
+impl<'a> super::KafkaMessage for KafkaFraudCheckEvent<'a> {
+    fn key(&self) -> String {
+        format!(
+            "{}_{}_{}_{}",
+            self.merchant_id, self.payment_id, self.attempt_id, self.frm_id
+        )
+    }
+
+    fn event_type(&self) -> crate::events::EventType {
+        crate::events::EventType::FraudCheck
+    }
+}
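One detail worth keeping in mind when wiring consumers: `KafkaFraudCheck` serializes its timestamps with `time::serde::timestamp` (whole epoch seconds), while the consolidated `KafkaFraudCheckEvent` uses `time::serde::timestamp::milliseconds`, so the two event shapes carry different epoch resolutions. A small sketch of the difference (assuming the `time` crate's `serde` and `macros` features are enabled):

```rust
use time::{macros::datetime, OffsetDateTime};

#[derive(serde::Serialize)]
struct Stamps {
    #[serde(with = "time::serde::timestamp")]
    seconds: OffsetDateTime,
    #[serde(with = "time::serde::timestamp::milliseconds")]
    millis: OffsetDateTime,
}

fn main() {
    let t = datetime!(2023-11-14 22:13:20 UTC); // epoch 1_700_000_000
    let json = serde_json::to_string(&Stamps { seconds: t, millis: t }).unwrap();
    assert_eq!(json, r#"{"seconds":1700000000,"millis":1700000000000}"#);
}
```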