feat(payout-events): add kafka events for payout analytics (#4211)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
ivor-juspay
2024-04-04 12:54:23 +05:30
committed by GitHub
parent 9453e8fcfa
commit bc25f3fa40
9 changed files with 251 additions and 3 deletions

View File

@ -0,0 +1,109 @@
CREATE TABLE payout_queue (
`payout_id` String,
`merchant_id` String,
`customer_id` String,
`address_id` String,
`payout_type` LowCardinality(String),
`payout_method_id` Nullable(String),
`amount` UInt64,
`destination_currency` LowCardinality(String),
`source_currency` LowCardinality(String),
`description` Nullable(String),
`recurring` Bool,
`auto_fulfill` Bool,
`return_url` Nullable(String),
`entity_type` LowCardinality(String),
`metadata` Nullable(String),
`created_at` DateTime CODEC(T64, LZ4),
`last_modified_at` DateTime CODEC(T64, LZ4),
`attempt_count` UInt16,
`profile_id` String,
`status` LowCardinality(String),
`sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payout-events',
kafka_group_name = 'hyper-c1',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE payout (
`payout_id` String,
`merchant_id` String,
`customer_id` String,
`address_id` String,
`payout_type` LowCardinality(String),
`payout_method_id` Nullable(String),
`amount` UInt64,
`destination_currency` LowCardinality(String),
`source_currency` LowCardinality(String),
`description` Nullable(String),
`recurring` Bool,
`auto_fulfill` Bool,
`return_url` Nullable(String),
`entity_type` LowCardinality(String),
`metadata` Nullable(String),
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`last_modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`attempt_count` UInt16,
`profile_id` String,
`status` LowCardinality(String),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`sign_flag` Int8,
INDEX payoutTypeIndex payout_type TYPE bloom_filter GRANULARITY 1,
INDEX destinationCurrencyIndex destination_currency TYPE bloom_filter GRANULARITY 1,
INDEX sourceCurrencyIndex source_currency TYPE bloom_filter GRANULARITY 1,
INDEX entityTypeIndex entity_type TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status TYPE bloom_filter GRANULARITY 1
) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
ORDER BY
(created_at, merchant_id, payout_id) TTL created_at + toIntervalMonth(6);
CREATE MATERIALIZED VIEW kafka_parse_payout TO payout (
`payout_id` String,
`merchant_id` String,
`customer_id` String,
`address_id` String,
`payout_type` LowCardinality(String),
`payout_method_id` Nullable(String),
`amount` UInt64,
`destination_currency` LowCardinality(String),
`source_currency` LowCardinality(String),
`description` Nullable(String),
`recurring` Bool,
`auto_fulfill` Bool,
`return_url` Nullable(String),
`entity_type` LowCardinality(String),
`metadata` Nullable(String),
`created_at` DateTime64(3),
`last_modified_at` DateTime64(3),
`attempt_count` UInt16,
`profile_id` String,
`status` LowCardinality(String),
`inserted_at` DateTime64(3),
`sign_flag` Int8
) AS
SELECT
payout_id,
merchant_id,
customer_id,
address_id,
payout_type,
payout_method_id,
amount,
destination_currency,
source_currency,
description,
recurring,
auto_fulfill,
return_url,
entity_type,
metadata,
created_at,
last_modified_at,
attempt_count,
profile_id,
status,
now() as inserted_at,
sign_flag
FROM
payout_queue;

View File

@ -1546,9 +1546,20 @@ impl PayoutsInterface for KafkaStore {
payout_update: storage::PayoutsUpdate,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage::Payouts, errors::DataStorageError> {
self.diesel_store
let payout = self
.diesel_store
.update_payout(this, payout_update, storage_scheme)
.await?;
if let Err(er) = self
.kafka_producer
.log_payout(&payout, Some(this.clone()))
.await
{
logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er);
};
Ok(payout)
}
async fn insert_payout(
@ -1556,9 +1567,16 @@ impl PayoutsInterface for KafkaStore {
payout: storage::PayoutsNew,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage::Payouts, errors::DataStorageError> {
self.diesel_store
let payout = self
.diesel_store
.insert_payout(payout, storage_scheme)
.await
.await?;
if let Err(er) = self.kafka_producer.log_payout(&payout, None).await {
logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er);
};
Ok(payout)
}
async fn find_optional_payout_by_merchant_id_payout_id(

View File

@ -26,6 +26,7 @@ pub enum EventType {
OutgoingWebhookLogs,
Dispute,
AuditEvent,
Payout,
}
#[derive(Debug, Default, Deserialize, Clone)]
@ -39,6 +40,7 @@ pub enum EventsConfig {
Logs,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum EventsHandler {
Kafka(KafkaProducer),

View File

@ -11,17 +11,23 @@ use crate::events::EventType;
mod dispute;
mod payment_attempt;
mod payment_intent;
mod payout;
mod refund;
use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
use diesel_models::refund::Refund;
use serde::Serialize;
use time::OffsetDateTime;
#[cfg(feature = "payouts")]
use self::payout::KafkaPayout;
use self::{
dispute::KafkaDispute, payment_attempt::KafkaPaymentAttempt,
payment_intent::KafkaPaymentIntent, refund::KafkaRefund,
};
use crate::types::storage::Dispute;
#[cfg(feature = "payouts")]
use crate::types::storage::Payouts;
// Using message queue result here to avoid confusion with Kafka result provided by library
pub type MQResult<T> = CustomResult<T, KafkaError>;
@ -91,6 +97,7 @@ pub struct KafkaSettings {
outgoing_webhook_logs_topic: String,
dispute_analytics_topic: String,
audit_events_topic: String,
payout_analytics_topic: String,
}
impl KafkaSettings {
@ -156,6 +163,12 @@ impl KafkaSettings {
))
})?;
common_utils::fp_utils::when(self.payout_analytics_topic.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Kafka Payout Analytics topic must not be empty".into(),
))
})?;
Ok(())
}
}
@ -171,6 +184,7 @@ pub struct KafkaProducer {
outgoing_webhook_logs_topic: String,
dispute_analytics_topic: String,
audit_events_topic: String,
payout_analytics_topic: String,
}
struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);
@ -210,6 +224,7 @@ impl KafkaProducer {
outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(),
dispute_analytics_topic: conf.dispute_analytics_topic.clone(),
audit_events_topic: conf.audit_events_topic.clone(),
payout_analytics_topic: conf.payout_analytics_topic.clone(),
})
}
@ -224,6 +239,7 @@ impl KafkaProducer {
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
EventType::Dispute => &self.dispute_analytics_topic,
EventType::AuditEvent => &self.audit_events_topic,
EventType::Payout => &self.payout_analytics_topic,
};
self.producer
.0
@ -340,6 +356,30 @@ impl KafkaProducer {
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))
}
#[cfg(feature = "payouts")]
pub async fn log_payout(&self, payout: &Payouts, old_payout: Option<Payouts>) -> MQResult<()> {
if let Some(negative_event) = old_payout {
self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage(
&negative_event,
)))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(&KafkaPayout::from_storage(payout)))
.attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}"))
}
#[cfg(feature = "payouts")]
pub async fn log_payout_delete(&self, delete_old_payout: &Payouts) -> MQResult<()> {
self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage(
delete_old_payout,
)))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {delete_old_payout:?}")
})
}
pub fn get_topic(&self, event: EventType) -> &str {
match event {
EventType::ApiLogs => &self.api_logs_topic,
@ -350,6 +390,7 @@ impl KafkaProducer {
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
EventType::Dispute => &self.dispute_analytics_topic,
EventType::AuditEvent => &self.audit_events_topic,
EventType::Payout => &self.payout_analytics_topic,
}
}
}

