feat(consolidated-kafka-events): add consolidated kafka payment events (#4798)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Sampras Lopes <Sampras.lopes@juspay.in>
This commit is contained in:
ivor-juspay
2024-06-03 14:46:17 +05:30
committed by GitHub
parent 865007717c
commit ccee1a9ce9
10 changed files with 455 additions and 39 deletions

View File

@ -30,6 +30,7 @@ pub enum EventType {
AuditEvent,
#[cfg(feature = "payouts")]
Payout,
Consolidated,
}
#[derive(Debug, Default, Deserialize, Clone)]

View File

@ -12,9 +12,13 @@ use rdkafka::{
pub mod payout;
use crate::events::EventType;
mod dispute;
mod dispute_event;
mod payment_attempt;
mod payment_attempt_event;
mod payment_intent;
mod payment_intent_event;
mod refund;
mod refund_event;
use diesel_models::refund::Refund;
use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
use serde::Serialize;
@ -23,8 +27,10 @@ use time::{OffsetDateTime, PrimitiveDateTime};
#[cfg(feature = "payouts")]
use self::payout::KafkaPayout;
use self::{
dispute::KafkaDispute, payment_attempt::KafkaPaymentAttempt,
payment_intent::KafkaPaymentIntent, refund::KafkaRefund,
dispute::KafkaDispute, dispute_event::KafkaDisputeEvent, payment_attempt::KafkaPaymentAttempt,
payment_attempt_event::KafkaPaymentAttemptEvent, payment_intent::KafkaPaymentIntent,
payment_intent_event::KafkaPaymentIntentEvent, refund::KafkaRefund,
refund_event::KafkaRefundEvent,
};
use crate::types::storage::Dispute;
@ -89,6 +95,42 @@ impl<'a, T: KafkaMessage> KafkaMessage for KafkaEvent<'a, T> {
}
}
#[derive(serde::Serialize, Debug)]
struct KafkaConsolidatedLog<'a, T: KafkaMessage> {
#[serde(flatten)]
event: &'a T,
tenant_id: TenantID,
}
#[derive(serde::Serialize, Debug)]
struct KafkaConsolidatedEvent<'a, T: KafkaMessage> {
log: KafkaConsolidatedLog<'a, T>,
log_type: EventType,
}
impl<'a, T: KafkaMessage> KafkaConsolidatedEvent<'a, T> {
fn new(event: &'a T, tenant_id: TenantID) -> Self {
Self {
log: KafkaConsolidatedLog { event, tenant_id },
log_type: event.event_type(),
}
}
}
impl<'a, T: KafkaMessage> KafkaMessage for KafkaConsolidatedEvent<'a, T> {
fn key(&self) -> String {
self.log.event.key()
}
fn event_type(&self) -> EventType {
EventType::Consolidated
}
fn creation_timestamp(&self) -> Option<i64> {
self.log.event.creation_timestamp()
}
}
#[derive(Debug, serde::Deserialize, Clone, Default)]
#[serde(default)]
pub struct KafkaSettings {
@ -103,6 +145,7 @@ pub struct KafkaSettings {
audit_events_topic: String,
#[cfg(feature = "payouts")]
payout_analytics_topic: String,
consolidated_events_topic: String,
}
impl KafkaSettings {
@ -175,6 +218,12 @@ impl KafkaSettings {
))
})?;
common_utils::fp_utils::when(self.consolidated_events_topic.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Consolidated Events topic must not be empty".into(),
))
})?;
Ok(())
}
}
@ -192,6 +241,7 @@ pub struct KafkaProducer {
audit_events_topic: String,
#[cfg(feature = "payouts")]
payout_analytics_topic: String,
consolidated_events_topic: String,
}
struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);
@ -233,23 +283,13 @@ impl KafkaProducer {
audit_events_topic: conf.audit_events_topic.clone(),
#[cfg(feature = "payouts")]
payout_analytics_topic: conf.payout_analytics_topic.clone(),
consolidated_events_topic: conf.consolidated_events_topic.clone(),
})
}
pub fn log_event<T: KafkaMessage>(&self, event: &T) -> MQResult<()> {
router_env::logger::debug!("Logging Kafka Event {event:?}");
let topic = match event.event_type() {
EventType::PaymentIntent => &self.intent_analytics_topic,
EventType::PaymentAttempt => &self.attempt_analytics_topic,
EventType::Refund => &self.refund_analytics_topic,
EventType::ApiLogs => &self.api_logs_topic,
EventType::ConnectorApiLogs => &self.connector_logs_topic,
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
EventType::Dispute => &self.dispute_analytics_topic,
EventType::AuditEvent => &self.audit_events_topic,
#[cfg(feature = "payouts")]
EventType::Payout => &self.payout_analytics_topic,
};
let topic = self.get_topic(event.event_type());
self.producer
.0
.send(
@ -281,11 +321,18 @@ impl KafkaProducer {
format!("Failed to add negative attempt event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(
&KafkaPaymentAttempt::from_storage(attempt),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))?;
self.log_event(&KafkaConsolidatedEvent::new(
&KafkaPaymentAttemptEvent::from_storage(attempt),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated attempt event {attempt:?}"))
}
pub async fn log_payment_attempt_delete(
@ -317,11 +364,18 @@ impl KafkaProducer {
format!("Failed to add negative intent event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(
&KafkaPaymentIntent::from_storage(intent),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))?;
self.log_event(&KafkaConsolidatedEvent::new(
&KafkaPaymentIntentEvent::from_storage(intent),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated intent event {intent:?}"))
}
pub async fn log_payment_intent_delete(
@ -353,11 +407,18 @@ impl KafkaProducer {
format!("Failed to add negative refund event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(
&KafkaRefund::from_storage(refund),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))?;
self.log_event(&KafkaConsolidatedEvent::new(
&KafkaRefundEvent::from_storage(refund),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated refund event {refund:?}"))
}
pub async fn log_refund_delete(
@ -389,11 +450,18 @@ impl KafkaProducer {
format!("Failed to add negative dispute event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(
&KafkaDispute::from_storage(dispute),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))?;
self.log_event(&KafkaConsolidatedEvent::new(
&KafkaDisputeEvent::from_storage(dispute),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated dispute event {dispute:?}"))
}
#[cfg(feature = "payouts")]
@ -437,6 +505,7 @@ impl KafkaProducer {
EventType::AuditEvent => &self.audit_events_topic,
#[cfg(feature = "payouts")]
EventType::Payout => &self.payout_analytics_topic,
EventType::Consolidated => &self.consolidated_events_topic,
}
}
}

View File

@ -0,0 +1,77 @@
use diesel_models::enums as storage_enums;
use masking::Secret;
use time::OffsetDateTime;
use crate::types::storage::dispute::Dispute;
#[serde_with::skip_serializing_none]
#[derive(serde::Serialize, Debug)]
pub struct KafkaDisputeEvent<'a> {
pub dispute_id: &'a String,
pub dispute_amount: i64,
pub currency: &'a String,
pub dispute_stage: &'a storage_enums::DisputeStage,
pub dispute_status: &'a storage_enums::DisputeStatus,
pub payment_id: &'a String,
pub attempt_id: &'a String,
pub merchant_id: &'a String,
pub connector_status: &'a String,
pub connector_dispute_id: &'a String,
pub connector_reason: Option<&'a String>,
pub connector_reason_code: Option<&'a String>,
#[serde(default, with = "time::serde::timestamp::milliseconds::option")]
pub challenge_required_by: Option<OffsetDateTime>,
#[serde(default, with = "time::serde::timestamp::milliseconds::option")]
pub connector_created_at: Option<OffsetDateTime>,
#[serde(default, with = "time::serde::timestamp::milliseconds::option")]
pub connector_updated_at: Option<OffsetDateTime>,
#[serde(default, with = "time::serde::timestamp::milliseconds")]
pub created_at: OffsetDateTime,
#[serde(default, with = "time::serde::timestamp::milliseconds")]
pub modified_at: OffsetDateTime,
pub connector: &'a String,
pub evidence: &'a Secret<serde_json::Value>,
pub profile_id: Option<&'a String>,
pub merchant_connector_id: Option<&'a String>,
}
impl<'a> KafkaDisputeEvent<'a> {
pub fn from_storage(dispute: &'a Dispute) -> Self {
Self {
dispute_id: &dispute.dispute_id,
dispute_amount: dispute.amount.parse::<i64>().unwrap_or_default(),
currency: &dispute.currency,
dispute_stage: &dispute.dispute_stage,
dispute_status: &dispute.dispute_status,
payment_id: &dispute.payment_id,
attempt_id: &dispute.attempt_id,
merchant_id: &dispute.merchant_id,
connector_status: &dispute.connector_status,
connector_dispute_id: &dispute.connector_dispute_id,
connector_reason: dispute.connector_reason.as_ref(),
connector_reason_code: dispute.connector_reason_code.as_ref(),
challenge_required_by: dispute.challenge_required_by.map(|i| i.assume_utc()),
connector_created_at: dispute.connector_created_at.map(|i| i.assume_utc()),
connector_updated_at: dispute.connector_updated_at.map(|i| i.assume_utc()),
created_at: dispute.created_at.assume_utc(),
modified_at: dispute.modified_at.assume_utc(),
connector: &dispute.connector,
evidence: &dispute.evidence,
profile_id: dispute.profile_id.as_ref(),
merchant_connector_id: dispute.merchant_connector_id.as_ref(),
}
}
}
impl<'a> super::KafkaMessage for KafkaDisputeEvent<'a> {
fn key(&self) -> String {
format!(
"{}_{}_{}",
self.merchant_id, self.payment_id, self.dispute_id
)
}
fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::Dispute
}
}

View File

@ -0,0 +1,119 @@
// use diesel_models::enums::MandateDetails;
use common_utils::types::MinorUnit;
use diesel_models::enums as storage_enums;
use hyperswitch_domain_models::{
mandates::MandateDetails, payments::payment_attempt::PaymentAttempt,
};
use time::OffsetDateTime;
#[serde_with::skip_serializing_none]
#[derive(serde::Serialize, Debug)]
pub struct KafkaPaymentAttemptEvent<'a> {
pub payment_id: &'a String,
pub merchant_id: &'a String,
pub attempt_id: &'a String,
pub status: storage_enums::AttemptStatus,
pub amount: MinorUnit,
pub currency: Option<storage_enums::Currency>,
pub save_to_locker: Option<bool>,
pub connector: Option<&'a String>,
pub error_message: Option<&'a String>,
pub offer_amount: Option<MinorUnit>,
pub surcharge_amount: Option<MinorUnit>,
pub tax_amount: Option<MinorUnit>,
pub payment_method_id: Option<&'a String>,
pub payment_method: Option<storage_enums::PaymentMethod>,
pub connector_transaction_id: Option<&'a String>,
pub capture_method: Option<storage_enums::CaptureMethod>,
#[serde(default, with = "time::serde::timestamp::milliseconds::option")]
pub capture_on: Option<OffsetDateTime>,
pub confirm: bool,
pub authentication_type: Option<storage_enums::AuthenticationType>,
#[serde(with = "time::serde::timestamp::milliseconds")]
pub created_at: OffsetDateTime,
#[serde(with = "time::serde::timestamp::milliseconds")]
pub modified_at: OffsetDateTime,
#[serde(default, with = "time::serde::timestamp::milliseconds::option")]
pub last_synced: Option<OffsetDateTime>,
pub cancellation_reason: Option<&'a String>,
pub amount_to_capture: Option<MinorUnit>,
pub mandate_id: Option<&'a String>,
pub browser_info: Option<String>,
pub error_code: Option<&'a String>,
pub connector_metadata: Option<String>,
// TODO: These types should implement copy ideally
pub payment_experience: Option<&'a storage_enums::PaymentExperience>,
pub payment_method_type: Option<&'a storage_enums::PaymentMethodType>,
pub payment_method_data: Option<String>,
pub error_reason: Option<&'a String>,
pub multiple_capture_count: Option<i16>,
pub amount_capturable: MinorUnit,
pub merchant_connector_id: Option<&'a String>,
pub net_amount: MinorUnit,
pub unified_code: Option<&'a String>,
pub unified_message: Option<&'a String>,
pub mandate_data: Option<&'a MandateDetails>,
pub client_source: Option<&'a String>,
pub client_version: Option<&'a String>,
}
impl<'a> KafkaPaymentAttemptEvent<'a> {
pub fn from_storage(attempt: &'a PaymentAttempt) -> Self {
Self {
payment_id: &attempt.payment_id,
merchant_id: &attempt.merchant_id,
attempt_id: &attempt.attempt_id,
status: attempt.status,
amount: attempt.amount,
currency: attempt.currency,
save_to_locker: attempt.save_to_locker,
connector: attempt.connector.as_ref(),
error_message: attempt.error_message.as_ref(),
offer_amount: attempt.offer_amount,
surcharge_amount: attempt.surcharge_amount,
tax_amount: attempt.tax_amount,
payment_method_id: attempt.payment_method_id.as_ref(),
payment_method: attempt.payment_method,
connector_transaction_id: attempt.connector_transaction_id.as_ref(),
capture_method: attempt.capture_method,
capture_on: attempt.capture_on.map(|i| i.assume_utc()),
confirm: attempt.confirm,
authentication_type: attempt.authentication_type,
created_at: attempt.created_at.assume_utc(),
modified_at: attempt.modified_at.assume_utc(),
last_synced: attempt.last_synced.map(|i| i.assume_utc()),
cancellation_reason: attempt.cancellation_reason.as_ref(),
amount_to_capture: attempt.amount_to_capture,
mandate_id: attempt.mandate_id.as_ref(),
browser_info: attempt.browser_info.as_ref().map(|v| v.to_string()),
error_code: attempt.error_code.as_ref(),
connector_metadata: attempt.connector_metadata.as_ref().map(|v| v.to_string()),
payment_experience: attempt.payment_experience.as_ref(),
payment_method_type: attempt.payment_method_type.as_ref(),
payment_method_data: attempt.payment_method_data.as_ref().map(|v| v.to_string()),
error_reason: attempt.error_reason.as_ref(),
multiple_capture_count: attempt.multiple_capture_count,
amount_capturable: attempt.amount_capturable,
merchant_connector_id: attempt.merchant_connector_id.as_ref(),
net_amount: attempt.net_amount,
unified_code: attempt.unified_code.as_ref(),
unified_message: attempt.unified_message.as_ref(),
mandate_data: attempt.mandate_data.as_ref(),
client_source: attempt.client_source.as_ref(),
client_version: attempt.client_version.as_ref(),
}
}
}
impl<'a> super::KafkaMessage for KafkaPaymentAttemptEvent<'a> {
fn key(&self) -> String {
format!(
"{}_{}_{}",
self.merchant_id, self.payment_id, self.attempt_id
)
}
fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::PaymentAttempt
}
}

View File

@ -0,0 +1,75 @@
use common_utils::{id_type, types::MinorUnit};
use diesel_models::enums as storage_enums;
use hyperswitch_domain_models::payments::PaymentIntent;
use time::OffsetDateTime;
#[serde_with::skip_serializing_none]
#[derive(serde::Serialize, Debug)]
pub struct KafkaPaymentIntentEvent<'a> {
pub payment_id: &'a String,
pub merchant_id: &'a String,
pub status: storage_enums::IntentStatus,
pub amount: MinorUnit,
pub currency: Option<storage_enums::Currency>,
pub amount_captured: Option<MinorUnit>,
pub customer_id: Option<&'a id_type::CustomerId>,
pub description: Option<&'a String>,
pub return_url: Option<&'a String>,
pub connector_id: Option<&'a String>,
pub statement_descriptor_name: Option<&'a String>,
pub statement_descriptor_suffix: Option<&'a String>,
#[serde(with = "time::serde::timestamp::milliseconds")]
pub created_at: OffsetDateTime,
#[serde(with = "time::serde::timestamp::milliseconds")]
pub modified_at: OffsetDateTime,
#[serde(default, with = "time::serde::timestamp::milliseconds::option")]
pub last_synced: Option<OffsetDateTime>,
pub setup_future_usage: Option<storage_enums::FutureUsage>,
pub off_session: Option<bool>,
pub client_secret: Option<&'a String>,
pub active_attempt_id: String,
pub business_country: Option<storage_enums::CountryAlpha2>,
pub business_label: Option<&'a String>,
pub attempt_count: i16,
pub payment_confirm_source: Option<storage_enums::PaymentSource>,
}
impl<'a> KafkaPaymentIntentEvent<'a> {
pub fn from_storage(intent: &'a PaymentIntent) -> Self {
Self {
payment_id: &intent.payment_id,
merchant_id: &intent.merchant_id,
status: intent.status,
amount: intent.amount,
currency: intent.currency,
amount_captured: intent.amount_captured,
customer_id: intent.customer_id.as_ref(),
description: intent.description.as_ref(),
return_url: intent.return_url.as_ref(),
connector_id: intent.connector_id.as_ref(),
statement_descriptor_name: intent.statement_descriptor_name.as_ref(),
statement_descriptor_suffix: intent.statement_descriptor_suffix.as_ref(),
created_at: intent.created_at.assume_utc(),
modified_at: intent.modified_at.assume_utc(),
last_synced: intent.last_synced.map(|i| i.assume_utc()),
setup_future_usage: intent.setup_future_usage,
off_session: intent.off_session,
client_secret: intent.client_secret.as_ref(),
active_attempt_id: intent.active_attempt.get_id(),
business_country: intent.business_country,
business_label: intent.business_label.as_ref(),
attempt_count: intent.attempt_count,
payment_confirm_source: intent.payment_confirm_source,
}
}
}
impl<'a> super::KafkaMessage for KafkaPaymentIntentEvent<'a> {
fn key(&self) -> String {
format!("{}_{}", self.merchant_id, self.payment_id)
}
fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::PaymentIntent
}
}

View File

@ -0,0 +1,74 @@
use common_utils::types::MinorUnit;
use diesel_models::{enums as storage_enums, refund::Refund};
use time::OffsetDateTime;
#[serde_with::skip_serializing_none]
#[derive(serde::Serialize, Debug)]
pub struct KafkaRefundEvent<'a> {
pub internal_reference_id: &'a String,
pub refund_id: &'a String, //merchant_reference id
pub payment_id: &'a String,
pub merchant_id: &'a String,
pub connector_transaction_id: &'a String,
pub connector: &'a String,
pub connector_refund_id: Option<&'a String>,
pub external_reference_id: Option<&'a String>,
pub refund_type: &'a storage_enums::RefundType,
pub total_amount: &'a MinorUnit,
pub currency: &'a storage_enums::Currency,
pub refund_amount: &'a MinorUnit,
pub refund_status: &'a storage_enums::RefundStatus,
pub sent_to_gateway: &'a bool,
pub refund_error_message: Option<&'a String>,
pub refund_arn: Option<&'a String>,
#[serde(default, with = "time::serde::timestamp::milliseconds")]
pub created_at: OffsetDateTime,
#[serde(default, with = "time::serde::timestamp::milliseconds")]
pub modified_at: OffsetDateTime,
pub description: Option<&'a String>,
pub attempt_id: &'a String,
pub refund_reason: Option<&'a String>,
pub refund_error_code: Option<&'a String>,
}
impl<'a> KafkaRefundEvent<'a> {
pub fn from_storage(refund: &'a Refund) -> Self {
Self {
internal_reference_id: &refund.internal_reference_id,
refund_id: &refund.refund_id,
payment_id: &refund.payment_id,
merchant_id: &refund.merchant_id,
connector_transaction_id: &refund.connector_transaction_id,
connector: &refund.connector,
connector_refund_id: refund.connector_refund_id.as_ref(),
external_reference_id: refund.external_reference_id.as_ref(),
refund_type: &refund.refund_type,
total_amount: &refund.total_amount,
currency: &refund.currency,
refund_amount: &refund.refund_amount,
refund_status: &refund.refund_status,
sent_to_gateway: &refund.sent_to_gateway,
refund_error_message: refund.refund_error_message.as_ref(),
refund_arn: refund.refund_arn.as_ref(),
created_at: refund.created_at.assume_utc(),
modified_at: refund.updated_at.assume_utc(),
description: refund.description.as_ref(),
attempt_id: &refund.attempt_id,
refund_reason: refund.refund_reason.as_ref(),
refund_error_code: refund.refund_error_code.as_ref(),
}
}
}
impl<'a> super::KafkaMessage for KafkaRefundEvent<'a> {
fn key(&self) -> String {
format!(
"{}_{}_{}_{}",
self.merchant_id, self.payment_id, self.attempt_id, self.refund_id
)
}
fn event_type(&self) -> crate::events::EventType {
crate::events::EventType::Refund
}
}