chore(router): events enhancement for kafka (#8780)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Shivansh Mathur
2025-07-30 17:17:53 +05:30
committed by GitHub
parent 74995446d0
commit e48e55a048
4 changed files with 71 additions and 1 deletions

View File

@ -547,5 +547,6 @@ pub(crate) async fn fetch_raw_secrets(
clone_connector_allowlist: conf.clone_connector_allowlist,
merchant_id_auth: conf.merchant_id_auth,
infra_values: conf.infra_values,
enhancement: conf.enhancement,
}
}

View File

@ -162,6 +162,8 @@ pub struct Settings<S: SecretState> {
pub merchant_id_auth: MerchantIdAuthSettings,
#[serde(default)]
pub infra_values: Option<HashMap<String, String>>,
#[serde(default)]
pub enhancement: Option<HashMap<String, String>>,
}
#[derive(Debug, Deserialize, Clone, Default)]

View File

@ -132,6 +132,7 @@ pub struct SessionState {
pub locale: String,
pub crm_client: Arc<dyn CrmInterface>,
pub infra_components: Option<serde_json::Value>,
pub enhancement: Option<HashMap<String, String>>,
}
impl scheduler::SchedulerSessionState for SessionState {
fn get_db(&self) -> Box<dyn SchedulerInterface> {
@ -245,6 +246,7 @@ pub struct AppState {
pub theme_storage_client: Arc<dyn FileStorageInterface>,
pub crm_client: Arc<dyn CrmInterface>,
pub infra_components: Option<serde_json::Value>,
pub enhancement: Option<HashMap<String, String>>,
}
impl scheduler::SchedulerAppState for AppState {
fn get_tenants(&self) -> Vec<id_type::TenantId> {
@ -410,6 +412,7 @@ impl AppState {
let grpc_client = conf.grpc_client.get_grpc_client_interface().await;
let infra_component_values = Self::process_env_mappings(conf.infra_values.clone());
let enhancement = conf.enhancement.clone();
Self {
flow_name: String::from("default"),
stores,
@ -431,6 +434,7 @@ impl AppState {
theme_storage_client,
crm_client,
infra_components: infra_component_values,
enhancement,
}
})
.await
@ -526,6 +530,7 @@ impl AppState {
locale: locale.unwrap_or(common_utils::consts::DEFAULT_LOCALE.to_string()),
crm_client: self.crm_client.clone(),
infra_components: self.infra_components.clone(),
enhancement: self.enhancement.clone(),
})
}

View File

@ -701,7 +701,11 @@ where
}
};
let infra = state.infra_components.clone();
let infra = extract_mapped_fields(
&serialized_request,
state.enhancement.as_ref(),
state.infra_components.as_ref(),
);
let api_event = ApiEvent::new(
tenant_id,
@ -2089,6 +2093,64 @@ pub fn get_payment_link_status(
}
}
pub fn extract_mapped_fields(
value: &serde_json::Value,
mapping: Option<&HashMap<String, String>>,
existing_enhancement: Option<&serde_json::Value>,
) -> Option<serde_json::Value> {
let mapping = mapping?;
if mapping.is_empty() {
return existing_enhancement.cloned();
}
let mut enhancement = match existing_enhancement {
Some(existing) if existing.is_object() => existing.clone(),
_ => serde_json::json!({}),
};
for (dot_path, output_key) in mapping {
if let Some(extracted_value) = extract_field_by_dot_path(value, dot_path) {
if let Some(obj) = enhancement.as_object_mut() {
obj.insert(output_key.clone(), extracted_value);
}
}
}
if enhancement.as_object().is_some_and(|obj| !obj.is_empty()) {
Some(enhancement)
} else {
None
}
}
pub fn extract_field_by_dot_path(
value: &serde_json::Value,
path: &str,
) -> Option<serde_json::Value> {
let parts: Vec<&str> = path.split('.').collect();
let mut current = value;
for part in parts {
match current {
serde_json::Value::Object(obj) => {
current = obj.get(part)?;
}
serde_json::Value::Array(arr) => {
// Try to parse part as array index
if let Ok(index) = part.parse::<usize>() {
current = arr.get(index)?;
} else {
return None;
}
}
_ => return None,
}
}
Some(current.clone())
}
#[cfg(test)]
mod tests {
#[test]