From 24214bcfcd0a34acd39dba88f6c015ac6b1edbc4 Mon Sep 17 00:00:00 2001 From: Subhajit Ghosh <99127578+subhajit20@users.noreply.github.com> Date: Wed, 15 May 2024 16:24:39 +0530 Subject: [PATCH] refactor(db): Add TenantID field to KafkaEvent struct (#4598) Co-authored-by: Sampras Lopes --- crates/router/src/db/kafka_store.rs | 55 +++++++++---- crates/router/src/services/kafka.rs | 116 +++++++++++++++++++--------- 2 files changed, 118 insertions(+), 53 deletions(-) diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index a01395ed0c..bfe0592fd3 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -76,7 +76,7 @@ use crate::{ }, }; -#[derive(Clone, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct TenantID(pub String); #[derive(Clone)] @@ -413,7 +413,11 @@ impl DisputeInterface for KafkaStore { ) -> CustomResult { let dispute = self.diesel_store.insert_dispute(dispute_new).await?; - if let Err(er) = self.kafka_producer.log_dispute(&dispute, None).await { + if let Err(er) = self + .kafka_producer + .log_dispute(&dispute, None, self.tenant_id.clone()) + .await + { logger::error!(message="Failed to add analytics entry for Dispute {dispute:?}", error_message=?er); }; @@ -466,7 +470,7 @@ impl DisputeInterface for KafkaStore { .await?; if let Err(er) = self .kafka_producer - .log_dispute(&dispute_new, Some(this)) + .log_dispute(&dispute_new, Some(this), self.tenant_id.clone()) .await { logger::error!(message="Failed to add analytics entry for Dispute {dispute_new:?}", error_message=?er); @@ -1109,7 +1113,7 @@ impl PaymentAttemptInterface for KafkaStore { if let Err(er) = self .kafka_producer - .log_payment_attempt(&attempt, None) + .log_payment_attempt(&attempt, None, self.tenant_id.clone()) .await { logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er) @@ -1131,7 +1135,7 @@ impl PaymentAttemptInterface for KafkaStore { if let Err(er) = self .kafka_producer - .log_payment_attempt(&attempt, Some(this)) + .log_payment_attempt(&attempt, Some(this), self.tenant_id.clone()) .await { logger::error!(message="Failed to log analytics event for payment attempt {attempt:?}", error_message=?er) @@ -1311,7 +1315,7 @@ impl PaymentIntentInterface for KafkaStore { if let Err(er) = self .kafka_producer - .log_payment_intent(&intent, Some(this)) + .log_payment_intent(&intent, Some(this), self.tenant_id.clone()) .await { logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er); @@ -1331,7 +1335,11 @@ impl PaymentIntentInterface for KafkaStore { .insert_payment_intent(new, storage_scheme) .await?; - if let Err(er) = self.kafka_producer.log_payment_intent(&intent, None).await { + if let Err(er) = self + .kafka_producer + .log_payment_intent(&intent, None, self.tenant_id.clone()) + .await + { logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er); }; @@ -1558,6 +1566,7 @@ impl PayoutAttemptInterface for KafkaStore { .log_payout( &KafkaPayout::from_storage(payouts, &updated_payout_attempt), Some(KafkaPayout::from_storage(payouts, this)), + self.tenant_id.clone(), ) .await { @@ -1582,6 +1591,7 @@ impl PayoutAttemptInterface for KafkaStore { .log_payout( &KafkaPayout::from_storage(payouts, &payout_attempt_new), None, + self.tenant_id.clone(), ) .await { @@ -1639,6 +1649,7 @@ impl PayoutsInterface for KafkaStore { .log_payout( &KafkaPayout::from_storage(&payout, payout_attempt), Some(KafkaPayout::from_storage(this, payout_attempt)), + self.tenant_id.clone(), ) .await { @@ -1903,7 +1914,11 @@ impl RefundInterface for KafkaStore { .update_refund(this.clone(), refund, storage_scheme) .await?; - if let Err(er) = self.kafka_producer.log_refund(&refund, Some(this)).await { + if let Err(er) = self + .kafka_producer + .log_refund(&refund, Some(this), self.tenant_id.clone()) + .await + { logger::error!(message="Failed to insert analytics event for Refund Update {refund?}", error_message=?er); } Ok(refund) @@ -1931,7 +1946,11 @@ impl RefundInterface for KafkaStore { ) -> CustomResult { let refund = self.diesel_store.insert_refund(new, storage_scheme).await?; - if let Err(er) = self.kafka_producer.log_refund(&refund, None).await { + if let Err(er) = self + .kafka_producer + .log_refund(&refund, None, self.tenant_id.clone()) + .await + { logger::error!(message="Failed to insert analytics event for Refund Create {refund?}", error_message=?er); } Ok(refund) @@ -2504,7 +2523,7 @@ impl BatchSampleDataInterface for KafkaStore { for payment_intent in payment_intents_list.iter() { let _ = self .kafka_producer - .log_payment_intent(payment_intent, None) + .log_payment_intent(payment_intent, None, self.tenant_id.clone()) .await; } Ok(payment_intents_list) @@ -2525,7 +2544,7 @@ impl BatchSampleDataInterface for KafkaStore { for payment_attempt in payment_attempts_list.iter() { let _ = self .kafka_producer - .log_payment_attempt(payment_attempt, None) + .log_payment_attempt(payment_attempt, None, self.tenant_id.clone()) .await; } Ok(payment_attempts_list) @@ -2542,7 +2561,10 @@ impl BatchSampleDataInterface for KafkaStore { .await?; for refund in refunds_list.iter() { - let _ = self.kafka_producer.log_refund(refund, None).await; + let _ = self + .kafka_producer + .log_refund(refund, None, self.tenant_id.clone()) + .await; } Ok(refunds_list) } @@ -2562,7 +2584,7 @@ impl BatchSampleDataInterface for KafkaStore { for payment_intent in payment_intents_list.iter() { let _ = self .kafka_producer - .log_payment_intent_delete(payment_intent) + .log_payment_intent_delete(payment_intent, self.tenant_id.clone()) .await; } Ok(payment_intents_list) @@ -2583,7 +2605,7 @@ impl BatchSampleDataInterface for KafkaStore { for payment_attempt in payment_attempts_list.iter() { let _ = self .kafka_producer - .log_payment_attempt_delete(payment_attempt) + .log_payment_attempt_delete(payment_attempt, self.tenant_id.clone()) .await; } @@ -2601,7 +2623,10 @@ impl BatchSampleDataInterface for KafkaStore { .await?; for refund in refunds_list.iter() { - let _ = self.kafka_producer.log_refund_delete(refund).await; + let _ = self + .kafka_producer + .log_refund_delete(refund, self.tenant_id.clone()) + .await; } Ok(refunds_list) diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index ba9806b29d..58657c59fd 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -30,6 +30,7 @@ use crate::types::storage::Dispute; // Using message queue result here to avoid confusion with Kafka result provided by library pub type MQResult = CustomResult; +use crate::db::kafka_store::TenantID; pub trait KafkaMessage where @@ -54,19 +55,22 @@ struct KafkaEvent<'a, T: KafkaMessage> { #[serde(flatten)] event: &'a T, sign_flag: i32, + tenant_id: TenantID, } impl<'a, T: KafkaMessage> KafkaEvent<'a, T> { - fn new(event: &'a T) -> Self { + fn new(event: &'a T, tenant_id: TenantID) -> Self { Self { event, sign_flag: 1, + tenant_id, } } - fn old(event: &'a T) -> Self { + fn old(event: &'a T, tenant_id: TenantID) -> Self { Self { event, sign_flag: -1, + tenant_id, } } } @@ -266,28 +270,33 @@ impl KafkaProducer { &self, attempt: &PaymentAttempt, old_attempt: Option, + tenant_id: TenantID, ) -> MQResult<()> { if let Some(negative_event) = old_attempt { - self.log_event(&KafkaEvent::old(&KafkaPaymentAttempt::from_storage( - &negative_event, - ))) + self.log_event(&KafkaEvent::old( + &KafkaPaymentAttempt::from_storage(&negative_event), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative attempt event {negative_event:?}") })?; }; - self.log_event(&KafkaEvent::new(&KafkaPaymentAttempt::from_storage( - attempt, - ))) + self.log_event(&KafkaEvent::new( + &KafkaPaymentAttempt::from_storage(attempt), + tenant_id.clone(), + )) .attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}")) } pub async fn log_payment_attempt_delete( &self, delete_old_attempt: &PaymentAttempt, + tenant_id: TenantID, ) -> MQResult<()> { - self.log_event(&KafkaEvent::old(&KafkaPaymentAttempt::from_storage( - delete_old_attempt, - ))) + self.log_event(&KafkaEvent::old( + &KafkaPaymentAttempt::from_storage(delete_old_attempt), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative attempt event {delete_old_attempt:?}") }) @@ -297,48 +306,69 @@ impl KafkaProducer { &self, intent: &PaymentIntent, old_intent: Option, + tenant_id: TenantID, ) -> MQResult<()> { if let Some(negative_event) = old_intent { - self.log_event(&KafkaEvent::old(&KafkaPaymentIntent::from_storage( - &negative_event, - ))) + self.log_event(&KafkaEvent::old( + &KafkaPaymentIntent::from_storage(&negative_event), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative intent event {negative_event:?}") })?; }; - self.log_event(&KafkaEvent::new(&KafkaPaymentIntent::from_storage(intent))) - .attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}")) + self.log_event(&KafkaEvent::new( + &KafkaPaymentIntent::from_storage(intent), + tenant_id.clone(), + )) + .attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}")) } pub async fn log_payment_intent_delete( &self, delete_old_intent: &PaymentIntent, + tenant_id: TenantID, ) -> MQResult<()> { - self.log_event(&KafkaEvent::old(&KafkaPaymentIntent::from_storage( - delete_old_intent, - ))) + self.log_event(&KafkaEvent::old( + &KafkaPaymentIntent::from_storage(delete_old_intent), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative intent event {delete_old_intent:?}") }) } - pub async fn log_refund(&self, refund: &Refund, old_refund: Option) -> MQResult<()> { + pub async fn log_refund( + &self, + refund: &Refund, + old_refund: Option, + tenant_id: TenantID, + ) -> MQResult<()> { if let Some(negative_event) = old_refund { - self.log_event(&KafkaEvent::old(&KafkaRefund::from_storage( - &negative_event, - ))) + self.log_event(&KafkaEvent::old( + &KafkaRefund::from_storage(&negative_event), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative refund event {negative_event:?}") })?; }; - self.log_event(&KafkaEvent::new(&KafkaRefund::from_storage(refund))) - .attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}")) + self.log_event(&KafkaEvent::new( + &KafkaRefund::from_storage(refund), + tenant_id.clone(), + )) + .attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}")) } - pub async fn log_refund_delete(&self, delete_old_refund: &Refund) -> MQResult<()> { - self.log_event(&KafkaEvent::old(&KafkaRefund::from_storage( - delete_old_refund, - ))) + pub async fn log_refund_delete( + &self, + delete_old_refund: &Refund, + tenant_id: TenantID, + ) -> MQResult<()> { + self.log_event(&KafkaEvent::old( + &KafkaRefund::from_storage(delete_old_refund), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative refund event {delete_old_refund:?}") }) @@ -348,17 +378,22 @@ impl KafkaProducer { &self, dispute: &Dispute, old_dispute: Option, + tenant_id: TenantID, ) -> MQResult<()> { if let Some(negative_event) = old_dispute { - self.log_event(&KafkaEvent::old(&KafkaDispute::from_storage( - &negative_event, - ))) + self.log_event(&KafkaEvent::old( + &KafkaDispute::from_storage(&negative_event), + tenant_id.clone(), + )) .attach_printable_lazy(|| { format!("Failed to add negative dispute event {negative_event:?}") })?; }; - self.log_event(&KafkaEvent::new(&KafkaDispute::from_storage(dispute))) - .attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}")) + self.log_event(&KafkaEvent::new( + &KafkaDispute::from_storage(dispute), + tenant_id.clone(), + )) + .attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}")) } #[cfg(feature = "payouts")] @@ -366,20 +401,25 @@ impl KafkaProducer { &self, payout: &KafkaPayout<'_>, old_payout: Option>, + tenant_id: TenantID, ) -> MQResult<()> { if let Some(negative_event) = old_payout { - self.log_event(&KafkaEvent::old(&negative_event)) + self.log_event(&KafkaEvent::old(&negative_event, tenant_id.clone())) .attach_printable_lazy(|| { format!("Failed to add negative payout event {negative_event:?}") })?; }; - self.log_event(&KafkaEvent::new(payout)) + self.log_event(&KafkaEvent::new(payout, tenant_id.clone())) .attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}")) } #[cfg(feature = "payouts")] - pub async fn log_payout_delete(&self, delete_old_payout: &KafkaPayout<'_>) -> MQResult<()> { - self.log_event(&KafkaEvent::old(delete_old_payout)) + pub async fn log_payout_delete( + &self, + delete_old_payout: &KafkaPayout<'_>, + tenant_id: TenantID, + ) -> MQResult<()> { + self.log_event(&KafkaEvent::old(delete_old_payout, tenant_id.clone())) .attach_printable_lazy(|| { format!("Failed to add negative payout event {delete_old_payout:?}") })