fix: realtime user analytics (#5129)

This commit is contained in:
Vrishab Srivatsa
2024-07-02 15:51:08 +05:30
committed by GitHub
parent a343f69dc4
commit 5d86002ce7
6 changed files with 33 additions and 7 deletions

View File

@ -35,4 +35,5 @@ global_search=true
dispute_analytics=true dispute_analytics=true
configure_pmts=false configure_pmts=false
branding=false branding=false
totp=false totp=false
live_users_counter=true

View File

@ -210,6 +210,7 @@ CREATE TABLE active_payments (
`created_at` DateTime64, `created_at` DateTime64,
`flow_type` LowCardinality(Nullable(String)), `flow_type` LowCardinality(Nullable(String)),
INDEX merchantIndex merchant_id TYPE bloom_filter GRANULARITY 1 INDEX merchantIndex merchant_id TYPE bloom_filter GRANULARITY 1
INDEX flowTypeIndex flow_type TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree ) ENGINE = MergeTree
PARTITION BY toStartOfSecond(created_at) PARTITION BY toStartOfSecond(created_at)
ORDER BY ORDER BY

View File

@ -41,6 +41,7 @@ pub async fn get_metrics(
&metric_type, &metric_type,
&merchant_id_scoped, &merchant_id_scoped,
&publishable_key_scoped, &publishable_key_scoped,
&req.time_range,
) )
.await .await
.change_context(AnalyticsError::UnknownError); .change_context(AnalyticsError::UnknownError);

View File

@ -1,6 +1,6 @@
use api_models::analytics::{ use api_models::analytics::{
active_payments::{ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier}, active_payments::{ActivePaymentsMetrics, ActivePaymentsMetricsBucketIdentifier},
Granularity, Granularity, TimeRange,
}; };
use time::PrimitiveDateTime; use time::PrimitiveDateTime;
@ -29,6 +29,7 @@ where
&self, &self,
merchant_id: &str, merchant_id: &str,
publishable_key: &str, publishable_key: &str,
time_range: &TimeRange,
pool: &T, pool: &T,
) -> MetricsResult< ) -> MetricsResult<
Vec<( Vec<(
@ -52,6 +53,7 @@ where
&self, &self,
merchant_id: &str, merchant_id: &str,
publishable_key: &str, publishable_key: &str,
time_range: &TimeRange,
pool: &T, pool: &T,
) -> MetricsResult< ) -> MetricsResult<
Vec<( Vec<(
@ -62,7 +64,7 @@ where
match self { match self {
Self::ActivePayments => { Self::ActivePayments => {
ActivePayments ActivePayments
.load_metrics(publishable_key, merchant_id, pool) .load_metrics(publishable_key, merchant_id, time_range, pool)
.await .await
} }
} }

View File

@ -1,11 +1,13 @@
use api_models::analytics::{active_payments::ActivePaymentsMetricsBucketIdentifier, Granularity}; use api_models::analytics::{
active_payments::ActivePaymentsMetricsBucketIdentifier, Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt; use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt; use error_stack::ResultExt;
use time::PrimitiveDateTime; use time::PrimitiveDateTime;
use super::ActivePaymentsMetricRow; use super::ActivePaymentsMetricRow;
use crate::{ use crate::{
query::{Aggregate, FilterTypes, GroupByClause, QueryBuilder, ToSql, Window}, query::{Aggregate, FilterTypes, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult}, types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
}; };
@ -26,6 +28,7 @@ where
&self, &self,
merchant_id: &str, merchant_id: &str,
publishable_key: &str, publishable_key: &str,
time_range: &TimeRange,
pool: &T, pool: &T,
) -> MetricsResult< ) -> MetricsResult<
Vec<( Vec<(
@ -51,6 +54,23 @@ where
) )
.switch()?; .switch()?;
query_builder
.add_negative_filter_clause("payment_id", "")
.switch()?;
query_builder
.add_custom_filter_clause(
"flow_type",
"'sdk', 'payment', 'payment_redirection_response'",
FilterTypes::In,
)
.switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
query_builder query_builder
.execute_query::<ActivePaymentsMetricRow, _>(pool) .execute_query::<ActivePaymentsMetricRow, _>(pool)
.await .await

View File

@ -668,6 +668,7 @@ impl AnalyticsProvider {
metric: &ActivePaymentsMetrics, metric: &ActivePaymentsMetrics,
merchant_id: &str, merchant_id: &str,
publishable_key: &str, publishable_key: &str,
time_range: &TimeRange,
) -> types::MetricsResult< ) -> types::MetricsResult<
Vec<( Vec<(
ActivePaymentsMetricsBucketIdentifier, ActivePaymentsMetricsBucketIdentifier,
@ -678,12 +679,12 @@ impl AnalyticsProvider {
Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)), Self::Sqlx(_pool) => Err(report!(MetricsError::NotImplemented)),
Self::Clickhouse(pool) => { Self::Clickhouse(pool) => {
metric metric
.load_metrics(merchant_id, publishable_key, pool) .load_metrics(merchant_id, publishable_key, time_range, pool)
.await .await
} }
Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => { Self::CombinedCkh(_sqlx_pool, ckh_pool) | Self::CombinedSqlx(_sqlx_pool, ckh_pool) => {
metric metric
.load_metrics(merchant_id, publishable_key, ckh_pool) .load_metrics(merchant_id, publishable_key, time_range, ckh_pool)
.await .await
} }
} }