feat: realtime user analytics (#5098)

Author: Vrishab Srivatsa (committed by GitHub)
Co-authored-by: Sampras Lopes <Sampras.lopes@juspay.in>
Date: 2024-06-26 17:23:39 +05:30
Commit: cd5a1a34c5 (parent: 86f4060d33)
32 changed files with 826 additions and 26 deletions

View File

@@ -25,8 +25,8 @@ test_processors=true
feedback=false
mixpanel=false
generate_report=false
user_journey_analytics=false
authentication_analytics=false
user_journey_analytics=true
authentication_analytics=true
surcharge=false
dispute_evidence_upload=false
paypal_automatic_flow=false

View File

@@ -29,6 +29,11 @@ sources:
node_metrics:
type: host_metrics
sdk_source:
type: http_server
address: 0.0.0.0:3103
encoding: json
transforms:
plus_1_events:
type: filter
@@ -56,6 +61,14 @@ transforms:
source: |-
.timestamp = from_unix_timestamp!(.created_at, unit: "seconds")
sdk_transformed:
type: throttle
inputs:
- sdk_source
key_field: "{{ .payment_id }}{{ .merchant_id }}"
threshold: 1000
window_secs: 60
sinks:
opensearch_events:
type: elasticsearch
@@ -132,3 +145,16 @@ sinks:
inputs:
- vector_metrics
- node_metrics
sdk_sink:
type: kafka
encoding:
codec: json
except_fields:
- "path"
- "source_type"
inputs:
- "sdk_transformed"
bootstrap_servers: kafka0:29092
topic: hyper-sdk-logs
key_field: ".merchant_id"
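
As a quick smoke test for the new sdk_source listener, an event can be posted straight to the Vector HTTP server. A minimal sketch, not part of this change: the field values are illustrative, reqwest and tokio are assumed dependencies, and only the port, the JSON encoding, and the sdk_events_queue field names come from this diff.

use serde_json::json;

// Push one synthetic SDK event into Vector's http_server source on port 3103.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let event = json!({
        "payment_id": "pay_test_123",   // illustrative values
        "merchant_id": "merchant_abc",
        "event_name": "APP_RENDERED",
        "first_event": "true",
        "timestamp": "1719400000",      // sdk_events_queue stores this as a String
    });
    reqwest::Client::new()
        .post("http://localhost:3103")  // sdk_source address from the config above
        .json(&event)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}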

View File

@@ -0,0 +1,250 @@
CREATE TABLE sdk_events_queue (
`payment_id` Nullable(String),
`merchant_id` String,
`remote_ip` Nullable(String),
`log_type` LowCardinality(Nullable(String)),
`event_name` LowCardinality(Nullable(String)),
`first_event` LowCardinality(Nullable(String)),
`latency` Nullable(UInt32),
`timestamp` String,
`browser_name` LowCardinality(Nullable(String)),
`browser_version` Nullable(String),
`platform` LowCardinality(Nullable(String)),
`source` LowCardinality(Nullable(String)),
`category` LowCardinality(Nullable(String)),
`version` LowCardinality(Nullable(String)),
`value` Nullable(String),
`component` LowCardinality(Nullable(String)),
`payment_method` LowCardinality(Nullable(String)),
`payment_experience` LowCardinality(Nullable(String))
) ENGINE = Kafka SETTINGS
kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyper-sdk-logs',
kafka_group_name = 'hyper-ckh',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE sdk_events (
`payment_id` Nullable(String),
`merchant_id` String,
`remote_ip` Nullable(String),
`log_type` LowCardinality(Nullable(String)),
`event_name` LowCardinality(Nullable(String)),
`first_event` Bool DEFAULT 1,
`browser_name` LowCardinality(Nullable(String)),
`browser_version` Nullable(String),
`platform` LowCardinality(Nullable(String)),
`source` LowCardinality(Nullable(String)),
`category` LowCardinality(Nullable(String)),
`version` LowCardinality(Nullable(String)),
`component` LowCardinality(Nullable(String)),
`payment_method` LowCardinality(Nullable(String)),
`payment_experience` LowCardinality(Nullable(String)) DEFAULT '',
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` Nullable(UInt32) DEFAULT 0,
`value` Nullable(String),
`created_at_precise` DateTime64(3),
INDEX paymentMethodIndex payment_method TYPE bloom_filter GRANULARITY 1,
INDEX eventIndex event_name TYPE bloom_filter GRANULARITY 1,
INDEX platformIndex platform TYPE bloom_filter GRANULARITY 1,
INDEX logTypeIndex log_type TYPE bloom_filter GRANULARITY 1,
INDEX categoryIndex category TYPE bloom_filter GRANULARITY 1,
INDEX sourceIndex source TYPE bloom_filter GRANULARITY 1,
INDEX componentIndex component TYPE bloom_filter GRANULARITY 1,
INDEX firstEventIndex first_event TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree
PARTITION BY
toStartOfDay(created_at)
ORDER BY
(created_at, merchant_id)
TTL
toDateTime(created_at) + toIntervalMonth(6)
SETTINGS
index_granularity = 8192
;
CREATE MATERIALIZED VIEW sdk_events_mv TO sdk_events (
`payment_id` Nullable(String),
`merchant_id` String,
`remote_ip` Nullable(String),
`log_type` LowCardinality(Nullable(String)),
`event_name` LowCardinality(Nullable(String)),
`first_event` Bool,
`latency` Nullable(UInt32),
`browser_name` LowCardinality(Nullable(String)),
`browser_version` Nullable(String),
`platform` LowCardinality(Nullable(String)),
`source` LowCardinality(Nullable(String)),
`category` LowCardinality(Nullable(String)),
`version` LowCardinality(Nullable(String)),
`value` Nullable(String),
`component` LowCardinality(Nullable(String)),
`payment_method` LowCardinality(Nullable(String)),
`payment_experience` LowCardinality(Nullable(String)),
`created_at` DateTime64(3),
`created_at_precise` DateTime64(3)
) AS
SELECT
payment_id,
merchant_id,
remote_ip,
log_type,
event_name,
multiIf(first_event = 'true', 1, 0) AS first_event,
latency,
browser_name,
browser_version,
platform,
source,
category,
version,
value,
component,
payment_method,
payment_experience,
toDateTime64(timestamp, 3) AS created_at,
toDateTime64(timestamp, 3) AS created_at_precise
FROM
sdk_events_queue
WHERE length(_error) = 0;
CREATE TABLE sdk_events_audit (
`payment_id` String,
`merchant_id` String,
`remote_ip` Nullable(String),
`log_type` LowCardinality(Nullable(String)),
`event_name` LowCardinality(Nullable(String)),
`first_event` Bool DEFAULT 1,
`browser_name` LowCardinality(Nullable(String)),
`browser_version` Nullable(String),
`platform` LowCardinality(Nullable(String)),
`source` LowCardinality(Nullable(String)),
`category` LowCardinality(Nullable(String)),
`version` LowCardinality(Nullable(String)),
`value` Nullable(String),
`component` LowCardinality(Nullable(String)),
`payment_method` LowCardinality(Nullable(String)),
`payment_experience` LowCardinality(Nullable(String)) DEFAULT '',
`created_at` DateTime64(3) DEFAULT now64() CODEC(T64, LZ4),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` Nullable(UInt32) DEFAULT 0
) ENGINE = MergeTree PARTITION BY merchant_id
ORDER BY
(merchant_id, payment_id)
TTL inserted_at + toIntervalMonth(18)
SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW sdk_events_parse_errors (
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
) ENGINE = MergeTree
ORDER BY
(topic, partition, offset) SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM
sdk_events_queue
WHERE
length(_error) > 0;
CREATE MATERIALIZED VIEW sdk_events_audit_mv TO sdk_events_audit (
`payment_id` Nullable(String),
`merchant_id` String,
`remote_ip` Nullable(String),
`log_type` LowCardinality(Nullable(String)),
`event_name` LowCardinality(Nullable(String)),
`first_event` Bool,
`latency` Nullable(UInt32),
`browser_name` LowCardinality(Nullable(String)),
`browser_version` Nullable(String),
`platform` LowCardinality(Nullable(String)),
`source` LowCardinality(Nullable(String)),
`category` LowCardinality(Nullable(String)),
`version` LowCardinality(Nullable(String)),
`value` Nullable(String),
`component` LowCardinality(Nullable(String)),
`payment_method` LowCardinality(Nullable(String)),
`payment_experience` LowCardinality(Nullable(String)),
`created_at` DateTime64(3),
`created_at_precise` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) AS
SELECT
payment_id,
merchant_id,
remote_ip,
log_type,
event_name,
multiIf(first_event = 'true', 1, 0) AS first_event,
latency,
browser_name,
browser_version,
platform,
source,
category,
version,
value,
component,
payment_method,
payment_experience,
toDateTime64(timestamp, 3) AS created_at,
toDateTime64(timestamp, 3) AS created_at_precise,
now() AS inserted_at
FROM
sdk_events_queue
WHERE
(length(_error) = 0)
AND (payment_id IS NOT NULL);
CREATE TABLE active_payments (
`payment_id` Nullable(String),
`merchant_id` String,
`created_at` DateTime64,
`flow_type` LowCardinality(Nullable(String)),
INDEX merchantIndex merchant_id TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree
PARTITION BY toStartOfSecond(created_at)
ORDER BY
merchant_id
TTL
toDateTime(created_at) + INTERVAL 60 SECOND
SETTINGS
index_granularity = 8192;
CREATE MATERIALIZED VIEW sdk_active_payments_mv TO active_payments (
`payment_id` Nullable(String),
`merchant_id` String,
`created_at` DateTime64,
`flow_type` LowCardinality(Nullable(String))
) AS
SELECT
payment_id,
merchant_id,
toDateTime64(timestamp, 3) AS created_at,
'sdk' AS flow_type
FROM
sdk_events_queue
WHERE length(_error) = 0;
CREATE MATERIALIZED VIEW api_active_payments_mv TO active_payments (
`payment_id` Nullable(String),
`merchant_id` String,
`created_at` DateTime64,
`flow_type` LowCardinality(Nullable(String))
) AS
SELECT
payment_id,
merchant_id,
created_at_timestamp AS created_at,
flow_type
FROM
api_events_queue
WHERE length(_error) = 0;

View File

@@ -0,0 +1,6 @@
pub mod accumulator;
mod core;
pub mod metrics;
pub use accumulator::{ActivePaymentsMetricAccumulator, ActivePaymentsMetricsAccumulator};
pub use self::core::get_metrics;

View File

@@ -0,0 +1,47 @@
use api_models::analytics::active_payments::ActivePaymentsMetricsBucketValue;
use super::metrics::ActivePaymentsMetricRow;
#[derive(Debug, Default)]
pub struct ActivePaymentsMetricsAccumulator {
pub active_payments: CountAccumulator,
}
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct CountAccumulator {
pub count: Option<i64>,
}
pub trait ActivePaymentsMetricAccumulator {
type MetricOutput;
fn add_metrics_bucket(&mut self, metrics: &ActivePaymentsMetricRow);
fn collect(self) -> Self::MetricOutput;
}
impl ActivePaymentsMetricAccumulator for CountAccumulator {
type MetricOutput = Option<u64>;
#[inline]
fn add_metrics_bucket(&mut self, metrics: &ActivePaymentsMetricRow) {
self.count = match (self.count, metrics.count) {
(None, None) => None,
(None, i @ Some(_)) | (i @ Some(_), None) => i,
(Some(a), Some(b)) => Some(a + b),
}
}
#[inline]
fn collect(self) -> Self::MetricOutput {
self.count.and_then(|i| u64::try_from(i).ok())
}
}
impl ActivePaymentsMetricsAccumulator {
#[allow(dead_code)]
pub fn collect(self) -> ActivePaymentsMetricsBucketValue {
ActivePaymentsMetricsBucketValue {
active_payments: self.active_payments.collect(),
}
}
}
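
A test-style sketch (not in the commit) of the accumulator's behaviour: None buckets are absorbed, Some counts are summed, and collect() converts the i64 total into u64 space.

#[cfg(test)]
mod count_accumulator_sketch {
    use super::*;

    #[test]
    fn merges_optional_counts() {
        let mut acc = CountAccumulator::default();
        acc.add_metrics_bucket(&ActivePaymentsMetricRow { count: Some(3) });
        acc.add_metrics_bucket(&ActivePaymentsMetricRow { count: None });
        acc.add_metrics_bucket(&ActivePaymentsMetricRow { count: Some(4) });
        // Negative totals would be dropped by the u64 conversion in collect().
        assert_eq!(acc.collect(), Some(7));
    }
}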

View File

@@ -0,0 +1,106 @@
use std::collections::HashMap;
use api_models::analytics::{
active_payments::{
ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier, MetricsBucketResponse,
},
AnalyticsMetadata, GetActivePaymentsMetricRequest, MetricsResponse,
};
use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};
use super::ActivePaymentsMetricsAccumulator;
use crate::{
active_payments::ActivePaymentsMetricAccumulator,
errors::{AnalyticsError, AnalyticsResult},
AnalyticsProvider,
};
#[instrument(skip_all)]
pub async fn get_metrics(
pool: &AnalyticsProvider,
publishable_key: Option<&String>,
merchant_id: Option<&String>,
req: GetActivePaymentsMetricRequest,
) -> AnalyticsResult<MetricsResponse<MetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<
ActivePaymentsMetricsBucketIdentifier,
ActivePaymentsMetricsAccumulator,
> = HashMap::new();
if let Some(publishable_key) = publishable_key {
if let Some(merchant_id) = merchant_id {
let mut set = tokio::task::JoinSet::new();
for metric_type in req.metrics.iter().cloned() {
let publishable_key_scoped = publishable_key.to_owned();
let merchant_id_scoped = merchant_id.to_owned();
let pool = pool.clone();
set.spawn(async move {
let data = pool
.get_active_payments_metrics(
&metric_type,
&merchant_id_scoped,
&publishable_key_scoped,
)
.await
.change_context(AnalyticsError::UnknownError);
(metric_type, data)
});
}
while let Some((metric, data)) = set
.join_next()
.await
.transpose()
.change_context(AnalyticsError::UnknownError)?
{
logger::info!("Logging metric: {metric} Result: {:?}", data);
for (id, value) in data? {
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
ActivePaymentsMetrics::ActivePayments => {
metrics_builder.active_payments.add_metrics_bucket(&value)
}
}
}
logger::debug!(
"Analytics Accumulated Results: metric: {}, results: {:#?}",
metric,
metrics_accumulator
);
}
let query_data: Vec<MetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| MetricsBucketResponse {
values: val.collect(),
dimensions: id,
})
.collect();
Ok(MetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
} else {
logger::error!("Merchant ID not present");
Ok(MetricsResponse {
query_data: vec![],
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
}
} else {
logger::error!("Publishable key not present for merchant ID");
Ok(MetricsResponse {
query_data: vec![],
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
}
}
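
The per-metric fan-out above is tokio's JoinSet pattern; stripped of the analytics types it reduces to the standalone sketch below (metric names and counts are placeholders, not part of this change).

use tokio::task::JoinSet;

// Spawn one task per requested metric, then fold results as they complete.
#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for metric in ["active_payments"] {
        set.spawn(async move { (metric, 42u64) }); // stand-in for a pool query
    }
    while let Some(Ok((metric, count))) = set.join_next().await {
        println!("{metric}: {count}");
    }
}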

View File

@@ -0,0 +1,70 @@
use api_models::analytics::{
active_payments::{ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier},
Granularity,
};
use time::PrimitiveDateTime;
use crate::{
query::{Aggregate, GroupByClause, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, LoadRow, MetricsResult},
};
mod active_payments;
use active_payments::ActivePayments;
#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
pub struct ActivePaymentsMetricRow {
pub count: Option<i64>,
}
pub trait ActivePaymentsMetricAnalytics: LoadRow<ActivePaymentsMetricRow> {}
#[async_trait::async_trait]
pub trait ActivePaymentsMetric<T>
where
T: AnalyticsDataSource + ActivePaymentsMetricAnalytics,
{
async fn load_metrics(
&self,
merchant_id: &str,
publishable_key: &str,
pool: &T,
) -> MetricsResult<
Vec<(
ActivePaymentsMetricsBucketIdentifier,
ActivePaymentsMetricRow,
)>,
>;
}
#[async_trait::async_trait]
impl<T> ActivePaymentsMetric<T> for ActivePaymentsMetrics
where
T: AnalyticsDataSource + ActivePaymentsMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
merchant_id: &str,
publishable_key: &str,
pool: &T,
) -> MetricsResult<
Vec<(
ActivePaymentsMetricsBucketIdentifier,
ActivePaymentsMetricRow,
)>,
> {
match self {
Self::ActivePayments => {
ActivePayments
.load_metrics(publishable_key, merchant_id, pool)
.await
}
}
}
}

View File

@@ -0,0 +1,70 @@
use api_models::analytics::{active_payments::ActivePaymentsMetricsBucketIdentifier, Granularity};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::ActivePaymentsMetricRow;
use crate::{
query::{Aggregate, FilterTypes, GroupByClause, QueryBuilder, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(super) struct ActivePayments;
#[async_trait::async_trait]
impl<T> super::ActivePaymentsMetric<T> for ActivePayments
where
T: AnalyticsDataSource + super::ActivePaymentsMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
merchant_id: &str,
publishable_key: &str,
pool: &T,
) -> MetricsResult<
Vec<(
ActivePaymentsMetricsBucketIdentifier,
ActivePaymentsMetricRow,
)>,
> {
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::ActivePaymentsAnalytics);
query_builder
.add_select_column(Aggregate::DistinctCount {
field: "payment_id",
alias: Some("count"),
})
.switch()?;
query_builder
.add_custom_filter_clause(
"merchant_id",
format!("'{}','{}'", merchant_id, publishable_key),
FilterTypes::In,
)
.switch()?;
query_builder
.execute_query::<ActivePaymentsMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| Ok((ActivePaymentsMetricsBucketIdentifier::new(None), i)))
.collect::<error_stack::Result<
Vec<(
ActivePaymentsMetricsBucketIdentifier,
ActivePaymentsMetricRow,
)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}
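
For reference, the query this builds amounts to roughly the following; a sketch reconstructed from the DistinctCount rendering and the IN filter above, so the exact SQL QueryBuilder emits may differ in quoting and clause order.

// Approximate shape of the generated ClickHouse query (illustrative only).
const ACTIVE_PAYMENTS_QUERY_SKETCH: &str = r#"
    SELECT count(distinct payment_id) as count
    FROM active_payments
    WHERE merchant_id IN ('<merchant_id>', '<publishable_key>')
"#;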

View File

@@ -33,7 +33,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
query_builder
.add_select_column(Aggregate::Count {

View File

@@ -33,7 +33,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
query_builder
.add_select_column(Aggregate::Count {

View File

@@ -33,7 +33,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
query_builder
.add_select_column(Aggregate::Count {

View File

@@ -33,7 +33,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
query_builder
.add_select_column(Aggregate::Count {

View File

@@ -33,7 +33,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
query_builder
.add_select_column(Aggregate::Count {

View File

@@ -7,6 +7,7 @@ use router_env::logger;
use time::PrimitiveDateTime;
use super::{
active_payments::metrics::ActivePaymentsMetricRow,
auth_events::metrics::AuthEventMetricRow,
health_check::HealthCheck,
payments::{
@@ -133,10 +134,12 @@ impl AnalyticsDataSource for ClickhouseClient {
TableEngine::CollapsingMergeTree { sign: "sign_flag" }
}
AnalyticsCollection::SdkEvents
| AnalyticsCollection::SdkEventsAnalytics
| AnalyticsCollection::ApiEvents
| AnalyticsCollection::ConnectorEvents
| AnalyticsCollection::ApiEventsAnalytics
| AnalyticsCollection::OutgoingWebhookEvent => TableEngine::BasicTree,
| AnalyticsCollection::OutgoingWebhookEvent
| AnalyticsCollection::ActivePaymentsAnalytics => TableEngine::BasicTree,
}
}
}
@@ -159,6 +162,7 @@ impl super::refunds::filters::RefundFilterAnalytics for ClickhouseClient {}
impl super::sdk_events::filters::SdkEventFilterAnalytics for ClickhouseClient {}
impl super::sdk_events::metrics::SdkEventMetricAnalytics for ClickhouseClient {}
impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {}
impl super::active_payments::metrics::ActivePaymentsMetricAnalytics for ClickhouseClient {}
impl super::auth_events::metrics::AuthEventMetricAnalytics for ClickhouseClient {}
impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {}
impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {}
@@ -353,6 +357,16 @@ impl TryInto<OutgoingWebhookLogsResult> for serde_json::Value {
}
}
impl TryInto<ActivePaymentsMetricRow> for serde_json::Value {
type Error = Report<ParsingError>;
fn try_into(self) -> Result<ActivePaymentsMetricRow, Self::Error> {
serde_json::from_value(self).change_context(ParsingError::StructParseFailure(
"Failed to parse ActivePaymentsMetricRow in clickhouse results",
))
}
}
impl ToSql<ClickhouseClient> for PrimitiveDateTime {
fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result<String, ParsingError> {
let format =
@@ -373,12 +387,14 @@ impl ToSql<ClickhouseClient> for AnalyticsCollection {
Self::Payment => Ok("payment_attempts".to_string()),
Self::Refund => Ok("refunds".to_string()),
Self::SdkEvents => Ok("sdk_events_audit".to_string()),
Self::SdkEventsAnalytics => Ok("sdk_events".to_string()),
Self::ApiEvents => Ok("api_events_audit".to_string()),
Self::ApiEventsAnalytics => Ok("api_events".to_string()),
Self::PaymentIntent => Ok("payment_intents".to_string()),
Self::ConnectorEvents => Ok("connector_events_audit".to_string()),
Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()),
Self::Dispute => Ok("dispute".to_string()),
Self::ActivePaymentsAnalytics => Ok("active_payments".to_string()),
}
}
}
@@ -451,6 +467,15 @@
alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias))
)
}
Self::DistinctCount { field, alias } => {
format!(
"count(distinct {}){}",
field
.to_sql(table_engine)
.attach_printable("Failed to percentile aggregate")?,
alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias))
)
}
})
}
}

View File

@@ -7,6 +7,7 @@ pub mod payments;
mod query;
pub mod refunds;
pub mod active_payments;
pub mod api_event;
pub mod auth_events;
pub mod connector_events;
@@ -32,6 +33,7 @@ pub mod utils;
use std::sync::Arc;
use api_models::analytics::{
active_payments::{ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier},
api_event::{
ApiEventDimensions, ApiEventFilters, ApiEventMetrics, ApiEventMetricsBucketIdentifier,
},
@@ -56,6 +58,7 @@ use storage_impl::config::Database;
use strum::Display;
use self::{
active_payments::metrics::{ActivePaymentsMetric, ActivePaymentsMetricRow},
auth_events::metrics::{AuthEventMetric, AuthEventMetricRow},
payments::{
distribution::{PaymentDistribution, PaymentDistributionRow},
@@ -514,7 +517,7 @@ impl AnalyticsProvider {
&self,
metric: &SdkEventMetrics,
dimensions: &[SdkEventDimensions],
pub_key: &str,
publishable_key: &str,
filters: &SdkEventFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
@@ -523,14 +526,21 @@
Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)),
Self::Clickhouse(pool) => {
metric
.load_metrics(dimensions, pub_key, filters, granularity, time_range, pool)
.load_metrics(
dimensions,
publishable_key,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => {
metric
.load_metrics(
dimensions,
pub_key,
publishable_key,
filters,
granularity,
// Since SDK events are ckh only use ckh here
@@ -542,6 +552,32 @@
}
}
pub async fn get_active_payments_metrics(
&self,
metric: &ActivePaymentsMetrics,
merchant_id: &str,
publishable_key: &str,
) -> types::MetricsResult<
Vec<(
ActivePaymentsMetricsBucketIdentifier,
ActivePaymentsMetricRow,
)>,
> {
match self {
Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)),
Self::Clickhouse(pool) => {
metric
.load_metrics(merchant_id, publishable_key, pool)
.await
}
Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => {
metric
.load_metrics(merchant_id, publishable_key, ckh_pool)
.await
}
}
}
pub async fn get_auth_event_metrics(
&self,
metric: &AuthEventMetrics,
@@ -723,6 +759,7 @@ pub enum AnalyticsFlow {
GetRefundsMetrics,
GetSdkMetrics,
GetAuthMetrics,
GetActivePaymentsMetrics,
GetPaymentFilters,
GetRefundFilters,
GetSdkEventFilters,

View File

@@ -253,6 +253,10 @@ pub enum Aggregate<R> {
alias: Option<&'static str>,
percentile: Option<&'static u8>,
},
DistinctCount {
field: R,
alias: Option<&'static str>,
},
}
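
The new variant is constructed by the active_payments metric shown earlier; for instance:

// Sketch: the ToSql impls added in this commit render this as
// `count(distinct payment_id) as count`.
fn distinct_payments_aggregate() -> Aggregate<&'static str> {
    Aggregate::DistinctCount {
        field: "payment_id",
        alias: Some("count"),
    }
}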
// Window functions in query

View File

@@ -24,7 +24,8 @@ where
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
query_builder.add_select_column(dimension).switch()?;
time_range

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -36,7 +36,8 @@ where
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(SdkEventMetricsBucketIdentifier, SdkEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::SdkEvents);
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::SdkEventsAnalytics);
let dimensions = dimensions.to_vec();
for dim in dimensions.iter() {

View File

@@ -542,6 +542,8 @@ impl ToSql<SqlxClient> for AnalyticsCollection {
Self::Payment => Ok("payment_attempt".to_string()),
Self::Refund => Ok("refund".to_string()),
Self::SdkEvents => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("SdkEventsAudit table is not implemented for Sqlx"))?,
Self::SdkEventsAnalytics => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("SdkEvents table is not implemented for Sqlx"))?,
Self::ApiEvents => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ApiEvents table is not implemented for Sqlx"))?,
@@ -550,6 +552,8 @@ impl ToSql<SqlxClient> for AnalyticsCollection {
.attach_printable("ConnectorEvents table is not implemented for Sqlx"))?,
Self::ApiEventsAnalytics => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ApiEvents table is not implemented for Sqlx"))?,
Self::ActivePaymentsAnalytics => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ActivePaymentsAnalytics table is not implemented for Sqlx"))?,
Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?,
Self::Dispute => Ok("dispute".to_string()),
@@ -610,6 +614,15 @@
alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias))
)
}
Self::DistinctCount { field, alias } => {
format!(
"count(distinct {}){}",
field
.to_sql(table_engine)
.attach_printable("Failed to distinct count aggregate")?,
alias.map_or_else(|| "".to_owned(), |alias| format!(" as {}", alias))
)
}
})
}
}

View File

@@ -26,12 +26,14 @@ pub enum AnalyticsCollection {
Payment,
Refund,
SdkEvents,
SdkEventsAnalytics,
ApiEvents,
PaymentIntent,
ConnectorEvents,
OutgoingWebhookEvent,
Dispute,
ApiEventsAnalytics,
ActivePaymentsAnalytics,
}
#[allow(dead_code)]

View File

@@ -4,6 +4,7 @@ use common_utils::pii::EmailStrategy;
use masking::Secret;
use self::{
active_payments::ActivePaymentsMetrics,
api_event::{ApiEventDimensions, ApiEventMetrics},
auth_events::AuthEventMetrics,
disputes::{DisputeDimensions, DisputeMetrics},
@@ -13,6 +14,7 @@ };
};
pub use crate::payments::TimeRange;
pub mod active_payments;
pub mod api_event;
pub mod auth_events;
pub mod connector_events;
@@ -151,6 +153,14 @@ pub struct GetAuthEventMetricRequest {
pub delta: bool,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetActivePaymentsMetricRequest {
#[serde(default)]
pub metrics: HashSet<ActivePaymentsMetrics>,
pub time_range: TimeRange,
}
#[derive(Debug, serde::Serialize)]
pub struct AnalyticsMetadata {
pub current_time_range: TimeRange,

View File

@@ -0,0 +1,77 @@
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};
use super::NameDescription;
#[derive(
Clone,
Debug,
Hash,
PartialEq,
Eq,
serde::Serialize,
serde::Deserialize,
strum::Display,
strum::EnumIter,
strum::AsRefStr,
)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum ActivePaymentsMetrics {
ActivePayments,
}
pub mod metric_behaviour {
pub struct ActivePayments;
}
impl From<ActivePaymentsMetrics> for NameDescription {
fn from(value: ActivePaymentsMetrics) -> Self {
Self {
name: value.to_string(),
desc: String::new(),
}
}
}
#[derive(Debug, serde::Serialize, Eq)]
pub struct ActivePaymentsMetricsBucketIdentifier {
pub time_bucket: Option<String>,
}
impl ActivePaymentsMetricsBucketIdentifier {
pub fn new(time_bucket: Option<String>) -> Self {
Self { time_bucket }
}
}
impl Hash for ActivePaymentsMetricsBucketIdentifier {
fn hash<H: Hasher>(&self, state: &mut H) {
self.time_bucket.hash(state);
}
}
impl PartialEq for ActivePaymentsMetricsBucketIdentifier {
fn eq(&self, other: &Self) -> bool {
let mut left = DefaultHasher::new();
self.hash(&mut left);
let mut right = DefaultHasher::new();
other.hash(&mut right);
left.finish() == right.finish()
}
}
#[derive(Debug, serde::Serialize)]
pub struct ActivePaymentsMetricsBucketValue {
pub active_payments: Option<u64>,
}
#[derive(Debug, serde::Serialize)]
pub struct MetricsBucketResponse {
#[serde(flatten)]
pub values: ActivePaymentsMetricsBucketValue,
#[serde(flatten)]
pub dimensions: ActivePaymentsMetricsBucketIdentifier,
}
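
Because both fields of MetricsBucketResponse are #[serde(flatten)], a bucket serializes flat. A test-style sketch, not part of the commit (values illustrative, serde_json assumed available as a dev dependency):

#[cfg(test)]
mod serialization_sketch {
    use super::*;

    #[test]
    fn bucket_serializes_flat() {
        let bucket = MetricsBucketResponse {
            values: ActivePaymentsMetricsBucketValue {
                active_payments: Some(12),
            },
            dimensions: ActivePaymentsMetricsBucketIdentifier::new(None),
        };
        assert_eq!(
            serde_json::to_value(&bucket).unwrap(),
            serde_json::json!({ "active_payments": 12, "time_bucket": null })
        );
    }
}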

View File

@@ -86,6 +86,7 @@ impl_misc_api_event_type!(
GetInfoResponse,
GetPaymentMetricRequest,
GetRefundMetricRequest,
GetActivePaymentsMetricRequest,
GetSdkEventMetricRequest,
GetAuthEventMetricRequest,
GetPaymentFiltersRequest,

View File

@@ -12,10 +12,10 @@ pub mod routes {
search::{
GetGlobalSearchRequest, GetSearchRequest, GetSearchRequestWithIndex, SearchIndex,
},
GenerateReportRequest, GetApiEventFiltersRequest, GetApiEventMetricRequest,
GetAuthEventMetricRequest, GetDisputeMetricRequest, GetPaymentFiltersRequest,
GetPaymentMetricRequest, GetRefundFilterRequest, GetRefundMetricRequest,
GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest,
GenerateReportRequest, GetActivePaymentsMetricRequest, GetApiEventFiltersRequest,
GetApiEventMetricRequest, GetAuthEventMetricRequest, GetDisputeMetricRequest,
GetPaymentFiltersRequest, GetPaymentMetricRequest, GetRefundFilterRequest,
GetRefundMetricRequest, GetSdkEventFiltersRequest, GetSdkEventMetricRequest, ReportRequest,
};
use error_stack::ResultExt;
@@ -70,6 +70,10 @@
web::resource("metrics/sdk_events")
.route(web::post().to(get_sdk_event_metrics)),
)
.service(
web::resource("metrics/active_payments")
.route(web::post().to(get_active_payments_metrics)),
)
.service(
web::resource("filters/sdk_events")
.route(web::post().to(get_sdk_event_filters)),
@@ -245,6 +249,43 @@
.await
}
/// # Panics
///
/// Panics if `json_payload` array does not contain one `GetActivePaymentsMetricRequest` element.
pub async fn get_active_payments_metrics(
state: web::Data<AppState>,
req: actix_web::HttpRequest,
json_payload: web::Json<[GetActivePaymentsMetricRequest; 1]>,
) -> impl Responder {
// safety: This shouldn't panic owing to the data type
#[allow(clippy::expect_used)]
let payload = json_payload
.into_inner()
.to_vec()
.pop()
.expect("Couldn't get GetActivePaymentsMetricRequest");
let flow = AnalyticsFlow::GetActivePaymentsMetrics;
Box::pin(api::server_wrap(
flow,
state,
&req,
payload,
|state, auth: AuthenticationData, req, _| async move {
analytics::active_payments::get_metrics(
&state.pool,
auth.merchant_account.publishable_key.as_ref(),
Some(&auth.merchant_account.merchant_id),
req,
)
.await
.map(ApplicationResponse::Json)
},
&auth::JWTAuth(Permission::Analytics),
api_locking::LockAction::NotApplicable,
))
.await
}
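
Since the handler deserializes a one-element JSON array, a request to the new metrics/active_payments route would carry a body shaped roughly like the sketch below; the startTime/endTime field names for TimeRange are an assumption based on the camelCase rename on the request type.

// Hypothetical body for POST .../metrics/active_payments.
let body = serde_json::json!([{
    "metrics": ["active_payments"],
    "timeRange": {
        "startTime": "2024-06-26T00:00:00Z",
        "endTime": "2024-06-26T12:00:00Z"
    }
}]);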
/// # Panics
///
/// Panics if `json_payload` array does not contain one `GetAuthEventMetricRequest` element.

View File

@@ -164,8 +164,8 @@ services:
- HYPERSWITCH_CLIENT_URL=http://localhost:9050
- SELF_SERVER_URL=http://localhost:5252
- SDK_ENV=local
- ENV_LOGGING_URL=http://localhost:3100
- ENV_LOGGING_URL=http://localhost:3103
labels:
logs: "promtail"
### Control Center
@@ -405,6 +405,7 @@ services:
ports:
- "8686"
- "9598"
- "3103:3103"
profiles:
- olap
environment: