feat(analytics): Add ClickHouse-based analytics (#2988)

Co-authored-by: harsh_sharma_juspay <harsh.sharma@juspay.in>
Co-authored-by: Ivor Dsouza <ivor.dsouza@juspay.in>
Co-authored-by: Chethan Rao <70657455+Chethan-rao@users.noreply.github.com>
Co-authored-by: nain-F49FF806 <126972030+nain-F49FF806@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: akshay.s <akshay.s@juspay.in>
Co-authored-by: Gnanasundari24 <118818938+Gnanasundari24@users.noreply.github.com>
Author:  Sampras Lopes
Date:    2023-11-29 17:04:53 +05:30 (committed by GitHub)
Parent:  2e57745352
Commit:  9df4e0193f
135 changed files with 12145 additions and 901 deletions


@@ -0,0 +1,176 @@
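//! Core handlers for API event analytics: raw event log retrieval,
//! filter value lookups and metric aggregation, served from Clickhouse.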
use std::collections::HashMap;
use api_models::analytics::{
api_event::{
ApiEventMetricsBucketIdentifier, ApiEventMetricsBucketValue, ApiLogsRequest,
ApiMetricsBucketResponse,
},
AnalyticsMetadata, ApiEventFiltersResponse, GetApiEventFiltersRequest,
GetApiEventMetricRequest, MetricsResponse,
};
use error_stack::{IntoReport, ResultExt};
use router_env::{
instrument, logger,
tracing::{self, Instrument},
};
use super::{
events::{get_api_event, ApiLogsResult},
metrics::ApiEventMetricRow,
};
use crate::{
errors::{AnalyticsError, AnalyticsResult},
metrics,
types::FiltersError,
AnalyticsProvider,
};
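/// Fetches raw API event logs for a merchant. Only Clickhouse-backed providers
/// are supported; the SQL-only provider returns `FiltersError::NotImplemented`.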
#[instrument(skip_all)]
pub async fn api_events_core(
pool: &AnalyticsProvider,
req: ApiLogsRequest,
merchant_id: String,
) -> AnalyticsResult<Vec<ApiLogsResult>> {
let data = match pool {
AnalyticsProvider::Sqlx(_) => Err(FiltersError::NotImplemented)
.into_report()
.attach_printable("SQL Analytics is not implemented for API Events"),
AnalyticsProvider::Clickhouse(pool) => get_api_event(&merchant_id, req, pool).await,
AnalyticsProvider::CombinedSqlx(_sqlx_pool, ckh_pool)
| AnalyticsProvider::CombinedCkh(_sqlx_pool, ckh_pool) => {
get_api_event(&merchant_id, req, ckh_pool).await
}
}
.change_context(AnalyticsError::UnknownError)?;
Ok(data)
}
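/// Collects the distinct values available for each requested dimension
/// (status code, flow type, API flow) so they can be offered as filter options.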
pub async fn get_filters(
pool: &AnalyticsProvider,
req: GetApiEventFiltersRequest,
merchant_id: String,
) -> AnalyticsResult<ApiEventFiltersResponse> {
use api_models::analytics::{api_event::ApiEventDimensions, ApiEventFilterValue};
use super::filters::get_api_event_filter_for_dimension;
use crate::api_event::filters::ApiEventFilter;
let mut res = ApiEventFiltersResponse::default();
for dim in req.group_by_names {
let values = match pool {
AnalyticsProvider::Sqlx(_pool) => Err(FiltersError::NotImplemented)
.into_report()
.attach_printable("SQL Analytics is not implemented for API Events"),
AnalyticsProvider::Clickhouse(ckh_pool)
| AnalyticsProvider::CombinedSqlx(_, ckh_pool)
| AnalyticsProvider::CombinedCkh(_, ckh_pool) => {
get_api_event_filter_for_dimension(dim, &merchant_id, &req.time_range, ckh_pool)
.await
}
}
.change_context(AnalyticsError::UnknownError)?
.into_iter()
.filter_map(|fil: ApiEventFilter| match dim {
ApiEventDimensions::StatusCode => fil.status_code.map(|i| i.to_string()),
ApiEventDimensions::FlowType => fil.flow_type,
ApiEventDimensions::ApiFlow => fil.api_flow,
})
.collect::<Vec<String>>();
res.query_data.push(ApiEventFilterValue {
dimension: dim,
values,
})
}
Ok(res)
}
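/// Spawns one query task per requested metric, then merges the per-bucket
/// rows into a single `MetricsResponse`.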
#[instrument(skip_all)]
pub async fn get_api_event_metrics(
pool: &AnalyticsProvider,
merchant_id: &str,
req: GetApiEventMetricRequest,
) -> AnalyticsResult<MetricsResponse<ApiMetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<ApiEventMetricsBucketIdentifier, ApiEventMetricRow> =
HashMap::new();
let mut set = tokio::task::JoinSet::new();
for metric_type in req.metrics.iter().cloned() {
let req = req.clone();
let pool = pool.clone();
let task_span = tracing::debug_span!(
"analytics_api_metrics_query",
api_event_metric = metric_type.as_ref()
);
// TODO: owned copies are needed because of `JoinSet` lifetime requirements;
// these clones can be optimized away if those requirements are relaxed
let merchant_id_scoped = merchant_id.to_owned();
set.spawn(
async move {
let data = pool
.get_api_event_metrics(
&metric_type,
&req.group_by_names.clone(),
&merchant_id_scoped,
&req.filters,
&req.time_series.map(|t| t.granularity),
&req.time_range,
)
.await
.change_context(AnalyticsError::UnknownError);
(metric_type, data)
}
.instrument(task_span),
);
}
while let Some((metric, data)) = set
.join_next()
.await
.transpose()
.into_report()
.change_context(AnalyticsError::UnknownError)?
{
let data = data?;
let attributes = &[
metrics::request::add_attributes("metric_type", metric.to_string()),
metrics::request::add_attributes("source", pool.to_string()),
];
let value = u64::try_from(data.len());
if let Ok(val) = value {
metrics::BUCKETS_FETCHED.record(&metrics::CONTEXT, val, attributes);
logger::debug!("Attributes: {:?}, Buckets fetched: {}", attributes, val);
}
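// Rows produced by different metric queries may share a bucket identifier;
// merge them, keeping the first non-empty value for each metric.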
for (id, value) in data {
metrics_accumulator
.entry(id)
.and_modify(|data| {
data.api_count = data.api_count.or(value.api_count);
data.status_code_count = data.status_code_count.or(value.status_code_count);
data.latency = data.latency.or(value.latency);
})
.or_insert(value);
}
}
let query_data: Vec<ApiMetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| ApiMetricsBucketResponse {
values: ApiEventMetricsBucketValue {
latency: val.latency,
api_count: val.api_count,
status_code_count: val.status_code_count,
},
dimensions: id,
})
.collect();
Ok(MetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
}


@@ -0,0 +1,105 @@
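//! Retrieval of raw API event log rows for a payment or refund.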
use api_models::analytics::{
api_event::{ApiLogsRequest, QueryType},
Granularity,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use router_env::Flow;
use time::PrimitiveDateTime;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow},
};
pub trait ApiLogsFilterAnalytics: LoadRow<ApiLogsResult> {}
pub async fn get_api_event<T>(
merchant_id: &String,
query_param: ApiLogsRequest,
pool: &T,
) -> FiltersResult<Vec<ApiLogsResult>>
where
T: AnalyticsDataSource + ApiLogsFilterAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::ApiEvents);
query_builder.add_select_column("*").switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
match query_param.query_param {
QueryType::Payment { payment_id } => query_builder
.add_filter_clause("payment_id", payment_id)
.switch()?,
QueryType::Refund {
payment_id,
refund_id,
} => {
query_builder
.add_filter_clause("payment_id", payment_id)
.switch()?;
query_builder
.add_filter_clause("refund_id", refund_id)
.switch()?;
}
}
if let Some(list_api_name) = query_param.api_name_filter {
query_builder
.add_filter_in_range_clause("api_flow", &list_api_name)
.switch()?;
} else {
query_builder
.add_filter_in_range_clause(
"api_flow",
&[
Flow::PaymentsCancel,
Flow::PaymentsCapture,
Flow::PaymentsConfirm,
Flow::PaymentsCreate,
Flow::PaymentsStart,
Flow::PaymentsUpdate,
Flow::RefundsCreate,
Flow::IncomingWebhookReceive,
],
)
.switch()?;
}
// TODO: update the `execute_query` function to return reports instead of plain errors
query_builder
.execute_query::<ApiLogsResult, _>(pool)
.await
.change_context(FiltersError::QueryBuildingError)?
.change_context(FiltersError::QueryExecutionFailure)
}
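/// A single raw API event log row as returned by the analytics store.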
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ApiLogsResult {
pub merchant_id: String,
pub payment_id: Option<String>,
pub refund_id: Option<String>,
pub payment_method_id: Option<String>,
pub payment_method: Option<String>,
pub payment_method_type: Option<String>,
pub customer_id: Option<String>,
pub user_id: Option<String>,
pub connector: Option<String>,
pub request_id: Option<String>,
pub flow_type: String,
pub api_flow: String,
pub api_auth_type: Option<String>,
pub request: String,
pub response: Option<String>,
pub error: Option<String>,
pub authentication_data: Option<String>,
pub status_code: u16,
pub latency: Option<u128>,
pub user_agent: Option<String>,
pub hs_latency: Option<u128>,
pub ip_addr: Option<String>,
#[serde(with = "common_utils::custom_serde::iso8601")]
pub created_at: PrimitiveDateTime,
}


@@ -0,0 +1,53 @@
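//! Distinct-value lookups used to populate the API event filter options.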
use api_models::analytics::{api_event::ApiEventDimensions, Granularity, TimeRange};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow},
};
pub trait ApiEventFilterAnalytics: LoadRow<ApiEventFilter> {}
pub async fn get_api_event_filter_for_dimension<T>(
dimension: ApiEventDimensions,
merchant_id: &String,
time_range: &TimeRange,
pool: &T,
) -> FiltersResult<Vec<ApiEventFilter>>
where
T: AnalyticsDataSource + ApiEventFilterAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::ApiEvents);
query_builder.add_select_column(dimension).switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
query_builder.set_distinct();
query_builder
.execute_query::<ApiEventFilter, _>(pool)
.await
.change_context(FiltersError::QueryBuildingError)?
.change_context(FiltersError::QueryExecutionFailure)
}
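/// One distinct value for a filter dimension; only the column selected for
/// the queried dimension is expected to be populated.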
#[derive(Debug, serde::Serialize, Eq, PartialEq, serde::Deserialize)]
pub struct ApiEventFilter {
pub status_code: Option<i32>,
pub flow_type: Option<String>,
pub api_flow: Option<String>,
}


@@ -0,0 +1,110 @@
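//! Metric definitions for API events: request count, status code count and latency.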
use api_models::analytics::{
api_event::{
ApiEventDimensions, ApiEventFilters, ApiEventMetrics, ApiEventMetricsBucketIdentifier,
},
Granularity, TimeRange,
};
use time::PrimitiveDateTime;
use crate::{
query::{Aggregate, GroupByClause, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, LoadRow, MetricsResult},
};
mod api_count;
pub mod latency;
mod status_code_count;
use api_count::ApiCount;
use latency::MaxLatency;
use status_code_count::StatusCodeCount;
use self::latency::LatencyAvg;
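/// A single aggregated row produced by an API event metric query.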
#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
pub struct ApiEventMetricRow {
pub latency: Option<u64>,
pub api_count: Option<u64>,
pub status_code_count: Option<u64>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub start_bucket: Option<PrimitiveDateTime>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub end_bucket: Option<PrimitiveDateTime>,
}
pub trait ApiEventMetricAnalytics: LoadRow<ApiEventMetricRow> + LoadRow<LatencyAvg> {}
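/// Common interface implemented by each API event metric so that
/// `ApiEventMetrics` variants can be dispatched uniformly.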
#[async_trait::async_trait]
pub trait ApiEventMetric<T>
where
T: AnalyticsDataSource + ApiEventMetricAnalytics,
{
async fn load_metrics(
&self,
dimensions: &[ApiEventDimensions],
merchant_id: &str,
filters: &ApiEventFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>>;
}
#[async_trait::async_trait]
impl<T> ApiEventMetric<T> for ApiEventMetrics
where
T: AnalyticsDataSource + ApiEventMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[ApiEventDimensions],
merchant_id: &str,
filters: &ApiEventFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>> {
match self {
Self::Latency => {
MaxLatency
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::ApiCount => {
ApiCount
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
Self::StatusCodeCount => {
StatusCodeCount
.load_metrics(
dimensions,
merchant_id,
filters,
granularity,
time_range,
pool,
)
.await
}
}
}
}


@@ -0,0 +1,106 @@
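//! `ApiCount` metric: number of API events in each time bucket
//! (or over the whole time range when no granularity is given).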
use api_models::analytics::{
api_event::{ApiEventDimensions, ApiEventFilters, ApiEventMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::ApiEventMetricRow;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(super) struct ApiCount;
#[async_trait::async_trait]
impl<T> super::ApiEventMetric<T> for ApiCount
where
T: AnalyticsDataSource + super::ApiEventMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
_dimensions: &[ApiEventDimensions],
merchant_id: &str,
filters: &ApiEventFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::ApiEvents);
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("api_count"),
})
.switch()?;
if !filters.flow_type.is_empty() {
query_builder
.add_filter_in_range_clause(ApiEventDimensions::FlowType, &filters.flow_type)
.attach_printable("Error adding flow_type filter")
.switch()?;
}
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
query_builder
.execute_query::<ApiEventMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
ApiEventMetricsBucketIdentifier::new(TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
}),
i,
))
})
.collect::<error_stack::Result<
Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}


@@ -0,0 +1,138 @@
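//! Latency metric: average request latency per time bucket,
//! computed as sum(latency) / count(latency).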
use api_models::analytics::{
api_event::{ApiEventDimensions, ApiEventFilters, ApiEventMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::ApiEventMetricRow;
use crate::{
query::{
Aggregate, FilterTypes, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql,
Window,
},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
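/// Despite the name, this computes the *average* latency per bucket
/// (latency_sum / latency_count) rather than the maximum.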
#[derive(Default)]
pub(super) struct MaxLatency;
#[async_trait::async_trait]
impl<T> super::ApiEventMetric<T> for MaxLatency
where
T: AnalyticsDataSource + super::ApiEventMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
_dimensions: &[ApiEventDimensions],
merchant_id: &str,
filters: &ApiEventFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::ApiEvents);
query_builder
.add_select_column(Aggregate::Sum {
field: "latency",
alias: Some("latency_sum"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Count {
field: Some("latency"),
alias: Some("latency_count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
filters.set_filter_clause(&mut query_builder).switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
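// Exclude requests containing the locker IP so that internal locker calls
// do not skew the latency figures.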
query_builder
.add_custom_filter_clause("request", "10.63.134.6", FilterTypes::NotLike)
.attach_printable("Error filtering out locker IP")
.switch()?;
query_builder
.execute_query::<LatencyAvg, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
ApiEventMetricsBucketIdentifier::new(TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
}),
ApiEventMetricRow {
latency: if i.latency_count != 0 {
Some(i.latency_sum.unwrap_or(0) / i.latency_count)
} else {
None
},
api_count: None,
status_code_count: None,
start_bucket: i.start_bucket,
end_bucket: i.end_bucket,
},
))
})
.collect::<error_stack::Result<
Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}
#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
pub struct LatencyAvg {
latency_sum: Option<u64>,
latency_count: u64,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub start_bucket: Option<PrimitiveDateTime>,
#[serde(with = "common_utils::custom_serde::iso8601::option")]
pub end_bucket: Option<PrimitiveDateTime>,
}


@@ -0,0 +1,103 @@
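//! `StatusCodeCount` metric: count of API events with a recorded status code
//! in each time bucket.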
use api_models::analytics::{
api_event::{ApiEventDimensions, ApiEventFilters, ApiEventMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use super::ApiEventMetricRow;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};
#[derive(Default)]
pub(super) struct StatusCodeCount;
#[async_trait::async_trait]
impl<T> super::ApiEventMetric<T> for StatusCodeCount
where
T: AnalyticsDataSource + super::ApiEventMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
_dimensions: &[ApiEventDimensions],
merchant_id: &str,
filters: &ApiEventFilters,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::ApiEvents);
query_builder
.add_select_column(Aggregate::Count {
field: Some("status_code"),
alias: Some("status_code_count"),
})
.switch()?;
filters.set_filter_clause(&mut query_builder).switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;
if let Some(granularity) = granularity.as_ref() {
granularity
.set_group_by_clause(&mut query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}
query_builder
.execute_query::<ApiEventMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
ApiEventMetricsBucketIdentifier::new(TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
}),
i,
))
})
.collect::<error_stack::Result<
Vec<(ApiEventMetricsBucketIdentifier, ApiEventMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}


@@ -0,0 +1,33 @@
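//! `QueryFilter` implementation that applies the requested API event filters
//! (status code, flow type, API flow) to a query.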
use api_models::analytics::api_event::{ApiEventDimensions, ApiEventFilters};
use error_stack::ResultExt;
use crate::{
query::{QueryBuilder, QueryFilter, QueryResult, ToSql},
types::{AnalyticsCollection, AnalyticsDataSource},
};
impl<T> QueryFilter<T> for ApiEventFilters
where
T: AnalyticsDataSource,
AnalyticsCollection: ToSql<T>,
{
fn set_filter_clause(&self, builder: &mut QueryBuilder<T>) -> QueryResult<()> {
if !self.status_code.is_empty() {
builder
.add_filter_in_range_clause(ApiEventDimensions::StatusCode, &self.status_code)
.attach_printable("Error adding status_code filter")?;
}
if !self.flow_type.is_empty() {
builder
.add_filter_in_range_clause(ApiEventDimensions::FlowType, &self.flow_type)
.attach_printable("Error adding flow_type filter")?;
}
if !self.api_flow.is_empty() {
builder
.add_filter_in_range_clause(ApiEventDimensions::ApiFlow, &self.api_flow)
.attach_printable("Error adding api_name filter")?;
}
Ok(())
}
}