feat(analytics): adding outgoing webhooks kafka event (#3140)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
harsh-sharma-juspay
2024-01-05 16:01:56 +05:30
committed by GitHub
parent 34318bc1f1
commit 1d26df28bc
9 changed files with 268 additions and 11 deletions

View File

@@ -530,3 +530,4 @@ attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events
refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events

View File

@@ -519,6 +519,7 @@ attempt_analytics_topic = "hyperswitch-payment-attempt-events"
refund_analytics_topic = "hyperswitch-refund-events"
api_logs_topic = "hyperswitch-api-log-events"
connector_logs_topic = "hyperswitch-connector-api-events"
outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events"
[analytics]
source = "sqlx"

View File

@@ -366,6 +366,7 @@ attempt_analytics_topic = "hyperswitch-payment-attempt-events"
refund_analytics_topic = "hyperswitch-refund-events"
api_logs_topic = "hyperswitch-api-log-events"
connector_logs_topic = "hyperswitch-connector-api-events"
outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events"
[analytics]
source = "sqlx"

View File

@@ -0,0 +1,109 @@
CREATE TABLE outgoing_webhook_events_queue (
    `merchant_id` String,
    `event_id` Nullable(String),
    `event_type` LowCardinality(String),
    `outgoing_webhook_event_type` LowCardinality(String),
    `payment_id` Nullable(String),
    `refund_id` Nullable(String),
    `attempt_id` Nullable(String),
    `dispute_id` Nullable(String),
    `payment_method_id` Nullable(String),
    `mandate_id` Nullable(String),
    `content` Nullable(String),
    `is_error` Bool,
    `error` Nullable(String),
    `created_at_timestamp` DateTime64(3)
) ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka0:29092',
    kafka_topic_list = 'hyperswitch-outgoing-webhook-events',
    kafka_group_name = 'hyper-c1',
    kafka_format = 'JSONEachRow',
    kafka_handle_error_mode = 'stream';

CREATE TABLE outgoing_webhook_events_cluster (
    `merchant_id` String,
    `event_id` String,
    `event_type` LowCardinality(String),
    `outgoing_webhook_event_type` LowCardinality(String),
    `payment_id` Nullable(String),
    `refund_id` Nullable(String),
    `attempt_id` Nullable(String),
    `dispute_id` Nullable(String),
    `payment_method_id` Nullable(String),
    `mandate_id` Nullable(String),
    `content` Nullable(String),
    `is_error` Bool,
    `error` Nullable(String),
    `created_at_timestamp` DateTime64(3),
    `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
    INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1,
    INDEX webhookeventIndex outgoing_webhook_event_type TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree
PARTITION BY toStartOfDay(created_at_timestamp)
ORDER BY (
    created_at_timestamp,
    merchant_id,
    event_id,
    event_type,
    outgoing_webhook_event_type
) TTL inserted_at + toIntervalMonth(6);

CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events_cluster (
    `merchant_id` String,
    `event_id` Nullable(String),
    `event_type` LowCardinality(String),
    `outgoing_webhook_event_type` LowCardinality(String),
    `payment_id` Nullable(String),
    `refund_id` Nullable(String),
    `attempt_id` Nullable(String),
    `dispute_id` Nullable(String),
    `payment_method_id` Nullable(String),
    `mandate_id` Nullable(String),
    `content` Nullable(String),
    `is_error` Bool,
    `error` Nullable(String),
    `created_at_timestamp` DateTime64(3),
    `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) AS
SELECT
    merchant_id,
    event_id,
    event_type,
    outgoing_webhook_event_type,
    payment_id,
    refund_id,
    attempt_id,
    dispute_id,
    payment_method_id,
    mandate_id,
    content,
    is_error,
    error,
    created_at_timestamp,
    now() AS inserted_at
FROM outgoing_webhook_events_queue
WHERE length(_error) = 0;

CREATE MATERIALIZED VIEW outgoing_webhook_parse_errors (
    `topic` String,
    `partition` Int64,
    `offset` Int64,
    `raw` String,
    `error` String
) ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM outgoing_webhook_events_queue
WHERE length(_error) > 0;
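
Taken together, this script sets up the standard ClickHouse Kafka ingestion chain: a Kafka-engine table consumes the topic, a MergeTree table stores the rows, one materialized view moves parseable rows across, and a second view captures messages that fail JSON parsing. A quick sanity check once events are flowing might look like this (a sketch against the tables defined above, not part of the commit):

```sql
-- Sketch: events ingested per type over the last day.
SELECT
    event_type,
    outgoing_webhook_event_type,
    count() AS events
FROM outgoing_webhook_events_cluster
WHERE created_at_timestamp >= now() - INTERVAL 1 DAY
GROUP BY event_type, outgoing_webhook_event_type
ORDER BY events DESC;

-- Sketch: any rows here are messages that failed JSON parsing on the topic.
SELECT topic, `partition`, `offset`, error
FROM outgoing_webhook_parse_errors
LIMIT 10;
```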

View File

@@ -226,7 +226,7 @@ pub enum KmsError {
    Utf8DecodingFailed,
}

-#[derive(Debug, thiserror::Error)]
+#[derive(Debug, thiserror::Error, serde::Serialize)]
pub enum WebhooksFlowError {
    #[error("Merchant webhook config not found")]
    MerchantConfigNotFound,
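
The new serde::Serialize derive is what lets the outgoing-webhook task (below) turn a delivery failure into a JSON value attached to the analytics event. A minimal sketch of that behavior, assuming only the variant shown above:

```rust
use serde::Serialize;

// Same shape as the enum above: `thiserror` provides Display/Error,
// `Serialize` lets the webhook task attach the failure to an analytics event.
#[derive(Debug, thiserror::Error, Serialize)]
enum WebhooksFlowError {
    #[error("Merchant webhook config not found")]
    MerchantConfigNotFound,
}

fn main() {
    let err = WebhooksFlowError::MerchantConfigNotFound;
    // A unit variant serializes to its name as a JSON string.
    let json = serde_json::to_value(&err).expect("serialization cannot fail here");
    assert_eq!(json, serde_json::json!("MerchantConfigNotFound"));
    println!("{json} ({err})");
}
```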

View File

@@ -25,7 +25,10 @@ use crate::{
        payments, refunds,
    },
    db::StorageInterface,
-    events::api_logs::ApiEvent,
+    events::{
+        api_logs::ApiEvent,
+        outgoing_webhook_logs::{OutgoingWebhookEvent, OutgoingWebhookEventMetric},
+    },
    logger,
    routes::{app::AppStateInfo, lock_utils, metrics::request::add_attributes, AppState},
    services::{self, authentication as auth},
@@ -731,21 +734,47 @@ pub async fn create_event_and_trigger_outgoing_webhook<W: types::OutgoingWebhook
    if state.conf.webhooks.outgoing_enabled {
        let outgoing_webhook = api::OutgoingWebhook {
            merchant_id: merchant_account.merchant_id.clone(),
-            event_id: event.event_id,
+            event_id: event.event_id.clone(),
            event_type: event.event_type,
-            content,
+            content: content.clone(),
            timestamp: event.created_at,
        };

        let state_clone = state.clone();
        // Using a tokio spawn here and not arbiter because not all caller of this function
        // may have an actix arbiter
        tokio::spawn(async move {
            let mut error = None;
            let result =
                trigger_webhook_to_merchant::<W>(business_profile, outgoing_webhook, state).await;

            if let Err(e) = result {
                error.replace(
                    serde_json::to_value(e.current_context())
                        .into_report()
                        .attach_printable("Failed to serialize json error response")
                        .change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
                        .ok()
                        .into(),
                );
                logger::error!(?e);
            }

            let outgoing_webhook_event_type = content.get_outgoing_webhook_event_type();
            let webhook_event = OutgoingWebhookEvent::new(
                merchant_account.merchant_id.clone(),
                event.event_id.clone(),
                event_type,
                outgoing_webhook_event_type,
                error.is_some(),
                error,
            );
            match webhook_event.clone().try_into() {
                Ok(event) => {
                    state_clone.event_handler().log_event(event);
                }
                Err(err) => {
                    logger::error!(error=?err, event=?webhook_event, "Error Logging Outgoing Webhook Event");
                }
            }
        });
    }
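
The handler clones whatever the detached task needs (state, event id, content) before moving it into tokio::spawn, so webhook delivery and event logging can never block or fail the API response. A stripped-down sketch of that fire-and-forget shape; AppState and trigger_webhook are stand-ins, not the real hyperswitch items:

```rust
use std::sync::Arc;

#[derive(Clone)]
struct AppState; // stand-in for the real state handle

// Stand-in for `trigger_webhook_to_merchant`; fails to exercise the error path.
async fn trigger_webhook(_state: Arc<AppState>) -> Result<(), String> {
    Err("connection refused".to_string())
}

#[tokio::main]
async fn main() {
    let state = Arc::new(AppState);
    let state_clone = state.clone();

    // tokio::spawn rather than an actix Arbiter: not every caller of this path
    // runs inside an actix system. The task owns clones of everything it needs.
    let task = tokio::spawn(async move {
        let mut error: Option<String> = None;
        if let Err(e) = trigger_webhook(state_clone).await {
            // Record the failure for the analytics event instead of propagating it.
            error.replace(e);
        }
        println!("is_error={} error={error:?}", error.is_some());
    });

    task.await.expect("spawned task panicked");
}
```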

View File

@@ -9,6 +9,7 @@ pub mod api_logs;
pub mod connector_api_logs;
pub mod event_logger;
pub mod kafka_handler;
pub mod outgoing_webhook_logs;

pub(super) trait EventHandler: Sync + Send + dyn_clone::DynClone {
    fn log_event(&self, event: RawEvent);
@@ -31,6 +32,7 @@ pub enum EventType {
    Refund,
    ApiLogs,
    ConnectorApiLogs,
    OutgoingWebhookLogs,
}

#[derive(Debug, Default, Deserialize, Clone)]
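
EventHandler is the seam the new module plugs into: anything convertible to a RawEvent can be logged by whichever handler is configured. A minimal sketch with stand-in types (RawEvent's fields are inferred from the TryFrom impl later in this commit, and the real trait additionally requires dyn_clone::DynClone):

```rust
use serde_json::{json, Value};

#[derive(Debug, Clone)]
enum EventType {
    OutgoingWebhookLogs,
    // ... other variants elided
}

#[derive(Debug)]
struct RawEvent {
    event_type: EventType,
    key: String,
    payload: Value,
}

// Simplified: the real trait is `pub(super)` and also bounds on DynClone.
trait EventHandler: Sync + Send {
    fn log_event(&self, event: RawEvent);
}

struct StdoutHandler;

impl EventHandler for StdoutHandler {
    fn log_event(&self, event: RawEvent) {
        println!("{:?} key={} payload={}", event.event_type, event.key, event.payload);
    }
}

fn main() {
    StdoutHandler.log_event(RawEvent {
        event_type: EventType::OutgoingWebhookLogs,
        key: "merchant_123".into(),
        payload: json!({"event_id": "evt_1"}),
    });
}
```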

View File

@@ -0,0 +1,110 @@
use api_models::{enums::EventType as OutgoingWebhookEventType, webhooks::OutgoingWebhookContent};
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;

use super::{EventType, RawEvent};

#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct OutgoingWebhookEvent {
    merchant_id: String,
    event_id: String,
    event_type: OutgoingWebhookEventType,
    #[serde(flatten)]
    content: Option<OutgoingWebhookEventContent>,
    is_error: bool,
    error: Option<Value>,
    created_at_timestamp: i128,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "outgoing_webhook_event_type", rename_all = "snake_case")]
pub enum OutgoingWebhookEventContent {
    Payment {
        payment_id: Option<String>,
        content: Value,
    },
    Refund {
        payment_id: String,
        refund_id: String,
        content: Value,
    },
    Dispute {
        payment_id: String,
        attempt_id: String,
        dispute_id: String,
        content: Value,
    },
    Mandate {
        payment_method_id: String,
        mandate_id: String,
        content: Value,
    },
}

pub trait OutgoingWebhookEventMetric {
    fn get_outgoing_webhook_event_type(&self) -> Option<OutgoingWebhookEventContent>;
}

impl OutgoingWebhookEventMetric for OutgoingWebhookContent {
    fn get_outgoing_webhook_event_type(&self) -> Option<OutgoingWebhookEventContent> {
        match self {
            Self::PaymentDetails(payment_payload) => Some(OutgoingWebhookEventContent::Payment {
                payment_id: payment_payload.payment_id.clone(),
                content: masking::masked_serialize(&payment_payload)
                    .unwrap_or(serde_json::json!({"error":"failed to serialize"})),
            }),
            Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund {
                payment_id: refund_payload.payment_id.clone(),
                refund_id: refund_payload.refund_id.clone(),
                content: masking::masked_serialize(&refund_payload)
                    .unwrap_or(serde_json::json!({"error":"failed to serialize"})),
            }),
            Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute {
                payment_id: dispute_payload.payment_id.clone(),
                attempt_id: dispute_payload.attempt_id.clone(),
                dispute_id: dispute_payload.dispute_id.clone(),
                content: masking::masked_serialize(&dispute_payload)
                    .unwrap_or(serde_json::json!({"error":"failed to serialize"})),
            }),
            Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate {
                payment_method_id: mandate_payload.payment_method_id.clone(),
                mandate_id: mandate_payload.mandate_id.clone(),
                content: masking::masked_serialize(&mandate_payload)
                    .unwrap_or(serde_json::json!({"error":"failed to serialize"})),
            }),
        }
    }
}

impl OutgoingWebhookEvent {
    pub fn new(
        merchant_id: String,
        event_id: String,
        event_type: OutgoingWebhookEventType,
        content: Option<OutgoingWebhookEventContent>,
        is_error: bool,
        error: Option<Value>,
    ) -> Self {
        Self {
            merchant_id,
            event_id,
            event_type,
            content,
            is_error,
            error,
            created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000,
        }
    }
}

impl TryFrom<OutgoingWebhookEvent> for RawEvent {
    type Error = serde_json::Error;

    fn try_from(value: OutgoingWebhookEvent) -> Result<Self, Self::Error> {
        Ok(Self {
            event_type: EventType::OutgoingWebhookLogs,
            key: value.merchant_id.clone(),
            payload: serde_json::to_value(value)?,
        })
    }
}
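
One detail worth calling out: #[serde(flatten)] on content plus the internally tagged enum means the tag and the variant's fields serialize at the top level of the event, which is exactly the flat column layout the ClickHouse table expects. A self-contained sketch with stand-in types, trimmed to one variant:

```rust
use serde::Serialize;
use serde_json::{json, Value};

// Stand-ins mirroring the shapes above.
#[derive(Serialize)]
#[serde(tag = "outgoing_webhook_event_type", rename_all = "snake_case")]
enum Content {
    Payment { payment_id: Option<String>, content: Value },
}

#[derive(Serialize)]
struct Event {
    merchant_id: String,
    #[serde(flatten)]
    content: Option<Content>,
}

fn main() {
    let event = Event {
        merchant_id: "m1".into(),
        content: Some(Content::Payment {
            payment_id: Some("pay_1".into()),
            content: json!({"status": "succeeded"}),
        }),
    };
    // The tag and the variant fields land at the top level, next to
    // merchant_id: one flat row, matching the ClickHouse columns.
    assert_eq!(
        serde_json::to_value(&event).unwrap(),
        json!({
            "merchant_id": "m1",
            "outgoing_webhook_event_type": "payment",
            "payment_id": "pay_1",
            "content": {"status": "succeeded"}
        })
    );
}
```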

View File

@@ -84,6 +84,7 @@ pub struct KafkaSettings {
    refund_analytics_topic: String,
    api_logs_topic: String,
    connector_logs_topic: String,
    outgoing_webhook_logs_topic: String,
}

impl KafkaSettings {
@@ -140,6 +141,7 @@ pub struct KafkaProducer {
    refund_analytics_topic: String,
    api_logs_topic: String,
    connector_logs_topic: String,
    outgoing_webhook_logs_topic: String,
}

struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);

@@ -177,6 +179,7 @@ impl KafkaProducer {
            refund_analytics_topic: conf.refund_analytics_topic.clone(),
            api_logs_topic: conf.api_logs_topic.clone(),
            connector_logs_topic: conf.connector_logs_topic.clone(),
            outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(),
        })
    }

@@ -309,6 +312,7 @@ impl KafkaProducer {
            EventType::PaymentIntent => &self.intent_analytics_topic,
            EventType::Refund => &self.refund_analytics_topic,
            EventType::ConnectorApiLogs => &self.connector_logs_topic,
            EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
        }
    }
}
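
For context on where a RawEvent goes after topic selection: RdKafkaProducer wraps rdkafka's ThreadedProducer. A hedged sketch of that send path; send_raw is an illustrative helper, not the actual KafkaProducer API, and the broker/topic literals are taken from the configs above:

```rust
use std::time::Duration;

use rdkafka::{
    producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer},
    ClientConfig,
};

// Illustrative helper (assumption, not hyperswitch code): enqueue one event.
fn send_raw(
    producer: &ThreadedProducer<DefaultProducerContext>,
    topic: &str,
    key: &str,
    payload: &str,
) {
    // ThreadedProducer::send enqueues without blocking; on failure the record
    // is handed back along with the error.
    if let Err((e, _record)) = producer.send(BaseRecord::to(topic).key(key).payload(payload)) {
        eprintln!("kafka enqueue failed: {e}");
    }
}

fn main() {
    let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
        .set("bootstrap.servers", "kafka0:29092") // broker from the ClickHouse DDL above
        .create()
        .expect("failed to create producer");

    send_raw(
        &producer,
        "hyperswitch-outgoing-webhook-events",
        "merchant_123",
        r#"{"event_id":"evt_1","event_type":"payment_succeeded"}"#,
    );
    let _ = producer.flush(Duration::from_secs(1));
}
```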