From bc25f3fa40e807cc92d2d53a2287b92eff727d3c Mon Sep 17 00:00:00 2001 From: ivor-juspay <138492857+ivor-juspay@users.noreply.github.com> Date: Thu, 4 Apr 2024 12:54:23 +0530 Subject: [PATCH] feat(payout-events): add kafka events for payout analytics (#4211) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- config/config.example.toml | 1 + config/deployments/env_specific.toml | 1 + config/development.toml | 1 + config/docker_compose.toml | 1 + .../docs/clickhouse/scripts/payouts.sql | 109 ++++++++++++++++++ crates/router/src/db/kafka_store.rs | 24 +++- crates/router/src/events.rs | 2 + crates/router/src/services/kafka.rs | 41 +++++++ crates/router/src/services/kafka/payout.rs | 74 ++++++++++++ 9 files changed, 251 insertions(+), 3 deletions(-) create mode 100644 crates/analytics/docs/clickhouse/scripts/payouts.sql create mode 100644 crates/router/src/services/kafka/payout.rs diff --git a/config/config.example.toml b/config/config.example.toml index 074028c208..0ed971c18b 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -568,6 +568,7 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector api outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events +payout_analytics_topic = "topic" # Kafka topic to be used for Payout events # File storage configuration [file_storage] diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 8472076dc0..e0caafa7d7 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -80,6 +80,7 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector api outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events +payout_analytics_topic = "topic" # Kafka topic to be used for Payout events # File storage configuration [file_storage] diff --git a/config/development.toml b/config/development.toml index 39911248cc..3b62ec3ca5 100644 --- a/config/development.toml +++ b/config/development.toml @@ -559,6 +559,7 @@ connector_logs_topic = "hyperswitch-connector-api-events" outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events" dispute_analytics_topic = "hyperswitch-dispute-events" audit_events_topic = "hyperswitch-audit-events" +payout_analytics_topic = "hyperswitch-payout-events" [analytics] source = "sqlx" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 0209b3f48e..769fdd8f84 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -409,6 +409,7 @@ connector_logs_topic = "hyperswitch-connector-api-events" outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events" dispute_analytics_topic = "hyperswitch-dispute-events" audit_events_topic = "hyperswitch-audit-events" +payout_analytics_topic = "hyperswitch-payout-events" [analytics] source = "sqlx" diff --git a/crates/analytics/docs/clickhouse/scripts/payouts.sql b/crates/analytics/docs/clickhouse/scripts/payouts.sql new file mode 100644 index 0000000000..0358c0e937 --- /dev/null +++ b/crates/analytics/docs/clickhouse/scripts/payouts.sql @@ -0,0 +1,109 @@ +CREATE TABLE payout_queue ( + `payout_id` String, + `merchant_id` String, + `customer_id` String, + `address_id` String, + `payout_type` LowCardinality(String), + `payout_method_id` Nullable(String), + `amount` UInt64, + `destination_currency` LowCardinality(String), + `source_currency` LowCardinality(String), + `description` Nullable(String), + `recurring` Bool, + `auto_fulfill` Bool, + `return_url` Nullable(String), + `entity_type` LowCardinality(String), + `metadata` Nullable(String), + `created_at` DateTime CODEC(T64, LZ4), + `last_modified_at` DateTime CODEC(T64, LZ4), + `attempt_count` UInt16, + `profile_id` String, + `status` LowCardinality(String), + `sign_flag` Int8 +) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', +kafka_topic_list = 'hyperswitch-payout-events', +kafka_group_name = 'hyper-c1', +kafka_format = 'JSONEachRow', +kafka_handle_error_mode = 'stream'; + +CREATE TABLE payout ( + `payout_id` String, + `merchant_id` String, + `customer_id` String, + `address_id` String, + `payout_type` LowCardinality(String), + `payout_method_id` Nullable(String), + `amount` UInt64, + `destination_currency` LowCardinality(String), + `source_currency` LowCardinality(String), + `description` Nullable(String), + `recurring` Bool, + `auto_fulfill` Bool, + `return_url` Nullable(String), + `entity_type` LowCardinality(String), + `metadata` Nullable(String), + `created_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `last_modified_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `attempt_count` UInt16, + `profile_id` String, + `status` LowCardinality(String), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `sign_flag` Int8, + INDEX payoutTypeIndex payout_type TYPE bloom_filter GRANULARITY 1, + INDEX destinationCurrencyIndex destination_currency TYPE bloom_filter GRANULARITY 1, + INDEX sourceCurrencyIndex source_currency TYPE bloom_filter GRANULARITY 1, + INDEX entityTypeIndex entity_type TYPE bloom_filter GRANULARITY 1, + INDEX statusIndex status TYPE bloom_filter GRANULARITY 1 +) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at) +ORDER BY + (created_at, merchant_id, payout_id) TTL created_at + toIntervalMonth(6); + +CREATE MATERIALIZED VIEW kafka_parse_payout TO payout ( + `payout_id` String, + `merchant_id` String, + `customer_id` String, + `address_id` String, + `payout_type` LowCardinality(String), + `payout_method_id` Nullable(String), + `amount` UInt64, + `destination_currency` LowCardinality(String), + `source_currency` LowCardinality(String), + `description` Nullable(String), + `recurring` Bool, + `auto_fulfill` Bool, + `return_url` Nullable(String), + `entity_type` LowCardinality(String), + `metadata` Nullable(String), + `created_at` DateTime64(3), + `last_modified_at` DateTime64(3), + `attempt_count` UInt16, + `profile_id` String, + `status` LowCardinality(String), + `inserted_at` DateTime64(3), + `sign_flag` Int8 +) AS +SELECT + payout_id, + merchant_id, + customer_id, + address_id, + payout_type, + payout_method_id, + amount, + destination_currency, + source_currency, + description, + recurring, + auto_fulfill, + return_url, + entity_type, + metadata, + created_at, + last_modified_at, + attempt_count, + profile_id, + status, + now() as inserted_at, + sign_flag +FROM + payout_queue; \ No newline at end of file diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index dc8b9b791d..427c66daa0 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -1546,9 +1546,20 @@ impl PayoutsInterface for KafkaStore { payout_update: storage::PayoutsUpdate, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.diesel_store + let payout = self + .diesel_store .update_payout(this, payout_update, storage_scheme) + .await?; + + if let Err(er) = self + .kafka_producer + .log_payout(&payout, Some(this.clone())) .await + { + logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er); + }; + + Ok(payout) } async fn insert_payout( @@ -1556,9 +1567,16 @@ impl PayoutsInterface for KafkaStore { payout: storage::PayoutsNew, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.diesel_store + let payout = self + .diesel_store .insert_payout(payout, storage_scheme) - .await + .await?; + + if let Err(er) = self.kafka_producer.log_payout(&payout, None).await { + logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er); + }; + + Ok(payout) } async fn find_optional_payout_by_merchant_id_payout_id( diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 5f6d978d45..2a0944b2f2 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -26,6 +26,7 @@ pub enum EventType { OutgoingWebhookLogs, Dispute, AuditEvent, + Payout, } #[derive(Debug, Default, Deserialize, Clone)] @@ -39,6 +40,7 @@ pub enum EventsConfig { Logs, } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] pub enum EventsHandler { Kafka(KafkaProducer), diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 373c268f85..e4708f8668 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -11,17 +11,23 @@ use crate::events::EventType; mod dispute; mod payment_attempt; mod payment_intent; +mod payout; mod refund; use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use diesel_models::refund::Refund; use serde::Serialize; use time::OffsetDateTime; +#[cfg(feature = "payouts")] +use self::payout::KafkaPayout; use self::{ dispute::KafkaDispute, payment_attempt::KafkaPaymentAttempt, payment_intent::KafkaPaymentIntent, refund::KafkaRefund, }; use crate::types::storage::Dispute; +#[cfg(feature = "payouts")] +use crate::types::storage::Payouts; + // Using message queue result here to avoid confusion with Kafka result provided by library pub type MQResult = CustomResult; @@ -91,6 +97,7 @@ pub struct KafkaSettings { outgoing_webhook_logs_topic: String, dispute_analytics_topic: String, audit_events_topic: String, + payout_analytics_topic: String, } impl KafkaSettings { @@ -156,6 +163,12 @@ impl KafkaSettings { )) })?; + common_utils::fp_utils::when(self.payout_analytics_topic.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Kafka Payout Analytics topic must not be empty".into(), + )) + })?; + Ok(()) } } @@ -171,6 +184,7 @@ pub struct KafkaProducer { outgoing_webhook_logs_topic: String, dispute_analytics_topic: String, audit_events_topic: String, + payout_analytics_topic: String, } struct RdKafkaProducer(ThreadedProducer); @@ -210,6 +224,7 @@ impl KafkaProducer { outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(), dispute_analytics_topic: conf.dispute_analytics_topic.clone(), audit_events_topic: conf.audit_events_topic.clone(), + payout_analytics_topic: conf.payout_analytics_topic.clone(), }) } @@ -224,6 +239,7 @@ impl KafkaProducer { EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic, EventType::Dispute => &self.dispute_analytics_topic, EventType::AuditEvent => &self.audit_events_topic, + EventType::Payout => &self.payout_analytics_topic, }; self.producer .0 @@ -340,6 +356,30 @@ impl KafkaProducer { .attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}")) } + #[cfg(feature = "payouts")] + pub async fn log_payout(&self, payout: &Payouts, old_payout: Option) -> MQResult<()> { + if let Some(negative_event) = old_payout { + self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage( + &negative_event, + ))) + .attach_printable_lazy(|| { + format!("Failed to add negative payout event {negative_event:?}") + })?; + }; + self.log_event(&KafkaEvent::new(&KafkaPayout::from_storage(payout))) + .attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}")) + } + + #[cfg(feature = "payouts")] + pub async fn log_payout_delete(&self, delete_old_payout: &Payouts) -> MQResult<()> { + self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage( + delete_old_payout, + ))) + .attach_printable_lazy(|| { + format!("Failed to add negative payout event {delete_old_payout:?}") + }) + } + pub fn get_topic(&self, event: EventType) -> &str { match event { EventType::ApiLogs => &self.api_logs_topic, @@ -350,6 +390,7 @@ impl KafkaProducer { EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic, EventType::Dispute => &self.dispute_analytics_topic, EventType::AuditEvent => &self.audit_events_topic, + EventType::Payout => &self.payout_analytics_topic, } } } diff --git a/crates/router/src/services/kafka/payout.rs b/crates/router/src/services/kafka/payout.rs new file mode 100644 index 0000000000..7e181e3d1f --- /dev/null +++ b/crates/router/src/services/kafka/payout.rs @@ -0,0 +1,74 @@ +use common_utils::pii; +use diesel_models::enums as storage_enums; +use time::OffsetDateTime; + +#[cfg(feature = "payouts")] +use crate::types::storage::Payouts; + +#[derive(serde::Serialize, Debug)] +pub struct KafkaPayout<'a> { + pub payout_id: &'a String, + pub merchant_id: &'a String, + pub customer_id: &'a String, + pub address_id: &'a String, + pub payout_type: &'a storage_enums::PayoutType, + pub payout_method_id: Option<&'a String>, + pub amount: i64, + pub destination_currency: &'a storage_enums::Currency, + pub source_currency: &'a storage_enums::Currency, + pub description: Option<&'a String>, + pub recurring: bool, + pub auto_fulfill: bool, + pub return_url: Option<&'a String>, + pub entity_type: &'a storage_enums::PayoutEntityType, + pub metadata: Option<&'a pii::SecretSerdeValue>, + #[serde(default, with = "time::serde::timestamp")] + pub created_at: OffsetDateTime, + #[serde(default, with = "time::serde::timestamp")] + pub last_modified_at: OffsetDateTime, + pub attempt_count: i16, + pub profile_id: &'a String, + pub status: &'a storage_enums::PayoutStatus, +} + +#[cfg(feature = "payouts")] +impl<'a> KafkaPayout<'a> { + pub fn from_storage(payout: &'a Payouts) -> Self { + Self { + payout_id: &payout.payout_id, + merchant_id: &payout.merchant_id, + customer_id: &payout.customer_id, + address_id: &payout.address_id, + payout_type: &payout.payout_type, + payout_method_id: payout.payout_method_id.as_ref(), + amount: payout.amount, + destination_currency: &payout.destination_currency, + source_currency: &payout.source_currency, + description: payout.description.as_ref(), + recurring: payout.recurring, + auto_fulfill: payout.auto_fulfill, + return_url: payout.return_url.as_ref(), + entity_type: &payout.entity_type, + metadata: payout.metadata.as_ref(), + created_at: payout.created_at.assume_utc(), + last_modified_at: payout.last_modified_at.assume_utc(), + attempt_count: payout.attempt_count, + profile_id: &payout.profile_id, + status: &payout.status, + } + } +} + +impl<'a> super::KafkaMessage for KafkaPayout<'a> { + fn key(&self) -> String { + format!("{}_{}", self.merchant_id, self.payout_id) + } + + fn creation_timestamp(&self) -> Option { + Some(self.last_modified_at.unix_timestamp()) + } + + fn event_type(&self) -> crate::events::EventType { + crate::events::EventType::Payout + } +}