refactor: add infra-values in intent kafka events (#8264)

This commit is contained in:
Sai Harsha Vardhan
2025-06-05 18:45:06 +05:30
committed by GitHub
parent d15ee49bad
commit 4a7c08fbc5
6 changed files with 41 additions and 12 deletions

View File

@ -49,6 +49,7 @@ pub struct KeyManagerState {
pub ca: Secret<String>, pub ca: Secret<String>,
#[cfg(feature = "keymanager_mtls")] #[cfg(feature = "keymanager_mtls")]
pub cert: Secret<String>, pub cert: Secret<String>,
pub infra_values: Option<serde_json::Value>,
} }
pub trait GetKeymanagerTenant { pub trait GetKeymanagerTenant {

View File

@ -1866,7 +1866,12 @@ impl PaymentIntentInterface for KafkaStore {
if let Err(er) = self if let Err(er) = self
.kafka_producer .kafka_producer
.log_payment_intent(&intent, Some(this), self.tenant_id.clone()) .log_payment_intent(
&intent,
Some(this),
self.tenant_id.clone(),
state.infra_values.clone(),
)
.await .await
{ {
logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er); logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er);
@ -1890,7 +1895,12 @@ impl PaymentIntentInterface for KafkaStore {
if let Err(er) = self if let Err(er) = self
.kafka_producer .kafka_producer
.log_payment_intent(&intent, None, self.tenant_id.clone()) .log_payment_intent(
&intent,
None,
self.tenant_id.clone(),
state.infra_values.clone(),
)
.await .await
{ {
logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er); logger::error!(message="Failed to add analytics entry for Payment Intent {intent:?}", error_message=?er);
@ -3596,7 +3606,12 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_intent in payment_intents_list.iter() { for payment_intent in payment_intents_list.iter() {
let _ = self let _ = self
.kafka_producer .kafka_producer
.log_payment_intent(payment_intent, None, self.tenant_id.clone()) .log_payment_intent(
payment_intent,
None,
self.tenant_id.clone(),
state.infra_values.clone(),
)
.await; .await;
} }
Ok(payment_intents_list) Ok(payment_intents_list)
@ -3680,7 +3695,11 @@ impl BatchSampleDataInterface for KafkaStore {
for payment_intent in payment_intents_list.iter() { for payment_intent in payment_intents_list.iter() {
let _ = self let _ = self
.kafka_producer .kafka_producer
.log_payment_intent_delete(payment_intent, self.tenant_id.clone()) .log_payment_intent_delete(
payment_intent,
self.tenant_id.clone(),
state.infra_values.clone(),
)
.await; .await;
} }
Ok(payment_intents_list) Ok(payment_intents_list)

View File

@ -473,10 +473,11 @@ impl KafkaProducer {
intent: &PaymentIntent, intent: &PaymentIntent,
old_intent: Option<PaymentIntent>, old_intent: Option<PaymentIntent>,
tenant_id: TenantID, tenant_id: TenantID,
infra_values: Option<Value>,
) -> MQResult<()> { ) -> MQResult<()> {
if let Some(negative_event) = old_intent { if let Some(negative_event) = old_intent {
self.log_event(&KafkaEvent::old( self.log_event(&KafkaEvent::old(
&KafkaPaymentIntent::from_storage(&negative_event), &KafkaPaymentIntent::from_storage(&negative_event, infra_values.clone()),
tenant_id.clone(), tenant_id.clone(),
self.ckh_database_name.clone(), self.ckh_database_name.clone(),
)) ))
@ -486,14 +487,14 @@ impl KafkaProducer {
}; };
self.log_event(&KafkaEvent::new( self.log_event(&KafkaEvent::new(
&KafkaPaymentIntent::from_storage(intent), &KafkaPaymentIntent::from_storage(intent, infra_values.clone()),
tenant_id.clone(), tenant_id.clone(),
self.ckh_database_name.clone(), self.ckh_database_name.clone(),
)) ))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))?; .attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))?;
self.log_event(&KafkaConsolidatedEvent::new( self.log_event(&KafkaConsolidatedEvent::new(
&KafkaPaymentIntentEvent::from_storage(intent), &KafkaPaymentIntentEvent::from_storage(intent, infra_values.clone()),
tenant_id.clone(), tenant_id.clone(),
)) ))
.attach_printable_lazy(|| format!("Failed to add consolidated intent event {intent:?}")) .attach_printable_lazy(|| format!("Failed to add consolidated intent event {intent:?}"))
@ -503,9 +504,10 @@ impl KafkaProducer {
&self, &self,
delete_old_intent: &PaymentIntent, delete_old_intent: &PaymentIntent,
tenant_id: TenantID, tenant_id: TenantID,
infra_values: Option<Value>,
) -> MQResult<()> { ) -> MQResult<()> {
self.log_event(&KafkaEvent::old( self.log_event(&KafkaEvent::old(
&KafkaPaymentIntent::from_storage(delete_old_intent), &KafkaPaymentIntent::from_storage(delete_old_intent, infra_values),
tenant_id.clone(), tenant_id.clone(),
self.ckh_database_name.clone(), self.ckh_database_name.clone(),
)) ))

View File

@ -42,6 +42,8 @@ pub struct KafkaPaymentIntent<'a> {
pub feature_metadata: Option<&'a Value>, pub feature_metadata: Option<&'a Value>,
pub merchant_order_reference_id: Option<&'a String>, pub merchant_order_reference_id: Option<&'a String>,
pub organization_id: &'a id_type::OrganizationId, pub organization_id: &'a id_type::OrganizationId,
#[serde(flatten)]
infra_values: Option<Value>,
} }
#[cfg(feature = "v2")] #[cfg(feature = "v2")]
@ -81,7 +83,7 @@ pub struct KafkaPaymentIntent<'a> {
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
impl<'a> KafkaPaymentIntent<'a> { impl<'a> KafkaPaymentIntent<'a> {
pub fn from_storage(intent: &'a PaymentIntent) -> Self { pub fn from_storage(intent: &'a PaymentIntent, infra_values: Option<Value>) -> Self {
Self { Self {
payment_id: &intent.payment_id, payment_id: &intent.payment_id,
merchant_id: &intent.merchant_id, merchant_id: &intent.merchant_id,
@ -121,13 +123,14 @@ impl<'a> KafkaPaymentIntent<'a> {
feature_metadata: intent.feature_metadata.as_ref(), feature_metadata: intent.feature_metadata.as_ref(),
merchant_order_reference_id: intent.merchant_order_reference_id.as_ref(), merchant_order_reference_id: intent.merchant_order_reference_id.as_ref(),
organization_id: &intent.organization_id, organization_id: &intent.organization_id,
infra_values,
} }
} }
} }
#[cfg(feature = "v2")] #[cfg(feature = "v2")]
impl<'a> KafkaPaymentIntent<'a> { impl<'a> KafkaPaymentIntent<'a> {
pub fn from_storage(intent: &'a PaymentIntent) -> Self { pub fn from_storage(intent: &'a PaymentIntent, infra_values: Option<Value>) -> Self {
// Self { // Self {
// id: &intent.id, // id: &intent.id,
// merchant_id: &intent.merchant_id, // merchant_id: &intent.merchant_id,

View File

@ -43,6 +43,8 @@ pub struct KafkaPaymentIntentEvent<'a> {
pub feature_metadata: Option<&'a Value>, pub feature_metadata: Option<&'a Value>,
pub merchant_order_reference_id: Option<&'a String>, pub merchant_order_reference_id: Option<&'a String>,
pub organization_id: &'a id_type::OrganizationId, pub organization_id: &'a id_type::OrganizationId,
#[serde(flatten)]
pub infra_values: Option<Value>,
} }
#[cfg(feature = "v2")] #[cfg(feature = "v2")]
@ -95,7 +97,7 @@ impl KafkaPaymentIntentEvent<'_> {
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
impl<'a> KafkaPaymentIntentEvent<'a> { impl<'a> KafkaPaymentIntentEvent<'a> {
pub fn from_storage(intent: &'a PaymentIntent) -> Self { pub fn from_storage(intent: &'a PaymentIntent, infra_values: Option<Value>) -> Self {
Self { Self {
payment_id: &intent.payment_id, payment_id: &intent.payment_id,
merchant_id: &intent.merchant_id, merchant_id: &intent.merchant_id,
@ -135,13 +137,14 @@ impl<'a> KafkaPaymentIntentEvent<'a> {
feature_metadata: intent.feature_metadata.as_ref(), feature_metadata: intent.feature_metadata.as_ref(),
merchant_order_reference_id: intent.merchant_order_reference_id.as_ref(), merchant_order_reference_id: intent.merchant_order_reference_id.as_ref(),
organization_id: &intent.organization_id, organization_id: &intent.organization_id,
infra_values: infra_values.clone(),
} }
} }
} }
#[cfg(feature = "v2")] #[cfg(feature = "v2")]
impl<'a> KafkaPaymentIntentEvent<'a> { impl<'a> KafkaPaymentIntentEvent<'a> {
pub fn from_storage(intent: &'a PaymentIntent) -> Self { pub fn from_storage(intent: &'a PaymentIntent, infra_values: Option<Value>) -> Self {
// Self { // Self {
// id: &intent.id, // id: &intent.id,
// merchant_id: &intent.merchant_id, // merchant_id: &intent.merchant_id,

View File

@ -21,6 +21,7 @@ impl From<&app::SessionState> for KeyManagerState {
cert: conf.cert.clone(), cert: conf.cert.clone(),
#[cfg(feature = "keymanager_mtls")] #[cfg(feature = "keymanager_mtls")]
ca: conf.ca.clone(), ca: conf.ca.clone(),
infra_values: app::AppState::process_env_mappings(state.conf.infra_values.clone()),
} }
} }
} }