feat(outgoingwebhookevent): adding api for query to fetch outgoing webhook events log (#3310)

Co-authored-by: Sampras Lopes <lsampras@pm.me>
This commit is contained in:
harsh-sharma-juspay
2024-01-11 18:42:09 +05:30
committed by GitHub
parent bb096138b5
commit 54d44bef73
12 changed files with 192 additions and 3 deletions

View File

@ -21,6 +21,7 @@ use crate::{
filters::ApiEventFilter,
metrics::{latency::LatencyAvg, ApiEventMetricRow},
},
outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult,
types::TableEngine,
};
@ -120,6 +121,7 @@ impl AnalyticsDataSource for ClickhouseClient {
}
AnalyticsCollection::SdkEvents => TableEngine::BasicTree,
AnalyticsCollection::ApiEvents => TableEngine::BasicTree,
AnalyticsCollection::OutgoingWebhookEvent => TableEngine::BasicTree,
}
}
}
@ -145,6 +147,10 @@ impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {}
impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {}
impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {}
impl super::api_event::metrics::ApiEventMetricAnalytics for ClickhouseClient {}
impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics
for ClickhouseClient
{
}
#[derive(Debug, serde::Serialize)]
struct CkhQuery {
@ -302,6 +308,18 @@ impl TryInto<ApiEventFilter> for serde_json::Value {
}
}
impl TryInto<OutgoingWebhookLogsResult> for serde_json::Value {
type Error = Report<ParsingError>;
fn try_into(self) -> Result<OutgoingWebhookLogsResult, Self::Error> {
serde_json::from_value(self)
.into_report()
.change_context(ParsingError::StructParseFailure(
"Failed to parse OutgoingWebhookLogsResult in clickhouse results",
))
}
}
impl ToSql<ClickhouseClient> for PrimitiveDateTime {
fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result<String, ParsingError> {
let format =
@ -326,6 +344,7 @@ impl ToSql<ClickhouseClient> for AnalyticsCollection {
Self::SdkEvents => Ok("sdk_events_dist".to_string()),
Self::ApiEvents => Ok("api_audit_log".to_string()),
Self::PaymentIntent => Ok("payment_intents_dist".to_string()),
Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()),
}
}
}

View File

@ -7,6 +7,7 @@ mod query;
pub mod refunds;
pub mod api_event;
pub mod outgoing_webhook_event;
pub mod sdk_events;
mod sqlx;
mod types;

View File

@ -0,0 +1,6 @@
mod core;
pub mod events;
pub trait OutgoingWebhookEventAnalytics: events::OutgoingWebhookLogsFilterAnalytics {}
pub use self::core::outgoing_webhook_events_core;

View File

@ -0,0 +1,27 @@
use api_models::analytics::outgoing_webhook_event::OutgoingWebhookLogsRequest;
use common_utils::errors::ReportSwitchExt;
use error_stack::{IntoReport, ResultExt};
use super::events::{get_outgoing_webhook_event, OutgoingWebhookLogsResult};
use crate::{errors::AnalyticsResult, types::FiltersError, AnalyticsProvider};
pub async fn outgoing_webhook_events_core(
pool: &AnalyticsProvider,
req: OutgoingWebhookLogsRequest,
merchant_id: String,
) -> AnalyticsResult<Vec<OutgoingWebhookLogsResult>> {
let data = match pool {
AnalyticsProvider::Sqlx(_) => Err(FiltersError::NotImplemented(
"Outgoing Webhook Events Logs not implemented for SQLX",
))
.into_report()
.attach_printable("SQL Analytics is not implemented for Outgoing Webhook Events"),
AnalyticsProvider::Clickhouse(ckh_pool)
| AnalyticsProvider::CombinedSqlx(_, ckh_pool)
| AnalyticsProvider::CombinedCkh(_, ckh_pool) => {
get_outgoing_webhook_event(&merchant_id, req, ckh_pool).await
}
}
.switch()?;
Ok(data)
}

View File

@ -0,0 +1,90 @@
use api_models::analytics::{outgoing_webhook_event::OutgoingWebhookLogsRequest, Granularity};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;
use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow},
};
pub trait OutgoingWebhookLogsFilterAnalytics: LoadRow<OutgoingWebhookLogsResult> {}
pub async fn get_outgoing_webhook_event<T>(
merchant_id: &String,
query_param: OutgoingWebhookLogsRequest,
pool: &T,
) -> FiltersResult<Vec<OutgoingWebhookLogsResult>>
where
T: AnalyticsDataSource + OutgoingWebhookLogsFilterAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::OutgoingWebhookEvent);
query_builder.add_select_column("*").switch()?;
query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
query_builder
.add_filter_clause("payment_id", query_param.payment_id)
.switch()?;
if let Some(event_id) = query_param.event_id {
query_builder
.add_filter_clause("event_id", &event_id)
.switch()?;
}
if let Some(refund_id) = query_param.refund_id {
query_builder
.add_filter_clause("refund_id", &refund_id)
.switch()?;
}
if let Some(dispute_id) = query_param.dispute_id {
query_builder
.add_filter_clause("dispute_id", &dispute_id)
.switch()?;
}
if let Some(mandate_id) = query_param.mandate_id {
query_builder
.add_filter_clause("mandate_id", &mandate_id)
.switch()?;
}
if let Some(payment_method_id) = query_param.payment_method_id {
query_builder
.add_filter_clause("payment_method_id", &payment_method_id)
.switch()?;
}
if let Some(attempt_id) = query_param.attempt_id {
query_builder
.add_filter_clause("attempt_id", &attempt_id)
.switch()?;
}
//TODO!: update the execute_query function to return reports instead of plain errors...
query_builder
.execute_query::<OutgoingWebhookLogsResult, _>(pool)
.await
.change_context(FiltersError::QueryBuildingError)?
.change_context(FiltersError::QueryExecutionFailure)
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct OutgoingWebhookLogsResult {
pub merchant_id: String,
pub event_id: String,
pub event_type: String,
pub outgoing_webhook_event_type: String,
pub payment_id: String,
pub refund_id: Option<String>,
pub attempt_id: Option<String>,
pub dispute_id: Option<String>,
pub payment_method_id: Option<String>,
pub mandate_id: Option<String>,
pub content: Option<String>,
pub is_error: bool,
pub error: Option<String>,
#[serde(with = "common_utils::custom_serde::iso8601")]
pub created_at: PrimitiveDateTime,
}

View File

@ -429,6 +429,8 @@ impl ToSql<SqlxClient> for AnalyticsCollection {
Self::ApiEvents => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ApiEvents table is not implemented for Sqlx"))?,
Self::PaymentIntent => Ok("payment_intent".to_string()),
Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?,
}
}
}

View File

@ -26,6 +26,7 @@ pub enum AnalyticsCollection {
SdkEvents,
ApiEvents,
PaymentIntent,
OutgoingWebhookEvent,
}
#[allow(dead_code)]