diff --git a/config/dashboard.toml b/config/dashboard.toml index c00b167315..142cc96650 100644 --- a/config/dashboard.toml +++ b/config/dashboard.toml @@ -25,8 +25,8 @@ test_processors=true feedback=false mixpanel=false generate_report=false -user_journey_analytics=false -authentication_analytics=false +user_journey_analytics=true +authentication_analytics=true surcharge=false dispute_evidence_upload=false paypal_automatic_flow=false diff --git a/config/vector.yaml b/config/vector.yaml index d8f5da3466..1511e09a05 100644 --- a/config/vector.yaml +++ b/config/vector.yaml @@ -29,6 +29,11 @@ sources: node_metrics: type: host_metrics + sdk_source: + type: http_server + address: 0.0.0.0:3103 + encoding: json + transforms: plus_1_events: type: filter @@ -56,6 +61,14 @@ transforms: source: |- .timestamp = from_unix_timestamp!(.created_at, unit: "seconds") + sdk_transformed: + type: throttle + inputs: + - sdk_source + key_field: "{{ .payment_id }}{{ .merchant_id }}" + threshold: 1000 + window_secs: 60 + sinks: opensearch_events: type: elasticsearch @@ -132,3 +145,16 @@ sinks: inputs: - vector_metrics - node_metrics + + sdk_sink: + type: kafka + encoding: + codec: json + except_fields: + - "path" + - "source_type" + inputs: + - "sdk_transformed" + bootstrap_servers: kafka0:29092 + topic: hyper-sdk-logs + key_field: ".merchant_id" diff --git a/crates/analytics/docs/clickhouse/scripts/sdk_events.sql b/crates/analytics/docs/clickhouse/scripts/sdk_events.sql new file mode 100644 index 0000000000..54480fa99b --- /dev/null +++ b/crates/analytics/docs/clickhouse/scripts/sdk_events.sql @@ -0,0 +1,250 @@ +CREATE TABLE sdk_events_queue ( + `payment_id` Nullable(String), + `merchant_id` String, + `remote_ip` Nullable(String), + `log_type` LowCardinality(Nullable(String)), + `event_name` LowCardinality(Nullable(String)), + `first_event` LowCardinality(Nullable(String)), + `latency` Nullable(UInt32), + `timestamp` String, + `browser_name` LowCardinality(Nullable(String)), + `browser_version` Nullable(String), + `platform` LowCardinality(Nullable(String)), + `source` LowCardinality(Nullable(String)), + `category` LowCardinality(Nullable(String)), + `version` LowCardinality(Nullable(String)), + `value` Nullable(String), + `component` LowCardinality(Nullable(String)), + `payment_method` LowCardinality(Nullable(String)), + `payment_experience` LowCardinality(Nullable(String)) +) ENGINE = Kafka SETTINGS + kafka_broker_list = 'kafka0:29092', + kafka_topic_list = 'hyper-sdk-logs', + kafka_group_name = 'hyper-ckh', + kafka_format = 'JSONEachRow', + kafka_handle_error_mode = 'stream'; + +CREATE TABLE sdk_events ( + `payment_id` Nullable(String), + `merchant_id` String, + `remote_ip` Nullable(String), + `log_type` LowCardinality(Nullable(String)), + `event_name` LowCardinality(Nullable(String)), + `first_event` Bool DEFAULT 1, + `browser_name` LowCardinality(Nullable(String)), + `browser_version` Nullable(String), + `platform` LowCardinality(Nullable(String)), + `source` LowCardinality(Nullable(String)), + `category` LowCardinality(Nullable(String)), + `version` LowCardinality(Nullable(String)), + `component` LowCardinality(Nullable(String)), + `payment_method` LowCardinality(Nullable(String)), + `payment_experience` LowCardinality(Nullable(String)) DEFAULT '', + `created_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `latency` Nullable(UInt32) DEFAULT 0, + `value` Nullable(String), + `created_at_precise` DateTime64(3), + INDEX paymentMethodIndex payment_method TYPE bloom_filter GRANULARITY 1, + INDEX eventIndex event_name TYPE bloom_filter GRANULARITY 1, + INDEX platformIndex platform TYPE bloom_filter GRANULARITY 1, + INDEX logTypeIndex log_type TYPE bloom_filter GRANULARITY 1, + INDEX categoryIndex category TYPE bloom_filter GRANULARITY 1, + INDEX sourceIndex source TYPE bloom_filter GRANULARITY 1, + INDEX componentIndex component TYPE bloom_filter GRANULARITY 1, + INDEX firstEventIndex first_event TYPE bloom_filter GRANULARITY 1 +) ENGINE = MergeTree +PARTITION BY + toStartOfDay(created_at) +ORDER BY + (created_at, merchant_id) +TTL + toDateTime(created_at) + toIntervalMonth(6) +SETTINGS + index_granularity = 8192 +; + +CREATE MATERIALIZED VIEW sdk_events_mv TO sdk_events ( + `payment_id` Nullable(String), + `merchant_id` String, + `remote_ip` Nullable(String), + `log_type` LowCardinality(Nullable(String)), + `event_name` LowCardinality(Nullable(String)), + `first_event` Bool, + `latency` Nullable(UInt32), + `browser_name` LowCardinality(Nullable(String)), + `browser_version` Nullable(String), + `platform` LowCardinality(Nullable(String)), + `source` LowCardinality(Nullable(String)), + `category` LowCardinality(Nullable(String)), + `version` LowCardinality(Nullable(String)), + `value` Nullable(String), + `component` LowCardinality(Nullable(String)), + `payment_method` LowCardinality(Nullable(String)), + `payment_experience` LowCardinality(Nullable(String)), + `created_at` DateTime64(3), + `created_at_precise` DateTime64(3) +) AS +SELECT + payment_id, + merchant_id, + remote_ip, + log_type, + event_name, + multiIf(first_event = 'true', 1, 0) AS first_event, + latency, + browser_name, + browser_version, + platform, + source, + category, + version, + value, + component, + payment_method, + payment_experience, + toDateTime64(timestamp, 3) AS created_at, + toDateTime64(timestamp, 3) AS created_at_precise +FROM + sdk_events_queue +WHERE length(_error) = 0; + +CREATE TABLE sdk_events_audit ( + `payment_id` String, + `merchant_id` String, + `remote_ip` Nullable(String), + `log_type` LowCardinality(Nullable(String)), + `event_name` LowCardinality(Nullable(String)), + `first_event` Bool DEFAULT 1, + `browser_name` LowCardinality(Nullable(String)), + `browser_version` Nullable(String), + `platform` LowCardinality(Nullable(String)), + `source` LowCardinality(Nullable(String)), + `category` LowCardinality(Nullable(String)), + `version` LowCardinality(Nullable(String)), + `value` Nullable(String), + `component` LowCardinality(Nullable(String)), + `payment_method` LowCardinality(Nullable(String)), + `payment_experience` LowCardinality(Nullable(String)) DEFAULT '', + `created_at` DateTime64(3) DEFAULT now64() CODEC(T64, LZ4), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `latency` Nullable(UInt32) DEFAULT 0 +) ENGINE = MergeTree PARTITION BY merchant_id +ORDER BY + (merchant_id, payment_id) + TTL inserted_at + toIntervalMonth(18) +SETTINGS index_granularity = 8192; + +CREATE MATERIALIZED VIEW sdk_events_parse_errors ( + `topic` String, + `partition` Int64, + `offset` Int64, + `raw` String, + `error` String +) ENGINE = MergeTree +ORDER BY + (topic, partition, offset) SETTINGS index_granularity = 8192 AS +SELECT + _topic AS topic, + _partition AS partition, + _offset AS offset, + _raw_message AS raw, + _error AS error +FROM + sdk_events_queue +WHERE + length(_error) > 0; + +CREATE MATERIALIZED VIEW sdk_events_audit_mv TO sdk_events_audit ( + `payment_id` Nullable(String), + `merchant_id` String, + `remote_ip` Nullable(String), + `log_type` LowCardinality(Nullable(String)), + `event_name` LowCardinality(Nullable(String)), + `first_event` Bool, + `latency` Nullable(UInt32), + `browser_name` LowCardinality(Nullable(String)), + `browser_version` Nullable(String), + `platform` LowCardinality(Nullable(String)), + `source` LowCardinality(Nullable(String)), + `category` LowCardinality(Nullable(String)), + `version` LowCardinality(Nullable(String)), + `value` Nullable(String), + `component` LowCardinality(Nullable(String)), + `payment_method` LowCardinality(Nullable(String)), + `payment_experience` LowCardinality(Nullable(String)), + `created_at` DateTime64(3), + `created_at_precise` DateTime64(3), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4) +) AS +SELECT + payment_id, + merchant_id, + remote_ip, + log_type, + event_name, + multiIf(first_event = 'true', 1, 0) AS first_event, + latency, + browser_name, + browser_version, + platform, + source, + category, + version, + value, + component, + payment_method, + payment_experience, + toDateTime64(timestamp, 3) AS created_at, + toDateTime64(timestamp, 3) AS created_at_precise, + now() AS inserted_at +FROM + sdk_events_queue +WHERE + (length(_error) = 0) + AND (payment_id IS NOT NULL); + +CREATE TABLE active_payments ( + `payment_id` Nullable(String), + `merchant_id` String, + `created_at` DateTime64, + `flow_type` LowCardinality(Nullable(String)), + INDEX merchantIndex merchant_id TYPE bloom_filter GRANULARITY 1 +) ENGINE = MergeTree +PARTITION BY toStartOfSecond(created_at) +ORDER BY + merchant_id +TTL + toDateTime(created_at) + INTERVAL 60 SECOND +SETTINGS + index_granularity = 8192; + +CREATE MATERIALIZED VIEW sdk_active_payments_mv TO active_payments ( + `payment_id` Nullable(String), + `merchant_id` String, + `created_at` DateTime64, + `flow_type` LowCardinality(Nullable(String)) +) AS +SELECT + payment_id, + merchant_id, + toDateTime64(timestamp, 3) AS created_at, + 'sdk' AS flow_type +FROM + sdk_events_queue +WHERE length(_error) = 0; + +CREATE MATERIALIZED VIEW api_active_payments_mv TO active_payments ( + `payment_id` Nullable(String), + `merchant_id` String, + `created_at` DateTime64, + `flow_type` LowCardinality(Nullable(String)) +) AS +SELECT + payment_id, + merchant_id, + created_at_timestamp AS created_at, + flow_type +FROM + api_events_queue +WHERE length(_error) = 0; \ No newline at end of file diff --git a/crates/analytics/src/active_payments.rs b/crates/analytics/src/active_payments.rs new file mode 100644 index 0000000000..518e15b7c0 --- /dev/null +++ b/crates/analytics/src/active_payments.rs @@ -0,0 +1,6 @@ +pub mod accumulator; +mod core; +pub mod metrics; +pub use accumulator::{ActivePaymentsMetricAccumulator, ActivePaymentsMetricsAccumulator}; + +pub use self::core::get_metrics; diff --git a/crates/analytics/src/active_payments/accumulator.rs b/crates/analytics/src/active_payments/accumulator.rs new file mode 100644 index 0000000000..2576a8a308 --- /dev/null +++ b/crates/analytics/src/active_payments/accumulator.rs @@ -0,0 +1,47 @@ +use api_models::analytics::active_payments::ActivePaymentsMetricsBucketValue; + +use super::metrics::ActivePaymentsMetricRow; + +#[derive(Debug, Default)] +pub struct ActivePaymentsMetricsAccumulator { + pub active_payments: CountAccumulator, +} + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct CountAccumulator { + pub count: Option, +} + +pub trait ActivePaymentsMetricAccumulator { + type MetricOutput; + + fn add_metrics_bucket(&mut self, metrics: &ActivePaymentsMetricRow); + + fn collect(self) -> Self::MetricOutput; +} + +impl ActivePaymentsMetricAccumulator for CountAccumulator { + type MetricOutput = Option; + #[inline] + fn add_metrics_bucket(&mut self, metrics: &ActivePaymentsMetricRow) { + self.count = match (self.count, metrics.count) { + (None, None) => None, + (None, i @ Some(_)) | (i @ Some(_), None) => i, + (Some(a), Some(b)) => Some(a + b), + } + } + #[inline] + fn collect(self) -> Self::MetricOutput { + self.count.and_then(|i| u64::try_from(i).ok()) + } +} + +impl ActivePaymentsMetricsAccumulator { + #[allow(dead_code)] + pub fn collect(self) -> ActivePaymentsMetricsBucketValue { + ActivePaymentsMetricsBucketValue { + active_payments: self.active_payments.collect(), + } + } +} diff --git a/crates/analytics/src/active_payments/core.rs b/crates/analytics/src/active_payments/core.rs new file mode 100644 index 0000000000..0024581bf8 --- /dev/null +++ b/crates/analytics/src/active_payments/core.rs @@ -0,0 +1,106 @@ +use std::collections::HashMap; + +use api_models::analytics::{ + active_payments::{ + ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier, MetricsBucketResponse, + }, + AnalyticsMetadata, GetActivePaymentsMetricRequest, MetricsResponse, +}; +use error_stack::ResultExt; +use router_env::{instrument, logger, tracing}; + +use super::ActivePaymentsMetricsAccumulator; +use crate::{ + active_payments::ActivePaymentsMetricAccumulator, + errors::{AnalyticsError, AnalyticsResult}, + AnalyticsProvider, +}; + +#[instrument(skip_all)] +pub async fn get_metrics( + pool: &AnalyticsProvider, + publishable_key: Option<&String>, + merchant_id: Option<&String>, + req: GetActivePaymentsMetricRequest, +) -> AnalyticsResult> { + let mut metrics_accumulator: HashMap< + ActivePaymentsMetricsBucketIdentifier, + ActivePaymentsMetricsAccumulator, + > = HashMap::new(); + + if let Some(publishable_key) = publishable_key { + if let Some(merchant_id) = merchant_id { + let mut set = tokio::task::JoinSet::new(); + for metric_type in req.metrics.iter().cloned() { + let publishable_key_scoped = publishable_key.to_owned(); + let merchant_id_scoped = merchant_id.to_owned(); + let pool = pool.clone(); + set.spawn(async move { + let data = pool + .get_active_payments_metrics( + &metric_type, + &merchant_id_scoped, + &publishable_key_scoped, + ) + .await + .change_context(AnalyticsError::UnknownError); + (metric_type, data) + }); + } + + while let Some((metric, data)) = set + .join_next() + .await + .transpose() + .change_context(AnalyticsError::UnknownError)? + { + logger::info!("Logging metric: {metric} Result: {:?}", data); + for (id, value) in data? { + let metrics_builder = metrics_accumulator.entry(id).or_default(); + match metric { + ActivePaymentsMetrics::ActivePayments => { + metrics_builder.active_payments.add_metrics_bucket(&value) + } + } + } + + logger::debug!( + "Analytics Accumulated Results: metric: {}, results: {:#?}", + metric, + metrics_accumulator + ); + } + + let query_data: Vec = metrics_accumulator + .into_iter() + .map(|(id, val)| MetricsBucketResponse { + values: val.collect(), + dimensions: id, + }) + .collect(); + + Ok(MetricsResponse { + query_data, + meta_data: [AnalyticsMetadata { + current_time_range: req.time_range, + }], + }) + } else { + logger::error!("Merchant ID not present"); + Ok(MetricsResponse { + query_data: vec![], + meta_data: [AnalyticsMetadata { + current_time_range: req.time_range, + }], + }) + } + } else { + logger::error!("Publishable key not present for merchant ID"); + Ok(MetricsResponse { + query_data: vec![], + meta_data: [AnalyticsMetadata { + current_time_range: req.time_range, + }], + }) + } +} diff --git a/crates/analytics/src/active_payments/metrics.rs b/crates/analytics/src/active_payments/metrics.rs new file mode 100644 index 0000000000..43a0b229c8 --- /dev/null +++ b/crates/analytics/src/active_payments/metrics.rs @@ -0,0 +1,70 @@ +use api_models::analytics::{ + active_payments::{ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier}, + Granularity, +}; +use time::PrimitiveDateTime; + +use crate::{ + query::{Aggregate, GroupByClause, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, LoadRow, MetricsResult}, +}; + +mod active_payments; + +use active_payments::ActivePayments; + +#[derive(Debug, PartialEq, Eq, serde::Deserialize)] +pub struct ActivePaymentsMetricRow { + pub count: Option, +} + +pub trait ActivePaymentsMetricAnalytics: LoadRow {} + +#[async_trait::async_trait] +pub trait ActivePaymentsMetric +where + T: AnalyticsDataSource + ActivePaymentsMetricAnalytics, +{ + async fn load_metrics( + &self, + merchant_id: &str, + publishable_key: &str, + pool: &T, + ) -> MetricsResult< + Vec<( + ActivePaymentsMetricsBucketIdentifier, + ActivePaymentsMetricRow, + )>, + >; +} + +#[async_trait::async_trait] +impl ActivePaymentsMetric for ActivePaymentsMetrics +where + T: AnalyticsDataSource + ActivePaymentsMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + merchant_id: &str, + publishable_key: &str, + pool: &T, + ) -> MetricsResult< + Vec<( + ActivePaymentsMetricsBucketIdentifier, + ActivePaymentsMetricRow, + )>, + > { + match self { + Self::ActivePayments => { + ActivePayments + .load_metrics(publishable_key, merchant_id, pool) + .await + } + } + } +} diff --git a/crates/analytics/src/active_payments/metrics/active_payments.rs b/crates/analytics/src/active_payments/metrics/active_payments.rs new file mode 100644 index 0000000000..97b3e69d1c --- /dev/null +++ b/crates/analytics/src/active_payments/metrics/active_payments.rs @@ -0,0 +1,70 @@ +use api_models::analytics::{active_payments::ActivePaymentsMetricsBucketIdentifier, Granularity}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use super::ActivePaymentsMetricRow; +use crate::{ + query::{Aggregate, FilterTypes, GroupByClause, QueryBuilder, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, +}; + +#[derive(Default)] +pub(super) struct ActivePayments; + +#[async_trait::async_trait] +impl super::ActivePaymentsMetric for ActivePayments +where + T: AnalyticsDataSource + super::ActivePaymentsMetricAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + async fn load_metrics( + &self, + merchant_id: &str, + publishable_key: &str, + pool: &T, + ) -> MetricsResult< + Vec<( + ActivePaymentsMetricsBucketIdentifier, + ActivePaymentsMetricRow, + )>, + > { + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::ActivePaymentsAnalytics); + + query_builder + .add_select_column(Aggregate::DistinctCount { + field: "payment_id", + alias: Some("count"), + }) + .switch()?; + + query_builder + .add_custom_filter_clause( + "merchant_id", + format!("'{}','{}'", merchant_id, publishable_key), + FilterTypes::In, + ) + .switch()?; + + query_builder + .execute_query::(pool) + .await + .change_context(MetricsError::QueryBuildingError)? + .change_context(MetricsError::QueryExecutionFailure)? + .into_iter() + .map(|i| Ok((ActivePaymentsMetricsBucketIdentifier::new(None), i))) + .collect::, + crate::query::PostProcessingError, + >>() + .change_context(MetricsError::PostProcessingFailure) + } +} diff --git a/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs b/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs index dd65458b1e..0111085880 100644 --- a/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs +++ b/crates/analytics/src/auth_events/metrics/authentication_attempt_count.rs @@ -33,7 +33,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); query_builder .add_select_column(Aggregate::Count { diff --git a/crates/analytics/src/auth_events/metrics/authentication_success_count.rs b/crates/analytics/src/auth_events/metrics/authentication_success_count.rs index 7559abe8e2..6f0580bb0b 100644 --- a/crates/analytics/src/auth_events/metrics/authentication_success_count.rs +++ b/crates/analytics/src/auth_events/metrics/authentication_success_count.rs @@ -33,7 +33,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); query_builder .add_select_column(Aggregate::Count { diff --git a/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs b/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs index 51a3b8ed88..49fef8941f 100644 --- a/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs +++ b/crates/analytics/src/auth_events/metrics/challenge_flow_count.rs @@ -33,7 +33,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); query_builder .add_select_column(Aggregate::Count { diff --git a/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs b/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs index b5760c9dfa..7ae9d77367 100644 --- a/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs +++ b/crates/analytics/src/auth_events/metrics/frictionless_flow_count.rs @@ -33,7 +33,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); query_builder .add_select_column(Aggregate::Count { diff --git a/crates/analytics/src/auth_events/metrics/three_ds_sdk_count.rs b/crates/analytics/src/auth_events/metrics/three_ds_sdk_count.rs index be9d7ab0dc..b08d48b550 100644 --- a/crates/analytics/src/auth_events/metrics/three_ds_sdk_count.rs +++ b/crates/analytics/src/auth_events/metrics/three_ds_sdk_count.rs @@ -33,7 +33,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); query_builder .add_select_column(Aggregate::Count { diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index 47294c108f..ab47397b8a 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -7,6 +7,7 @@ use router_env::logger; use time::PrimitiveDateTime; use super::{ + active_payments::metrics::ActivePaymentsMetricRow, auth_events::metrics::AuthEventMetricRow, health_check::HealthCheck, payments::{ @@ -133,10 +134,12 @@ impl AnalyticsDataSource for ClickhouseClient { TableEngine::CollapsingMergeTree { sign: "sign_flag" } } AnalyticsCollection::SdkEvents + | AnalyticsCollection::SdkEventsAnalytics | AnalyticsCollection::ApiEvents | AnalyticsCollection::ConnectorEvents | AnalyticsCollection::ApiEventsAnalytics - | AnalyticsCollection::OutgoingWebhookEvent => TableEngine::BasicTree, + | AnalyticsCollection::OutgoingWebhookEvent + | AnalyticsCollection::ActivePaymentsAnalytics => TableEngine::BasicTree, } } } @@ -159,6 +162,7 @@ impl super::refunds::filters::RefundFilterAnalytics for ClickhouseClient {} impl super::sdk_events::filters::SdkEventFilterAnalytics for ClickhouseClient {} impl super::sdk_events::metrics::SdkEventMetricAnalytics for ClickhouseClient {} impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {} +impl super::active_payments::metrics::ActivePaymentsMetricAnalytics for ClickhouseClient {} impl super::auth_events::metrics::AuthEventMetricAnalytics for ClickhouseClient {} impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {} impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {} @@ -353,6 +357,16 @@ impl TryInto for serde_json::Value { } } +impl TryInto for serde_json::Value { + type Error = Report; + + fn try_into(self) -> Result { + serde_json::from_value(self).change_context(ParsingError::StructParseFailure( + "Failed to parse ActivePaymentsMetricRow in clickhouse results", + )) + } +} + impl ToSql for PrimitiveDateTime { fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result { let format = @@ -373,12 +387,14 @@ impl ToSql for AnalyticsCollection { Self::Payment => Ok("payment_attempts".to_string()), Self::Refund => Ok("refunds".to_string()), Self::SdkEvents => Ok("sdk_events_audit".to_string()), + Self::SdkEventsAnalytics => Ok("sdk_events".to_string()), Self::ApiEvents => Ok("api_events_audit".to_string()), Self::ApiEventsAnalytics => Ok("api_events".to_string()), Self::PaymentIntent => Ok("payment_intents".to_string()), Self::ConnectorEvents => Ok("connector_events_audit".to_string()), Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()), Self::Dispute => Ok("dispute".to_string()), + Self::ActivePaymentsAnalytics => Ok("active_payments".to_string()), } } } @@ -451,6 +467,15 @@ where alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias)) ) } + Self::DistinctCount { field, alias } => { + format!( + "count(distinct {}){}", + field + .to_sql(table_engine) + .attach_printable("Failed to percentile aggregate")?, + alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias)) + ) + } }) } } diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index 5c269a4bb1..d3db03a697 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -7,6 +7,7 @@ pub mod payments; mod query; pub mod refunds; +pub mod active_payments; pub mod api_event; pub mod auth_events; pub mod connector_events; @@ -32,6 +33,7 @@ pub mod utils; use std::sync::Arc; use api_models::analytics::{ + active_payments::{ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier}, api_event::{ ApiEventDimensions, ApiEventFilters, ApiEventMetrics, ApiEventMetricsBucketIdentifier, }, @@ -56,6 +58,7 @@ use storage_impl::config::Database; use strum::Display; use self::{ + active_payments::metrics::{ActivePaymentsMetric, ActivePaymentsMetricRow}, auth_events::metrics::{AuthEventMetric, AuthEventMetricRow}, payments::{ distribution::{PaymentDistribution, PaymentDistributionRow}, @@ -514,7 +517,7 @@ impl AnalyticsProvider { &self, metric: &SdkEventMetrics, dimensions: &[SdkEventDimensions], - pub_key: &str, + publishable_key: &str, filters: &SdkEventFilters, granularity: &Option, time_range: &TimeRange, @@ -523,14 +526,21 @@ impl AnalyticsProvider { Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)), Self::Clickhouse(pool) => { metric - .load_metrics(dimensions, pub_key, filters, granularity, time_range, pool) + .load_metrics( + dimensions, + publishable_key, + filters, + granularity, + time_range, + pool, + ) .await } Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => { metric .load_metrics( dimensions, - pub_key, + publishable_key, filters, granularity, // Since SDK events are ckh only use ckh here @@ -542,6 +552,32 @@ impl AnalyticsProvider { } } + pub async fn get_active_payments_metrics( + &self, + metric: &ActivePaymentsMetrics, + merchant_id: &str, + publishable_key: &str, + ) -> types::MetricsResult< + Vec<( + ActivePaymentsMetricsBucketIdentifier, + ActivePaymentsMetricRow, + )>, + > { + match self { + Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)), + Self::Clickhouse(pool) => { + metric + .load_metrics(merchant_id, publishable_key, pool) + .await + } + Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => { + metric + .load_metrics(merchant_id, publishable_key, ckh_pool) + .await + } + } + } + pub async fn get_auth_event_metrics( &self, metric: &AuthEventMetrics, @@ -723,6 +759,7 @@ pub enum AnalyticsFlow { GetRefundsMetrics, GetSdkMetrics, GetAuthMetrics, + GetActivePaymentsMetrics, GetPaymentFilters, GetRefundFilters, GetSdkEventFilters, diff --git a/crates/analytics/src/query.rs b/crates/analytics/src/query.rs index 1ab79f07e4..2fda8fc57c 100644 --- a/crates/analytics/src/query.rs +++ b/crates/analytics/src/query.rs @@ -253,6 +253,10 @@ pub enum Aggregate { alias: Option<&'static str>, percentile: Option<&'static u8>, }, + DistinctCount { + field: R, + alias: Option<&'static str>, + }, } // Window functions in query diff --git a/crates/analytics/src/sdk_events/filters.rs b/crates/analytics/src/sdk_events/filters.rs index 9963f51ef9..367db62b18 100644 --- a/crates/analytics/src/sdk_events/filters.rs +++ b/crates/analytics/src/sdk_events/filters.rs @@ -24,7 +24,8 @@ where Aggregate<&'static str>: ToSql, Window<&'static str>: ToSql, { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); query_builder.add_select_column(dimension).switch()?; time_range diff --git a/crates/analytics/src/sdk_events/metrics/average_payment_time.rs b/crates/analytics/src/sdk_events/metrics/average_payment_time.rs index a81a55a2af..88c96d33ef 100644 --- a/crates/analytics/src/sdk_events/metrics/average_payment_time.rs +++ b/crates/analytics/src/sdk_events/metrics/average_payment_time.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/load_time.rs b/crates/analytics/src/sdk_events/metrics/load_time.rs index 07168069b5..4624f30429 100644 --- a/crates/analytics/src/sdk_events/metrics/load_time.rs +++ b/crates/analytics/src/sdk_events/metrics/load_time.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/payment_attempts.rs b/crates/analytics/src/sdk_events/metrics/payment_attempts.rs index b2a78188c4..7cbe93056a 100644 --- a/crates/analytics/src/sdk_events/metrics/payment_attempts.rs +++ b/crates/analytics/src/sdk_events/metrics/payment_attempts.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/payment_data_filled_count.rs b/crates/analytics/src/sdk_events/metrics/payment_data_filled_count.rs index a3c94baeda..aff78808d1 100644 --- a/crates/analytics/src/sdk_events/metrics/payment_data_filled_count.rs +++ b/crates/analytics/src/sdk_events/metrics/payment_data_filled_count.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/payment_method_selected_count.rs b/crates/analytics/src/sdk_events/metrics/payment_method_selected_count.rs index 11aeac5e6f..284519441a 100644 --- a/crates/analytics/src/sdk_events/metrics/payment_method_selected_count.rs +++ b/crates/analytics/src/sdk_events/metrics/payment_method_selected_count.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/payment_methods_call_count.rs b/crates/analytics/src/sdk_events/metrics/payment_methods_call_count.rs index 7570f1292e..e7dc650403 100644 --- a/crates/analytics/src/sdk_events/metrics/payment_methods_call_count.rs +++ b/crates/analytics/src/sdk_events/metrics/payment_methods_call_count.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/sdk_initiated_count.rs b/crates/analytics/src/sdk_events/metrics/sdk_initiated_count.rs index ecbca81acf..82c2096a2e 100644 --- a/crates/analytics/src/sdk_events/metrics/sdk_initiated_count.rs +++ b/crates/analytics/src/sdk_events/metrics/sdk_initiated_count.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sdk_events/metrics/sdk_rendered_count.rs b/crates/analytics/src/sdk_events/metrics/sdk_rendered_count.rs index ed9e776423..eba94f0d9a 100644 --- a/crates/analytics/src/sdk_events/metrics/sdk_rendered_count.rs +++ b/crates/analytics/src/sdk_events/metrics/sdk_rendered_count.rs @@ -36,7 +36,8 @@ where time_range: &TimeRange, pool: &T, ) -> MetricsResult> { - let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::SdkEvents); + let mut query_builder: QueryBuilder = + QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics); let dimensions = dimensions.to_vec(); for dim in dimensions.iter() { diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index 3a1b7f2d46..76ad9c254b 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -542,6 +542,8 @@ impl ToSql for AnalyticsCollection { Self::Payment => Ok("payment_attempt".to_string()), Self::Refund => Ok("refund".to_string()), Self::SdkEvents => Err(error_stack::report!(ParsingError::UnknownError) + .attach_printable("SdkEventsAudit table is not implemented for Sqlx"))?, + Self::SdkEventsAnalytics => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("SdkEvents table is not implemented for Sqlx"))?, Self::ApiEvents => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("ApiEvents table is not implemented for Sqlx"))?, @@ -550,6 +552,8 @@ impl ToSql for AnalyticsCollection { .attach_printable("ConnectorEvents table is not implemented for Sqlx"))?, Self::ApiEventsAnalytics => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("ApiEvents table is not implemented for Sqlx"))?, + Self::ActivePaymentsAnalytics => Err(error_stack::report!(ParsingError::UnknownError) + .attach_printable("ActivePaymentsAnalytics table is not implemented for Sqlx"))?, Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?, Self::Dispute => Ok("dispute".to_string()), @@ -610,6 +614,15 @@ where alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias)) ) } + Self::DistinctCount { field, alias } => { + format!( + "count(distinct {}){}", + field + .to_sql(table_engine) + .attach_printable("Failed to distinct count aggregate")?, + alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias)) + ) + } }) } } diff --git a/crates/analytics/src/types.rs b/crates/analytics/src/types.rs index 86a9ec86ef..5370fbc25a 100644 --- a/crates/analytics/src/types.rs +++ b/crates/analytics/src/types.rs @@ -26,12 +26,14 @@ pub enum AnalyticsCollection { Payment, Refund, SdkEvents, + SdkEventsAnalytics, ApiEvents, PaymentIntent, ConnectorEvents, OutgoingWebhookEvent, Dispute, ApiEventsAnalytics, + ActivePaymentsAnalytics, } #[allow(dead_code)] diff --git a/crates/api_models/src/analytics.rs b/crates/api_models/src/analytics.rs index d697c8ab27..491420cfc0 100644 --- a/crates/api_models/src/analytics.rs +++ b/crates/api_models/src/analytics.rs @@ -4,6 +4,7 @@ use common_utils::pii::EmailStrategy; use masking::Secret; use self::{ + active_payments::ActivePaymentsMetrics, api_event::{ApiEventDimensions, ApiEventMetrics}, auth_events::AuthEventMetrics, disputes::{DisputeDimensions, DisputeMetrics}, @@ -13,6 +14,7 @@ use self::{ }; pub use crate::payments::TimeRange; +pub mod active_payments; pub mod api_event; pub mod auth_events; pub mod connector_events; @@ -151,6 +153,14 @@ pub struct GetAuthEventMetricRequest { pub delta: bool, } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GetActivePaymentsMetricRequest { + #[serde(default)] + pub metrics: HashSet, + pub time_range: TimeRange, +} + #[derive(Debug, serde::Serialize)] pub struct AnalyticsMetadata { pub current_time_range: TimeRange, diff --git a/crates/api_models/src/analytics/active_payments.rs b/crates/api_models/src/analytics/active_payments.rs new file mode 100644 index 0000000000..e7cb2553e0 --- /dev/null +++ b/crates/api_models/src/analytics/active_payments.rs @@ -0,0 +1,77 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, +}; + +use super::NameDescription; + +#[derive( + Clone, + Debug, + Hash, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, + strum::Display, + strum::EnumIter, + strum::AsRefStr, +)] +#[strum(serialize_all = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum ActivePaymentsMetrics { + ActivePayments, +} + +pub mod metric_behaviour { + pub struct ActivePayments; +} + +impl From for NameDescription { + fn from(value: ActivePaymentsMetrics) -> Self { + Self { + name: value.to_string(), + desc: String::new(), + } + } +} + +#[derive(Debug, serde::Serialize, Eq)] +pub struct ActivePaymentsMetricsBucketIdentifier { + pub time_bucket: Option, +} + +impl ActivePaymentsMetricsBucketIdentifier { + pub fn new(time_bucket: Option) -> Self { + Self { time_bucket } + } +} + +impl Hash for ActivePaymentsMetricsBucketIdentifier { + fn hash(&self, state: &mut H) { + self.time_bucket.hash(state); + } +} + +impl PartialEq for ActivePaymentsMetricsBucketIdentifier { + fn eq(&self, other: &Self) -> bool { + let mut left = DefaultHasher::new(); + self.hash(&mut left); + let mut right = DefaultHasher::new(); + other.hash(&mut right); + left.finish() == right.finish() + } +} + +#[derive(Debug, serde::Serialize)] +pub struct ActivePaymentsMetricsBucketValue { + pub active_payments: Option, +} + +#[derive(Debug, serde::Serialize)] +pub struct MetricsBucketResponse { + #[serde(flatten)] + pub values: ActivePaymentsMetricsBucketValue, + #[serde(flatten)] + pub dimensions: ActivePaymentsMetricsBucketIdentifier, +} diff --git a/crates/api_models/src/events.rs b/crates/api_models/src/events.rs index 078c27e6db..bed46f01f1 100644 --- a/crates/api_models/src/events.rs +++ b/crates/api_models/src/events.rs @@ -86,6 +86,7 @@ impl_misc_api_event_type!( GetInfoResponse, GetPaymentMetricRequest, GetRefundMetricRequest, + GetActivePaymentsMetricRequest, GetSdkEventMetricRequest, GetAuthEventMetricRequest, GetPaymentFiltersRequest, diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index b471448368..80b4e18807 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -12,10 +12,10 @@ pub mod routes { search::{ GetGlobalSearchRequest, GetSearchRequest, GetSearchRequestWithIndex, SearchIndex, }, - GenerateReportRequest, GetApiEventFiltersRequest, GetApiEventMetricRequest, - GetAuthEventMetricRequest, GetDisputeMetricRequest, GetPaymentFiltersRequest, - GetPaymentMetricRequest, GetRefundFilterRequest, GetRefundMetricRequest, - GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest, + GenerateReportRequest, GetActivePaymentsMetricRequest, GetApiEventFiltersRequest, + GetApiEventMetricRequest, GetAuthEventMetricRequest, GetDisputeMetricRequest, + GetPaymentFiltersRequest, GetPaymentMetricRequest, GetRefundFilterRequest, + GetRefundMetricRequest, GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest, }; use error_stack::ResultExt; @@ -70,6 +70,10 @@ pub mod routes { web::resource("metrics/sdk_events") .route(web::post().to(get_sdk_event_metrics)), ) + .service( + web::resource("metrics/active_payments") + .route(web::post().to(get_active_payments_metrics)), + ) .service( web::resource("filters/sdk_events") .route(web::post().to(get_sdk_event_filters)), @@ -245,6 +249,43 @@ pub mod routes { .await } + /// # Panics + /// + /// Panics if `json_payload` array does not contain one `GetActivePaymentsMetricRequest` element. + pub async fn get_active_payments_metrics( + state: web::Data, + req: actix_web::HttpRequest, + json_payload: web::Json<[GetActivePaymentsMetricRequest; 1]>, + ) -> impl Responder { + // safety: This shouldn't panic owing to the data type + #[allow(clippy::expect_used)] + let payload = json_payload + .into_inner() + .to_vec() + .pop() + .expect("Couldn't get GetActivePaymentsMetricRequest"); + let flow = AnalyticsFlow::GetActivePaymentsMetrics; + Box::pin(api::server_wrap( + flow, + state, + &req, + payload, + |state, auth: AuthenticationData, req, _| async move { + analytics::active_payments::get_metrics( + &state.pool, + auth.merchant_account.publishable_key.as_ref(), + Some(&auth.merchant_account.merchant_id), + req, + ) + .await + .map(ApplicationResponse::Json) + }, + &auth::JWTAuth(Permission::Analytics), + api_locking::LockAction::NotApplicable, + )) + .await + } + /// # Panics /// /// Panics if `json_payload` array does not contain one `GetAuthEventMetricRequest` element. diff --git a/docker-compose.yml b/docker-compose.yml index aff7ab54f4..569ef04506 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -164,8 +164,8 @@ services: - HYPERSWITCH_CLIENT_URL=http://localhost:9050 - SELF_SERVER_URL=http://localhost:5252 - SDK_ENV=local - - ENV_LOGGING_URL=http://localhost:3100 - labels: + - ENV_LOGGING_URL=http://localhost:3103 + labels: logs: "promtail" ### Control Center @@ -405,6 +405,7 @@ services: ports: - "8686" - "9598" + - "3103:3103" profiles: - olap environment: