feat(connector_events): added api to fetch connector event logs (#3319)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: harsh-sharma-juspay <125131007+harsh-sharma-juspay@users.noreply.github.com>
This commit is contained in:
ivor-juspay
2024-01-17 15:07:41 +05:30
committed by GitHub
parent 01c2de223f
commit 68a3a28067
12 changed files with 160 additions and 2 deletions

View File

@ -21,6 +21,7 @@ use crate::{
filters::ApiEventFilter,
metrics::{latency::LatencyAvg, ApiEventMetricRow},
},
connector_events::events::ConnectorEventsResult,
outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult,
types::TableEngine,
@ -121,6 +122,7 @@ impl AnalyticsDataSource for ClickhouseClient {
}
AnalyticsCollection::SdkEvents => TableEngine::BasicTree,
AnalyticsCollection::ApiEvents => TableEngine::BasicTree,
AnalyticsCollection::ConnectorEvents => TableEngine::BasicTree,
AnalyticsCollection::OutgoingWebhookEvent => TableEngine::BasicTree,
}
}
@ -147,6 +149,7 @@ impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {}
impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {}
impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {}
impl super::api_event::metrics::ApiEventMetricAnalytics for ClickhouseClient {}
impl super::connector_events::events::ConnectorEventLogAnalytics for ClickhouseClient {}
impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics
for ClickhouseClient
{
@ -188,6 +191,18 @@ impl TryInto<SdkEventsResult> for serde_json::Value {
}
}
impl TryInto<ConnectorEventsResult> for serde_json::Value {
type Error = Report<ParsingError>;
fn try_into(self) -> Result<ConnectorEventsResult, Self::Error> {
serde_json::from_value(self)
.into_report()
.change_context(ParsingError::StructParseFailure(
"Failed to parse ConnectorEventsResult in clickhouse results",
))
}
}
impl TryInto<PaymentMetricRow> for serde_json::Value {
type Error = Report<ParsingError>;
@ -344,6 +359,7 @@ impl ToSql<ClickhouseClient> for AnalyticsCollection {
Self::SdkEvents => Ok("sdk_events_dist".to_string()),
Self::ApiEvents => Ok("api_audit_log".to_string()),
Self::PaymentIntent => Ok("payment_intents_dist".to_string()),
Self::ConnectorEvents => Ok("connector_events_audit".to_string()),
Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()),
}
}

View File

@ -0,0 +1,5 @@
mod core;
pub mod events;
pub trait ConnectorEventAnalytics: events::ConnectorEventLogAnalytics {}
pub use self::core::connector_events_core;

View File

@ -0,0 +1,27 @@
use api_models::analytics::connector_events::ConnectorEventsRequest;
use common_utils::errors::ReportSwitchExt;
use error_stack::{IntoReport, ResultExt};
use super::events::{get_connector_events, ConnectorEventsResult};
use crate::{errors::AnalyticsResult, types::FiltersError, AnalyticsProvider};
pub async fn connector_events_core(
pool: &AnalyticsProvider,
req: ConnectorEventsRequest,
merchant_id: String,
) -> AnalyticsResult<Vec<ConnectorEventsResult>> {
let data = match pool {
AnalyticsProvider::Sqlx(_) => Err(FiltersError::NotImplemented(
"Connector Events not implemented for SQLX",
))
.into_report()
.attach_printable("SQL Analytics is not implemented for Connector Events"),
AnalyticsProvider::Clickhouse(ckh_pool)
| AnalyticsProvider::CombinedSqlx(_, ckh_pool)
| AnalyticsProvider::CombinedCkh(_, ckh_pool) => {
get_connector_events(&merchant_id, req, ckh_pool).await
}
}
.switch()?;
Ok(data)
}

View File

@ -0,0 +1,63 @@
use api_models::analytics::{
connector_events::{ConnectorEventsRequest, QueryType},
Granularity,
};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow},
};
pub trait ConnectorEventLogAnalytics: LoadRow<ConnectorEventsResult> {}
pub async fn get_connector_events<T>(
merchant_id: &String,
query_param: ConnectorEventsRequest,
pool: &T,
) -> FiltersResult<Vec<ConnectorEventsResult>>
where
T: AnalyticsDataSource + ConnectorEventLogAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::ConnectorEvents);
query_builder.add_select_column("*").switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
match query_param.query_param {
QueryType::Payment { payment_id } => query_builder
.add_filter_clause("payment_id", payment_id)
.switch()?,
}
//TODO!: update the execute_query function to return reports instead of plain errors...
query_builder
.execute_query::<ConnectorEventsResult, _>(pool)
.await
.change_context(FiltersError::QueryBuildingError)?
.change_context(FiltersError::QueryExecutionFailure)
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ConnectorEventsResult {
pub merchant_id: String,
pub payment_id: String,
pub connector_name: Option<String>,
pub request_id: Option<String>,
pub flow: String,
pub request: String,
pub response: Option<String>,
pub error: Option<String>,
pub status_code: u16,
pub latency: Option<u128>,
#[serde(with = "common_utils::custom_serde::iso8601")]
pub created_at: PrimitiveDateTime,
pub method: Option<String>,
}

View File

@ -7,6 +7,7 @@ mod query;
pub mod refunds;
pub mod api_event;
pub mod connector_events;
pub mod outgoing_webhook_event;
pub mod sdk_events;
mod sqlx;

View File

@ -429,6 +429,8 @@ impl ToSql<SqlxClient> for AnalyticsCollection {
Self::ApiEvents => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ApiEvents table is not implemented for Sqlx"))?,
Self::PaymentIntent => Ok("payment_intent".to_string()),
Self::ConnectorEvents => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ConnectorEvents table is not implemented for Sqlx"))?,
Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?,
}

View File

@ -26,6 +26,7 @@ pub enum AnalyticsCollection {
SdkEvents,
ApiEvents,
PaymentIntent,
ConnectorEvents,
OutgoingWebhookEvent,
}