feat(analytics): populate status_code, initial_attempt_id & delivery_attempt on clickhouse for outgoing webhook events (#5383)

This commit is contained in:
Loaki07
2024-08-12 12:03:12 +05:30
committed by GitHub
parent ff430c983c
commit f9c29b084b
3 changed files with 84 additions and 5 deletions

View File

@ -12,6 +12,9 @@ CREATE TABLE outgoing_webhook_events_queue (
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`initial_attempt_id` Nullable(String),
`status_code` Nullable(UInt16),
`delivery_attempt` LowCardinality(String),
`created_at_timestamp` DateTime64(3) `created_at_timestamp` DateTime64(3)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-outgoing-webhook-events', kafka_topic_list = 'hyperswitch-outgoing-webhook-events',
@ -33,6 +36,9 @@ CREATE TABLE outgoing_webhook_events (
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`initial_attempt_id` Nullable(String),
`status_code` Nullable(UInt16),
`delivery_attempt` LowCardinality(String),
`created_at` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1, INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1,
@ -61,6 +67,9 @@ CREATE TABLE outgoing_webhook_events_audit (
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`initial_attempt_id` Nullable(String),
`status_code` Nullable(UInt16),
`delivery_attempt` LowCardinality(String),
`created_at` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4) `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) ENGINE = MergeTree PARTITION BY merchant_id ) ENGINE = MergeTree PARTITION BY merchant_id
@ -81,6 +90,9 @@ CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events (
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`initial_attempt_id` Nullable(String),
`status_code` Nullable(UInt16),
`delivery_attempt` LowCardinality(String),
`created_at` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4) `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) AS ) AS
@ -98,6 +110,9 @@ SELECT
content, content,
is_error, is_error,
error, error,
initial_attempt_id,
status_code,
delivery_attempt,
created_at_timestamp AS created_at, created_at_timestamp AS created_at,
now() AS inserted_at now() AS inserted_at
FROM FROM
@ -119,6 +134,9 @@ CREATE MATERIALIZED VIEW outgoing_webhook_events_audit_mv TO outgoing_webhook_ev
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`initial_attempt_id` Nullable(String),
`status_code` Nullable(UInt16),
`delivery_attempt` LowCardinality(String),
`created_at` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4) `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) AS ) AS
@ -136,6 +154,9 @@ SELECT
content, content,
is_error, is_error,
error, error,
initial_attempt_id,
status_code,
delivery_attempt,
created_at_timestamp AS created_at, created_at_timestamp AS created_at,
now() AS inserted_at now() AS inserted_at
FROM FROM

View File

@ -5,7 +5,10 @@ use api_models::{
webhooks, webhooks,
}; };
use common_utils::{ use common_utils::{
ext_traits::Encode, request::RequestContent, type_name, types::keymanager::Identifier, ext_traits::{Encode, StringExt},
request::RequestContent,
type_name,
types::keymanager::{Identifier, KeyManagerState},
}; };
use diesel_models::process_tracker::business_status; use diesel_models::process_tracker::business_status;
use error_stack::{report, ResultExt}; use error_stack::{report, ResultExt};
@ -33,7 +36,8 @@ use crate::{
routes::{app::SessionStateInfo, SessionState}, routes::{app::SessionStateInfo, SessionState},
services, services,
types::{ types::{
api, domain, api,
domain::{self},
storage::{self, enums}, storage::{self, enums},
transformers::ForeignFrom, transformers::ForeignFrom,
}, },
@ -220,7 +224,15 @@ pub(crate) async fn trigger_webhook_and_raise_event(
) )
.await; .await;
raise_webhooks_analytics_event(state, trigger_webhook_result, content, merchant_id, event); let _ = raise_webhooks_analytics_event(
state,
trigger_webhook_result,
content,
merchant_id,
event,
merchant_key_store,
)
.await;
} }
async fn trigger_webhook_to_merchant( async fn trigger_webhook_to_merchant(
@ -430,13 +442,17 @@ async fn trigger_webhook_to_merchant(
Ok(()) Ok(())
} }
fn raise_webhooks_analytics_event( async fn raise_webhooks_analytics_event(
state: SessionState, state: SessionState,
trigger_webhook_result: CustomResult<(), errors::WebhooksFlowError>, trigger_webhook_result: CustomResult<(), errors::WebhooksFlowError>,
content: Option<api::OutgoingWebhookContent>, content: Option<api::OutgoingWebhookContent>,
merchant_id: common_utils::id_type::MerchantId, merchant_id: common_utils::id_type::MerchantId,
event: domain::Event, event: domain::Event,
merchant_key_store: &domain::MerchantKeyStore,
) { ) {
let key_manager_state: &KeyManagerState = &(&state).into();
let event_id = event.event_id;
let error = if let Err(error) = trigger_webhook_result { let error = if let Err(error) = trigger_webhook_result {
logger::error!(?error, "Failed to send webhook to merchant"); logger::error!(?error, "Failed to send webhook to merchant");
@ -456,13 +472,47 @@ fn raise_webhooks_analytics_event(
.and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content) .and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content)
.or_else(|| get_outgoing_webhook_event_content_from_event_metadata(event.metadata)); .or_else(|| get_outgoing_webhook_event_content_from_event_metadata(event.metadata));
// Fetch updated_event from db
let updated_event = state
.store
.find_event_by_merchant_id_event_id(
key_manager_state,
&merchant_id,
&event_id,
merchant_key_store,
)
.await
.attach_printable_lazy(|| format!("event not found for id: {}", &event_id))
.map_err(|error| {
logger::error!(?error);
error
})
.ok();
// Get status_code from webhook response
let status_code = updated_event.and_then(|updated_event| {
let webhook_response: Option<OutgoingWebhookResponseContent> =
updated_event.response.and_then(|res| {
res.peek()
.parse_struct("OutgoingWebhookResponseContent")
.map_err(|error| {
logger::error!(?error, "Error deserializing webhook response");
error
})
.ok()
});
webhook_response.and_then(|res| res.status_code)
});
let webhook_event = OutgoingWebhookEvent::new( let webhook_event = OutgoingWebhookEvent::new(
merchant_id, merchant_id,
event.event_id, event_id,
event.event_type, event.event_type,
outgoing_webhook_event_content, outgoing_webhook_event_content,
error, error,
event.initial_attempt_id, event.initial_attempt_id,
status_code,
event.delivery_attempt,
); );
state.event_handler().log_event(&webhook_event); state.event_handler().log_event(&webhook_event);
} }

View File

@ -1,4 +1,5 @@
use api_models::{enums::EventType as OutgoingWebhookEventType, webhooks::OutgoingWebhookContent}; use api_models::{enums::EventType as OutgoingWebhookEventType, webhooks::OutgoingWebhookContent};
use common_enums::WebhookDeliveryAttempt;
use serde::Serialize; use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use time::OffsetDateTime; use time::OffsetDateTime;
@ -18,6 +19,8 @@ pub struct OutgoingWebhookEvent {
error: Option<Value>, error: Option<Value>,
created_at_timestamp: i128, created_at_timestamp: i128,
initial_attempt_id: Option<String>, initial_attempt_id: Option<String>,
status_code: Option<u16>,
delivery_attempt: Option<WebhookDeliveryAttempt>,
} }
#[derive(Clone, Debug, PartialEq, Serialize)] #[derive(Clone, Debug, PartialEq, Serialize)]
@ -89,6 +92,7 @@ impl OutgoingWebhookEventMetric for OutgoingWebhookContent {
} }
impl OutgoingWebhookEvent { impl OutgoingWebhookEvent {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
merchant_id: common_utils::id_type::MerchantId, merchant_id: common_utils::id_type::MerchantId,
event_id: String, event_id: String,
@ -96,6 +100,8 @@ impl OutgoingWebhookEvent {
content: Option<OutgoingWebhookEventContent>, content: Option<OutgoingWebhookEventContent>,
error: Option<Value>, error: Option<Value>,
initial_attempt_id: Option<String>, initial_attempt_id: Option<String>,
status_code: Option<u16>,
delivery_attempt: Option<WebhookDeliveryAttempt>,
) -> Self { ) -> Self {
Self { Self {
merchant_id, merchant_id,
@ -106,6 +112,8 @@ impl OutgoingWebhookEvent {
error, error,
created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000, created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000,
initial_attempt_id, initial_attempt_id,
status_code,
delivery_attempt,
} }
} }
} }