View File

@ -0,0 +1,74 @@
use common_utils::pii;
use diesel_models::enums as storage_enums;
use time::OffsetDateTime;
#[cfg(feature = "payouts")]
use crate::types::storage::Payouts;
#[derive(serde::Serialize, Debug)]
pub struct KafkaPayout<'a> {
pub payout_id: &'a String,
pub merchant_id: &'a String,
pub customer_id: &'a String,
pub address_id: &'a String,
pub payout_type: &'a storage_enums::PayoutType,
pub payout_method_id: Option<&'a String>,
pub amount: i64,
pub destination_currency: &'a storage_enums::Currency,
pub source_currency: &'a storage_enums::Currency,
pub description: Option<&'a String>,
pub recurring: bool,
pub auto_fulfill: bool,
pub return_url: Option<&'a String>,
pub entity_type: &'a storage_enums::PayoutEntityType,
pub metadata: Option<&'a pii::SecretSerdeValue>,
#[serde(default, with = "time::serde::timestamp")]
pub created_at: OffsetDateTime,
#[serde(default, with = "time::serde::timestamp")]
pub last_modified_at: OffsetDateTime,
pub attempt_count: i16,
pub profile_id: &'a String,
pub status: &'a storage_enums::PayoutStatus,
}
#[cfg(feature = "payouts")]
impl<'a> KafkaPayout<'a> {
pub fn from_storage(payout: &'a Payouts) -> Self {
Self {
payout_id: &payout.payout_id,
merchant_id: &payout.merchant_id,
customer_id: &payout.customer_id,
address_id: &payout.address_id,
payout_type: &payout.payout_type,
payout_method_id: payout.payout_method_id.as_ref(),
amount: payout.amount,
destination_currency: &payout.destination_currency,
source_currency: &payout.source_currency,
description: payout.description.as_ref(),
recurring: payout.recurring,
auto_fulfill: payout.auto_fulfill,
return_url: payout.return_url.as_ref(),
entity_type: &payout.entity_type,
metadata: payout.metadata.as_ref(),
created_at: payout.created_at.assume_utc(),
last_modified_at: payout.last_modified_at.assume_utc(),
attempt_count: payout.attempt_count,
profile_id: &payout.profile_id,
status: &payout.status,
}
}
}
impl<'a> super::KafkaMessage for KafkaPayout<'a> {
fn key(&self) -> String {
format!("{}_{}", self.merchant_id, self.payout_id)
}
fn creation_timestamp(&self) -> Option<i64> {
Some(self.last_modified_at.unix_timestamp())
}
fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::Payout
}
}