diff --git a/crates/external_services/src/grpc_client/unified_connector_service.rs b/crates/external_services/src/grpc_client/unified_connector_service.rs index 8db71c930f..4f6313ea31 100644 --- a/crates/external_services/src/grpc_client/unified_connector_service.rs +++ b/crates/external_services/src/grpc_client/unified_connector_service.rs @@ -9,7 +9,8 @@ use tonic::{ }; use unified_connector_service_client::payments::{ self as payments_grpc, payment_service_client::PaymentServiceClient, - PaymentServiceAuthorizeResponse, + PaymentServiceAuthorizeResponse, PaymentServiceTransformRequest, + PaymentServiceTransformResponse, }; use crate::{ @@ -96,6 +97,10 @@ pub enum UnifiedConnectorServiceError { /// Failed to perform Payment Repeat Payment from gRPC Server #[error("Failed to perform Repeat Payment from gRPC Server")] PaymentRepeatEverythingFailure, + + /// Failed to transform incoming webhook from gRPC Server + #[error("Failed to transform incoming webhook from gRPC Server")] + WebhookTransformFailure, } /// Result type for Dynamic Routing @@ -194,6 +199,7 @@ impl UnifiedConnectorServiceClient { ) -> UnifiedConnectorServiceResult> { let mut request = tonic::Request::new(payment_authorize_request); + let connector_name = connector_auth_metadata.connector_name.clone(); let metadata = build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?; *request.metadata_mut() = metadata; @@ -203,7 +209,14 @@ impl UnifiedConnectorServiceClient { .authorize(request) .await .change_context(UnifiedConnectorServiceError::PaymentAuthorizeFailure) - .inspect_err(|error| logger::error!(?error)) + .inspect_err(|error| { + logger::error!( + grpc_error=?error, + method="payment_authorize", + connector_name=?connector_name, + "UCS payment authorize gRPC call failed" + ) + }) } /// Performs Payment Sync/Get @@ -216,6 +229,7 @@ impl UnifiedConnectorServiceClient { { let mut request = tonic::Request::new(payment_get_request); + let connector_name = connector_auth_metadata.connector_name.clone(); let metadata = build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?; *request.metadata_mut() = metadata; @@ -225,7 +239,14 @@ impl UnifiedConnectorServiceClient { .get(request) .await .change_context(UnifiedConnectorServiceError::PaymentGetFailure) - .inspect_err(|error| logger::error!(?error)) + .inspect_err(|error| { + logger::error!( + grpc_error=?error, + method="payment_get", + connector_name=?connector_name, + "UCS payment get/sync gRPC call failed" + ) + }) } /// Performs Payment Setup Mandate @@ -238,6 +259,7 @@ impl UnifiedConnectorServiceClient { { let mut request = tonic::Request::new(payment_register_request); + let connector_name = connector_auth_metadata.connector_name.clone(); let metadata = build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?; *request.metadata_mut() = metadata; @@ -247,7 +269,14 @@ impl UnifiedConnectorServiceClient { .register(request) .await .change_context(UnifiedConnectorServiceError::PaymentRegisterFailure) - .inspect_err(|error| logger::error!(?error)) + .inspect_err(|error| { + logger::error!( + grpc_error=?error, + method="payment_setup_mandate", + connector_name=?connector_name, + "UCS payment setup mandate gRPC call failed" + ) + }) } /// Performs Payment repeat (MIT - Merchant Initiated Transaction). @@ -261,6 +290,7 @@ impl UnifiedConnectorServiceClient { > { let mut request = tonic::Request::new(payment_repeat_request); + let connector_name = connector_auth_metadata.connector_name.clone(); let metadata = build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?; *request.metadata_mut() = metadata; @@ -270,7 +300,43 @@ impl UnifiedConnectorServiceClient { .repeat_everything(request) .await .change_context(UnifiedConnectorServiceError::PaymentRepeatEverythingFailure) - .inspect_err(|error| logger::error!(?error)) + .inspect_err(|error| { + logger::error!( + grpc_error=?error, + method="payment_repeat", + connector_name=?connector_name, + "UCS payment repeat gRPC call failed" + ) + }) + } + + /// Transforms incoming webhook through UCS + pub async fn transform_incoming_webhook( + &self, + webhook_transform_request: PaymentServiceTransformRequest, + connector_auth_metadata: ConnectorAuthMetadata, + grpc_headers: GrpcHeaders, + ) -> UnifiedConnectorServiceResult> { + let mut request = tonic::Request::new(webhook_transform_request); + + let connector_name = connector_auth_metadata.connector_name.clone(); + let metadata = + build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?; + *request.metadata_mut() = metadata; + + self.client + .clone() + .transform(request) + .await + .change_context(UnifiedConnectorServiceError::WebhookTransformFailure) + .inspect_err(|error| { + logger::error!( + grpc_error=?error, + method="transform_incoming_webhook", + connector_name=?connector_name, + "UCS webhook transform gRPC call failed" + ) + }) } } @@ -318,17 +384,18 @@ pub fn build_unified_connector_service_grpc_headers( parse(common_utils_consts::X_MERCHANT_ID, meta.merchant_id.peek())?, ); - grpc_headers.tenant_id - .parse() - .map(|tenant_id| { - metadata.append( - common_utils_consts::TENANT_HEADER, - tenant_id) - }) - .inspect_err( - |err| logger::warn!(header_parse_error=?err,"invalid {} received",common_utils_consts::TENANT_HEADER), - ) - .ok(); + if let Err(err) = grpc_headers + .tenant_id + .parse() + .map(|tenant_id| metadata.append(common_utils_consts::TENANT_HEADER, tenant_id)) + { + logger::error!( + header_parse_error=?err, + tenant_id=?grpc_headers.tenant_id, + "Failed to parse tenant_id header for UCS gRPC request: {}", + common_utils_consts::TENANT_HEADER + ); + } Ok(metadata) } diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index a6be59c811..87f47a9b2b 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -4188,6 +4188,15 @@ impl MerchantConnectorAccountType { Self::CacheVal(_) => None, } } + + pub fn get_webhook_details( + &self, + ) -> CustomResult>, errors::ApiErrorResponse> { + match self { + Self::DbVal(db_val) => Ok(db_val.connector_webhook_details.as_ref()), + Self::CacheVal(_) => Ok(None), + } + } } /// Query for merchant connector account either by business label or profile id diff --git a/crates/router/src/core/unified_connector_service.rs b/crates/router/src/core/unified_connector_service.rs index 107b6e7e7a..4e32095dea 100644 --- a/crates/router/src/core/unified_connector_service.rs +++ b/crates/router/src/core/unified_connector_service.rs @@ -1,3 +1,4 @@ +use api_models::admin; use common_enums::{AttemptStatus, PaymentMethodType}; use common_utils::{errors::CustomResult, ext_traits::ValueExt}; use error_stack::ResultExt; @@ -13,6 +14,7 @@ use hyperswitch_domain_models::{ router_response_types::PaymentsResponseData, }; use masking::{ExposeInterface, PeekInterface, Secret}; +use router_env::logger; use unified_connector_service_client::payments::{ self as payments_grpc, payment_method::PaymentMethod, CardDetails, CardPaymentMethodType, PaymentServiceAuthorizeResponse, @@ -21,7 +23,7 @@ use unified_connector_service_client::payments::{ use crate::{ consts, core::{ - errors::RouterResult, + errors::{ApiErrorResponse, RouterResult}, payments::helpers::{ is_ucs_enabled, should_execute_based_on_rollout, MerchantConnectorAccountType, }, @@ -29,9 +31,13 @@ use crate::{ }, routes::SessionState, types::transformers::ForeignTryFrom, + utils, }; -mod transformers; +pub mod transformers; + +// Re-export webhook transformer types for easier access +pub use transformers::WebhookTransformData; pub async fn should_call_unified_connector_service( state: &SessionState, @@ -80,6 +86,42 @@ pub async fn should_call_unified_connector_service( Ok(should_execute) } +pub async fn should_call_unified_connector_service_for_webhooks( + state: &SessionState, + merchant_context: &MerchantContext, + connector_name: &str, +) -> RouterResult { + if state.grpc_client.unified_connector_service_client.is_none() { + logger::debug!( + connector = connector_name.to_string(), + "Unified Connector Service client is not available for webhooks" + ); + return Ok(false); + } + + let ucs_config_key = consts::UCS_ENABLED; + + if !is_ucs_enabled(state, ucs_config_key).await { + return Ok(false); + } + + let merchant_id = merchant_context + .get_merchant_account() + .get_id() + .get_string_repr(); + + let config_key = format!( + "{}_{}_{}_Webhooks", + consts::UCS_ROLLOUT_PERCENT_CONFIG_PREFIX, + merchant_id, + connector_name + ); + + let should_execute = should_execute_based_on_rollout(state, &config_key).await?; + + Ok(should_execute) +} + pub fn build_unified_connector_service_payment_method( payment_method_data: hyperswitch_domain_models::payment_method_data::PaymentMethodData, payment_method_type: PaymentMethodType, @@ -317,3 +359,161 @@ pub fn handle_unified_connector_service_response_for_payment_repeat( Ok((status, router_data_response, status_code)) } + +pub fn build_webhook_secrets_from_merchant_connector_account( + #[cfg(feature = "v1")] merchant_connector_account: &MerchantConnectorAccountType, + #[cfg(feature = "v2")] merchant_connector_account: &MerchantConnectorAccountTypeDetails, +) -> CustomResult, UnifiedConnectorServiceError> { + // Extract webhook credentials from merchant connector account + // This depends on how webhook secrets are stored in the merchant connector account + + #[cfg(feature = "v1")] + let webhook_details = merchant_connector_account + .get_webhook_details() + .map_err(|_| UnifiedConnectorServiceError::FailedToObtainAuthType)?; + + #[cfg(feature = "v2")] + let webhook_details = match merchant_connector_account { + MerchantConnectorAccountTypeDetails::MerchantConnectorAccount(mca) => { + mca.connector_webhook_details.as_ref() + } + MerchantConnectorAccountTypeDetails::MerchantConnectorDetails(_) => None, + }; + + match webhook_details { + Some(details) => { + // Parse the webhook details JSON to extract secrets + let webhook_details: admin::MerchantConnectorWebhookDetails = details + .clone() + .parse_value("MerchantConnectorWebhookDetails") + .change_context(UnifiedConnectorServiceError::FailedToObtainAuthType) + .attach_printable("Failed to parse MerchantConnectorWebhookDetails")?; + + // Build gRPC WebhookSecrets from parsed details + Ok(Some(payments_grpc::WebhookSecrets { + secret: webhook_details.merchant_secret.expose().to_string(), + additional_secret: webhook_details + .additional_secret + .map(|secret| secret.expose().to_string()), + })) + } + None => Ok(None), + } +} + +/// High-level abstraction for calling UCS webhook transformation +/// This provides a clean interface similar to payment flow UCS calls +pub async fn call_unified_connector_service_for_webhook( + state: &SessionState, + merchant_context: &MerchantContext, + connector_name: &str, + body: &actix_web::web::Bytes, + request_details: &hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>, + merchant_connector_account: Option< + &hyperswitch_domain_models::merchant_connector_account::MerchantConnectorAccount, + >, +) -> RouterResult<( + api_models::webhooks::IncomingWebhookEvent, + bool, + WebhookTransformData, +)> { + let ucs_client = state + .grpc_client + .unified_connector_service_client + .as_ref() + .ok_or_else(|| { + error_stack::report!(ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("UCS client is not available for webhook processing") + })?; + + // Build webhook secrets from merchant connector account + let webhook_secrets = merchant_connector_account.and_then(|mca| { + #[cfg(feature = "v1")] + let mca_type = MerchantConnectorAccountType::DbVal(Box::new(mca.clone())); + #[cfg(feature = "v2")] + let mca_type = + MerchantConnectorAccountTypeDetails::MerchantConnectorAccount(Box::new(mca.clone())); + + build_webhook_secrets_from_merchant_connector_account(&mca_type) + .map_err(|e| { + logger::warn!( + build_error=?e, + connector_name=connector_name, + "Failed to build webhook secrets from merchant connector account in call_unified_connector_service_for_webhook" + ); + e + }) + .ok() + .flatten() + }); + + // Build UCS transform request using new webhook transformers + let transform_request = transformers::build_webhook_transform_request( + body, + request_details, + webhook_secrets, + merchant_context + .get_merchant_account() + .get_id() + .get_string_repr(), + connector_name, + )?; + + // Build connector auth metadata + let connector_auth_metadata = merchant_connector_account + .map(|mca| { + #[cfg(feature = "v1")] + let mca_type = MerchantConnectorAccountType::DbVal(Box::new(mca.clone())); + #[cfg(feature = "v2")] + let mca_type = MerchantConnectorAccountTypeDetails::MerchantConnectorAccount(Box::new( + mca.clone(), + )); + + build_unified_connector_service_auth_metadata(mca_type, merchant_context) + }) + .transpose() + .change_context(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to build UCS auth metadata")? + .ok_or_else(|| { + error_stack::report!(ApiErrorResponse::InternalServerError).attach_printable( + "Missing merchant connector account for UCS webhook transformation", + ) + })?; + + // Build gRPC headers + let grpc_headers = external_services::grpc_client::GrpcHeaders { + tenant_id: state.tenant.tenant_id.get_string_repr().to_string(), + request_id: Some(utils::generate_id(consts::ID_LENGTH, "webhook_req")), + }; + + // Make UCS call - client availability already verified + match ucs_client + .transform_incoming_webhook(transform_request, connector_auth_metadata, grpc_headers) + .await + { + Ok(response) => { + let transform_response = response.into_inner(); + let transform_data = transformers::transform_ucs_webhook_response(transform_response)?; + + // UCS handles everything internally - event type, source verification, decoding + Ok(( + transform_data.event_type, + transform_data.source_verified, + transform_data, + )) + } + Err(err) => { + // When UCS is configured, we don't fall back to direct connector processing + Err(ApiErrorResponse::WebhookProcessingFailure) + .attach_printable(format!("UCS webhook processing failed: {err}")) + } + } +} + +/// Extract webhook content from UCS response for further processing +/// This provides a helper function to extract specific data from UCS responses +pub fn extract_webhook_content_from_ucs_response( + transform_data: &WebhookTransformData, +) -> Option<&unified_connector_service_client::payments::WebhookResponseContent> { + transform_data.webhook_content.as_ref() +} diff --git a/crates/router/src/core/unified_connector_service/transformers.rs b/crates/router/src/core/unified_connector_service/transformers.rs index 9f8ad6c128..a6559abc2d 100644 --- a/crates/router/src/core/unified_connector_service/transformers.rs +++ b/crates/router/src/core/unified_connector_service/transformers.rs @@ -17,11 +17,14 @@ use hyperswitch_domain_models::{ }; use masking::{ExposeInterface, PeekInterface}; use router_env::tracing; -use unified_connector_service_client::payments::{self as payments_grpc, Identifier}; +use unified_connector_service_client::payments::{ + self as payments_grpc, Identifier, PaymentServiceTransformRequest, + PaymentServiceTransformResponse, +}; use url::Url; use crate::{ - core::unified_connector_service::build_unified_connector_service_payment_method, + core::{errors, unified_connector_service::build_unified_connector_service_payment_method}, types::transformers::ForeignTryFrom, }; impl ForeignTryFrom<&RouterData> @@ -39,6 +42,13 @@ impl ForeignTryFrom<&RouterData> .map(|id| Identifier { id_type: Some(payments_grpc::identifier::IdType::Id(id)), }) + .map_err(|e| { + tracing::debug!( + transaction_id_error=?e, + "Failed to extract connector transaction ID for UCS payment sync request" + ); + e + }) .ok(); let connector_ref_id = router_data @@ -670,6 +680,7 @@ impl ForeignTryFrom for payments_grpc::CardNetwork { common_enums::CardNetwork::UnionPay => Ok(Self::Unionpay), common_enums::CardNetwork::RuPay => Ok(Self::Rupay), common_enums::CardNetwork::Maestro => Ok(Self::Maestro), + common_enums::CardNetwork::AmericanExpress => Ok(Self::Amex), _ => Err( UnifiedConnectorServiceError::RequestEncodingFailedWithReason( "Card Network not supported".to_string(), @@ -1003,6 +1014,113 @@ impl ForeignTryFrom } } +impl ForeignTryFrom<&hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>> + for payments_grpc::RequestDetails +{ + type Error = error_stack::Report; + + fn foreign_try_from( + request_details: &hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>, + ) -> Result { + let headers_map = request_details + .headers + .iter() + .map(|(key, value)| { + let value_string = value.to_str().unwrap_or_default().to_string(); + (key.as_str().to_string(), value_string) + }) + .collect(); + + Ok(Self { + method: 1, // POST method for webhooks + uri: Some({ + let uri_result = request_details + .headers + .get("x-forwarded-path") + .and_then(|h| h.to_str().map_err(|e| { + tracing::warn!( + header_conversion_error=?e, + header_value=?h, + "Failed to convert x-forwarded-path header to string for webhook processing" + ); + e + }).ok()); + + uri_result.unwrap_or_else(|| { + tracing::debug!("x-forwarded-path header not found or invalid, using default '/Unknown'"); + "/Unknown" + }).to_string() + }), + body: request_details.body.to_vec(), + headers: headers_map, + query_params: Some(request_details.query_params.clone()), + }) + } +} + +/// Webhook transform data structure containing UCS response information +pub struct WebhookTransformData { + pub event_type: api_models::webhooks::IncomingWebhookEvent, + pub source_verified: bool, + pub webhook_content: Option, + pub response_ref_id: Option, +} + +/// Transform UCS webhook response into webhook event data +pub fn transform_ucs_webhook_response( + response: PaymentServiceTransformResponse, +) -> Result> { + let event_type = match response.event_type { + 0 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentSuccess, + 1 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentFailure, + 2 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentProcessing, + 3 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentCancelled, + 4 => api_models::webhooks::IncomingWebhookEvent::RefundSuccess, + 5 => api_models::webhooks::IncomingWebhookEvent::RefundFailure, + 6 => api_models::webhooks::IncomingWebhookEvent::MandateRevoked, + _ => api_models::webhooks::IncomingWebhookEvent::EventNotSupported, + }; + + Ok(WebhookTransformData { + event_type, + source_verified: response.source_verified, + webhook_content: response.content, + response_ref_id: response.response_ref_id.and_then(|identifier| { + identifier.id_type.and_then(|id_type| match id_type { + payments_grpc::identifier::IdType::Id(id) => Some(id), + payments_grpc::identifier::IdType::EncodedData(encoded_data) => Some(encoded_data), + payments_grpc::identifier::IdType::NoResponseIdMarker(_) => None, + }) + }), + }) +} + +/// Build UCS webhook transform request from webhook components +pub fn build_webhook_transform_request( + _webhook_body: &[u8], + request_details: &hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>, + webhook_secrets: Option, + merchant_id: &str, + connector_id: &str, +) -> Result> { + let request_details_grpc = payments_grpc::RequestDetails::foreign_try_from(request_details) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to transform webhook request details to gRPC format")?; + + Ok(PaymentServiceTransformRequest { + request_ref_id: Some(Identifier { + id_type: Some(payments_grpc::identifier::IdType::Id(format!( + "{}_{}_{}", + merchant_id, + connector_id, + time::OffsetDateTime::now_utc().unix_timestamp() + ))), + }), + request_details: Some(request_details_grpc), + webhook_secrets, + }) +} + pub fn convert_connector_service_status_code( status_code: u32, ) -> Result> { diff --git a/crates/router/src/core/webhooks/incoming.rs b/crates/router/src/core/webhooks/incoming.rs index fae94ceb11..29a6c6c8e2 100644 --- a/crates/router/src/core/webhooks/incoming.rs +++ b/crates/router/src/core/webhooks/incoming.rs @@ -30,7 +30,7 @@ use crate::{ errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt}, metrics, payment_methods, payments::{self, tokenization}, - refunds, relay, utils as core_utils, + refunds, relay, unified_connector_service, utils as core_utils, webhooks::{network_tokenization_incoming, utils::construct_webhook_router_data}, }, db::StorageInterface, @@ -202,8 +202,7 @@ async fn incoming_webhooks_core( WebhookResponseTracker, serde_json::Value, )> { - let key_manager_state = &(&state).into(); - + // Initial setup and metrics metrics::WEBHOOK_INCOMING_COUNT.add( 1, router_env::metric_attributes!(( @@ -211,7 +210,8 @@ async fn incoming_webhooks_core( merchant_context.get_merchant_account().get_id().clone() )), ); - let mut request_details = IncomingWebhookRequestDetails { + + let request_details = IncomingWebhookRequestDetails { method: req.method().clone(), uri: req.uri().clone(), headers: req.headers(), @@ -220,31 +220,211 @@ async fn incoming_webhooks_core( }; // Fetch the merchant connector account to get the webhooks source secret - // `webhooks source secret` is a secret shared between the merchant and connector - // This is used for source verification and webhooks integrity let (merchant_connector_account, connector, connector_name) = fetch_optional_mca_and_connector(&state, &merchant_context, connector_name_or_mca_id) .await?; - let decoded_body = connector - .decode_webhook_body( - &request_details, - merchant_context.get_merchant_account().get_id(), - merchant_connector_account - .clone() - .and_then(|merchant_connector_account| { - merchant_connector_account.connector_webhook_details - }), - connector_name.as_str(), + // Determine webhook processing path (UCS vs non-UCS) and handle event type extraction + let webhook_processing_result = + if unified_connector_service::should_call_unified_connector_service_for_webhooks( + &state, + &merchant_context, + &connector_name, ) - .await + .await? + { + logger::info!( + connector = connector_name, + "Using Unified Connector Service for webhook processing", + ); + process_ucs_webhook_transform( + &state, + &merchant_context, + &connector_name, + &body, + &request_details, + merchant_connector_account.as_ref(), + ) + .await? + } else { + // NON-UCS PATH: Need to decode body first + let decoded_body = connector + .decode_webhook_body( + &request_details, + merchant_context.get_merchant_account().get_id(), + merchant_connector_account + .and_then(|mca| mca.connector_webhook_details.clone()), + &connector_name, + ) + .await + .switch() + .attach_printable("There was an error in incoming webhook body decoding")?; + + process_non_ucs_webhook( + &state, + &merchant_context, + &connector, + &connector_name, + decoded_body.into(), + &request_details, + ) + .await? + }; + + // Update request_details with the appropriate body (decoded for non-UCS, raw for UCS) + let final_request_details = match &webhook_processing_result.decoded_body { + Some(decoded_body) => IncomingWebhookRequestDetails { + method: request_details.method.clone(), + uri: request_details.uri.clone(), + headers: request_details.headers, + query_params: request_details.query_params.clone(), + body: decoded_body, + }, + None => request_details, // Use original request details for UCS + }; + + logger::info!(event_type=?webhook_processing_result.event_type); + + // Check if webhook should be processed further + let is_webhook_event_supported = !matches!( + webhook_processing_result.event_type, + webhooks::IncomingWebhookEvent::EventNotSupported + ); + let is_webhook_event_enabled = !utils::is_webhook_event_disabled( + &*state.clone().store, + connector_name.as_str(), + merchant_context.get_merchant_account().get_id(), + &webhook_processing_result.event_type, + ) + .await; + let flow_type: api::WebhookFlow = webhook_processing_result.event_type.into(); + let process_webhook_further = is_webhook_event_enabled + && is_webhook_event_supported + && !matches!(flow_type, api::WebhookFlow::ReturnResponse); + logger::info!(process_webhook=?process_webhook_further); + let mut event_object: Box = Box::new(serde_json::Value::Null); + + let webhook_effect = match process_webhook_further { + true => { + let business_logic_result = process_webhook_business_logic( + &state, + req_state, + &merchant_context, + &connector, + &connector_name, + webhook_processing_result.event_type, + webhook_processing_result.source_verified, + &webhook_processing_result.transform_data, + &final_request_details, + is_relay_webhook, + ) + .await; + + match business_logic_result { + Ok(response) => { + // Extract event object for serialization + event_object = extract_webhook_event_object( + &webhook_processing_result.transform_data, + &connector, + &final_request_details, + )?; + response + } + Err(error) => { + let error_result = handle_incoming_webhook_error( + error, + &connector, + connector_name.as_str(), + &final_request_details, + ); + match error_result { + Ok((_, webhook_tracker, _)) => webhook_tracker, + Err(e) => return Err(e), + } + } + } + } + false => { + metrics::WEBHOOK_INCOMING_FILTERED_COUNT.add( + 1, + router_env::metric_attributes!(( + MERCHANT_ID, + merchant_context.get_merchant_account().get_id().clone() + )), + ); + WebhookResponseTracker::NoEffect + } + }; + + // Generate response + let response = connector + .get_webhook_api_response(&final_request_details, None) .switch() - .attach_printable("There was an error in incoming webhook body decoding")?; + .attach_printable("Could not get incoming webhook api response from connector")?; - request_details.body = &decoded_body; + let serialized_request = event_object + .masked_serialize() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Could not convert webhook effect to string")?; - let event_type = match connector - .get_webhook_event_type(&request_details) + Ok((response, webhook_effect, serialized_request)) +} + +/// Process UCS webhook transformation using the high-level UCS abstraction +async fn process_ucs_webhook_transform( + state: &SessionState, + merchant_context: &domain::MerchantContext, + connector_name: &str, + body: &actix_web::web::Bytes, + request_details: &IncomingWebhookRequestDetails<'_>, + merchant_connector_account: Option<&domain::MerchantConnectorAccount>, +) -> errors::RouterResult { + // Use the new UCS abstraction which provides clean separation + let (event_type, source_verified, transform_data) = + unified_connector_service::call_unified_connector_service_for_webhook( + state, + merchant_context, + connector_name, + body, + request_details, + merchant_connector_account, + ) + .await?; + Ok(WebhookProcessingResult { + event_type, + source_verified, + transform_data: Some(Box::new(transform_data)), + decoded_body: None, // UCS path uses raw body + }) +} +/// Result type for webhook processing path determination +pub struct WebhookProcessingResult { + event_type: webhooks::IncomingWebhookEvent, + source_verified: bool, + transform_data: Option>, + decoded_body: Option, +} + +/// Process non-UCS webhook using traditional connector processing +async fn process_non_ucs_webhook( + state: &SessionState, + merchant_context: &domain::MerchantContext, + connector: &ConnectorEnum, + connector_name: &str, + decoded_body: actix_web::web::Bytes, + request_details: &IncomingWebhookRequestDetails<'_>, +) -> errors::RouterResult { + // Create request_details with decoded body for connector processing + let updated_request_details = IncomingWebhookRequestDetails { + method: request_details.method.clone(), + uri: request_details.uri.clone(), + headers: request_details.headers, + query_params: request_details.query_params.clone(), + body: &decoded_body, + }; + + match connector + .get_webhook_event_type(&updated_request_details) .allow_webhook_event_type_not_found( state .clone() @@ -257,14 +437,13 @@ async fn incoming_webhooks_core( .switch() .attach_printable("Could not find event type in incoming webhook body")? { - Some(event_type) => event_type, - // Early return allows us to acknowledge the webhooks that we do not support + Some(event_type) => Ok(WebhookProcessingResult { + event_type, + source_verified: false, + transform_data: None, + decoded_body: Some(decoded_body), + }), None => { - logger::error!( - webhook_payload =? request_details.body, - "Failed while identifying the event type", - ); - metrics::WEBHOOK_EVENT_TYPE_IDENTIFICATION_FAILURE_COUNT.add( 1, router_env::metric_attributes!( @@ -272,94 +451,101 @@ async fn incoming_webhooks_core( MERCHANT_ID, merchant_context.get_merchant_account().get_id().clone() ), - ("connector", connector_name) + ("connector", connector_name.to_string()) ), ); + Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to identify event type in incoming webhook body") + } + } +} - let response = connector - .get_webhook_api_response(&request_details, None) +/// Extract webhook event object based on transform data availability +fn extract_webhook_event_object( + transform_data: &Option>, + connector: &ConnectorEnum, + request_details: &IncomingWebhookRequestDetails<'_>, +) -> errors::RouterResult> { + match transform_data { + Some(transform_data) => match &transform_data.webhook_content { + Some(webhook_content) => { + let serialized_value = serde_json::to_value(webhook_content) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to serialize UCS webhook content")?; + Ok(Box::new(serialized_value)) + } + None => connector + .get_webhook_resource_object(request_details) .switch() - .attach_printable("Failed while early return in case of event type parsing")?; + .attach_printable("Could not find resource object in incoming webhook body"), + }, + None => connector + .get_webhook_resource_object(request_details) + .switch() + .attach_printable("Could not find resource object in incoming webhook body"), + } +} - return Ok(( - response, - WebhookResponseTracker::NoEffect, - serde_json::Value::Null, - )); +/// Process the main webhook business logic after event type determination +#[allow(clippy::too_many_arguments)] +async fn process_webhook_business_logic( + state: &SessionState, + req_state: ReqState, + merchant_context: &domain::MerchantContext, + connector: &ConnectorEnum, + connector_name: &str, + event_type: webhooks::IncomingWebhookEvent, + source_verified_via_ucs: bool, + webhook_transform_data: &Option>, + request_details: &IncomingWebhookRequestDetails<'_>, + is_relay_webhook: bool, +) -> errors::RouterResult { + let object_ref_id = connector + .get_webhook_object_reference_id(request_details) + .switch() + .attach_printable("Could not find object reference id in incoming webhook body")?; + let connector_enum = api_models::enums::Connector::from_str(connector_name) + .change_context(errors::ApiErrorResponse::InvalidDataValue { + field_name: "connector", + }) + .attach_printable_lazy(|| format!("unable to parse connector name {connector_name:?}"))?; + let connectors_with_source_verification_call = &state.conf.webhook_source_verification_call; + + let merchant_connector_account = match Box::pin(helper_utils::get_mca_from_object_reference_id( + state, + object_ref_id.clone(), + merchant_context, + connector_name, + )) + .await + { + Ok(mca) => mca, + Err(error) => { + let result = + handle_incoming_webhook_error(error, connector, connector_name, request_details); + match result { + Ok((_, webhook_tracker, _)) => return Ok(webhook_tracker), + Err(e) => return Err(e), + } } }; - logger::info!(event_type=?event_type); - let is_webhook_event_supported = !matches!( - event_type, - webhooks::IncomingWebhookEvent::EventNotSupported - ); - let is_webhook_event_enabled = !utils::is_webhook_event_disabled( - &*state.clone().store, - connector_name.as_str(), - merchant_context.get_merchant_account().get_id(), - &event_type, - ) - .await; - - //process webhook further only if webhook event is enabled and is not event_not_supported - let process_webhook_further = is_webhook_event_enabled && is_webhook_event_supported; - - logger::info!(process_webhook=?process_webhook_further); - - let flow_type: api::WebhookFlow = event_type.into(); - let mut event_object: Box = Box::new(serde_json::Value::Null); - let webhook_effect = if process_webhook_further - && !matches!(flow_type, api::WebhookFlow::ReturnResponse) - { - let object_ref_id = connector - .get_webhook_object_reference_id(&request_details) - .switch() - .attach_printable("Could not find object reference id in incoming webhook body")?; - let connector_enum = api_models::enums::Connector::from_str(&connector_name) - .change_context(errors::ApiErrorResponse::InvalidDataValue { - field_name: "connector", - }) - .attach_printable_lazy(|| { - format!("unable to parse connector name {connector_name:?}") - })?; - let connectors_with_source_verification_call = &state.conf.webhook_source_verification_call; - - let merchant_connector_account = match merchant_connector_account { - Some(merchant_connector_account) => merchant_connector_account, - None => { - match Box::pin(helper_utils::get_mca_from_object_reference_id( - &state, - object_ref_id.clone(), - &merchant_context, - &connector_name, - )) - .await - { - Ok(mca) => mca, - Err(error) => { - return handle_incoming_webhook_error( - error, - &connector, - connector_name.as_str(), - &request_details, - ); - } - } - } - }; - - let source_verified = if connectors_with_source_verification_call + let source_verified = if source_verified_via_ucs { + // If UCS handled verification, use that result + source_verified_via_ucs + } else { + // Fall back to traditional source verification + if connectors_with_source_verification_call .connectors_with_webhook_source_verification_call .contains(&connector_enum) { verify_webhook_source_verification_call( connector.clone(), - &state, - &merchant_context, + state, + merchant_context, merchant_connector_account.clone(), - &connector_name, - &request_details, + connector_name, + request_details, ) .await .or_else(|error| match error.current_context() { @@ -375,11 +561,11 @@ async fn incoming_webhooks_core( connector .clone() .verify_webhook_source( - &request_details, + request_details, merchant_context.get_merchant_account().get_id(), merchant_connector_account.connector_webhook_details.clone(), merchant_connector_account.connector_account_details.clone(), - connector_name.as_str(), + connector_name, ) .await .or_else(|error| match error.current_context() { @@ -391,235 +577,233 @@ async fn incoming_webhooks_core( }) .switch() .attach_printable("There was an issue in incoming webhook source verification")? - }; - - if source_verified { - metrics::WEBHOOK_SOURCE_VERIFIED_COUNT.add( - 1, - router_env::metric_attributes!(( - MERCHANT_ID, - merchant_context.get_merchant_account().get_id().clone() - )), - ); - } else if connector.is_webhook_source_verification_mandatory() { - // if webhook consumption is mandatory for connector, fail webhook - // so that merchant can retrigger it after updating merchant_secret - return Err(errors::ApiErrorResponse::WebhookAuthenticationFailed.into()); } + }; - logger::info!(source_verified=?source_verified); - - event_object = connector - .get_webhook_resource_object(&request_details) - .switch() - .attach_printable("Could not find resource object in incoming webhook body")?; - - let webhook_details = api::IncomingWebhookDetails { - object_reference_id: object_ref_id.clone(), - resource_object: serde_json::to_vec(&event_object) - .change_context(errors::ParsingError::EncodeError("byte-vec")) - .attach_printable("Unable to convert webhook payload to a value") - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable( - "There was an issue when encoding the incoming webhook body to bytes", - )?, - }; - - let profile_id = &merchant_connector_account.profile_id; - - let business_profile = state - .store - .find_business_profile_by_profile_id( - key_manager_state, - merchant_context.get_merchant_key_store(), - profile_id, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::ProfileNotFound { - id: profile_id.get_string_repr().to_owned(), - })?; - - // If the incoming webhook is a relay webhook, then we need to trigger the relay webhook flow - let result_response = if is_relay_webhook { - let relay_webhook_response = Box::pin(relay_incoming_webhook_flow( - state.clone(), - merchant_context, - business_profile, - webhook_details, - event_type, - source_verified, - )) - .await - .attach_printable("Incoming webhook flow for relay failed"); - - // Using early return ensures unsupported webhooks are acknowledged to the connector - if let Some(errors::ApiErrorResponse::NotSupported { .. }) = relay_webhook_response - .as_ref() - .err() - .map(|a| a.current_context()) - { - logger::error!( - webhook_payload =? request_details.body, - "Failed while identifying the event type", - ); - - let response = connector - .get_webhook_api_response(&request_details, None) - .switch() - .attach_printable( - "Failed while early return in case of not supported event type in relay webhooks", - )?; - - return Ok(( - response, - WebhookResponseTracker::NoEffect, - serde_json::Value::Null, - )); - }; - - relay_webhook_response - } else { - match flow_type { - api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow( - state.clone(), - req_state, - merchant_context, - business_profile, - webhook_details, - source_verified, - &connector, - &request_details, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for payments failed"), - - api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow( - state.clone(), - merchant_context, - business_profile, - webhook_details, - connector_name.as_str(), - source_verified, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for refunds failed"), - - api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow( - state.clone(), - merchant_context, - business_profile, - webhook_details, - source_verified, - &connector, - &request_details, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for disputes failed"), - - api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow( - state.clone(), - req_state, - merchant_context, - business_profile, - webhook_details, - source_verified, - )) - .await - .attach_printable("Incoming bank-transfer webhook flow failed"), - - api::WebhookFlow::ReturnResponse => Ok(WebhookResponseTracker::NoEffect), - - api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow( - state.clone(), - merchant_context, - business_profile, - webhook_details, - source_verified, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for mandates failed"), - - api::WebhookFlow::ExternalAuthentication => { - Box::pin(external_authentication_incoming_webhook_flow( - state.clone(), - req_state, - merchant_context, - source_verified, - event_type, - &request_details, - &connector, - object_ref_id, - business_profile, - merchant_connector_account, - )) - .await - .attach_printable("Incoming webhook flow for external authentication failed") - } - api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow( - state.clone(), - req_state, - merchant_context, - source_verified, - event_type, - object_ref_id, - business_profile, - )) - .await - .attach_printable("Incoming webhook flow for fraud check failed"), - - #[cfg(feature = "payouts")] - api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow( - state.clone(), - merchant_context, - business_profile, - webhook_details, - event_type, - source_verified, - )) - .await - .attach_printable("Incoming webhook flow for payouts failed"), - - _ => Err(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Unsupported Flow Type received in incoming webhooks"), - } - }; - - match result_response { - Ok(response) => response, - Err(error) => { - return handle_incoming_webhook_error( - error, - &connector, - connector_name.as_str(), - &request_details, - ); - } - } - } else { - metrics::WEBHOOK_INCOMING_FILTERED_COUNT.add( + if source_verified { + metrics::WEBHOOK_SOURCE_VERIFIED_COUNT.add( 1, router_env::metric_attributes!(( MERCHANT_ID, merchant_context.get_merchant_account().get_id().clone() )), ); - WebhookResponseTracker::NoEffect + } else if connector.is_webhook_source_verification_mandatory() { + // if webhook consumption is mandatory for connector, fail webhook + // so that merchant can retrigger it after updating merchant_secret + return Err(errors::ApiErrorResponse::WebhookAuthenticationFailed.into()); + } + + logger::info!(source_verified=?source_verified); + + let event_object: Box = + if let Some(transform_data) = webhook_transform_data { + // Use UCS transform data if available + if let Some(webhook_content) = &transform_data.webhook_content { + // Convert UCS webhook content to appropriate format + Box::new( + serde_json::to_value(webhook_content) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to serialize UCS webhook content")?, + ) + } else { + // Fall back to connector extraction + connector + .get_webhook_resource_object(request_details) + .switch() + .attach_printable("Could not find resource object in incoming webhook body")? + } + } else { + // Use traditional connector extraction + connector + .get_webhook_resource_object(request_details) + .switch() + .attach_printable("Could not find resource object in incoming webhook body")? + }; + + let webhook_details = api::IncomingWebhookDetails { + object_reference_id: object_ref_id.clone(), + resource_object: serde_json::to_vec(&event_object) + .change_context(errors::ParsingError::EncodeError("byte-vec")) + .attach_printable("Unable to convert webhook payload to a value") + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "There was an issue when encoding the incoming webhook body to bytes", + )?, }; - let response = connector - .get_webhook_api_response(&request_details, None) - .switch() - .attach_printable("Could not get incoming webhook api response from connector")?; + let profile_id = &merchant_connector_account.profile_id; + let key_manager_state = &(state).into(); - let serialized_request = event_object - .masked_serialize() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Could not convert webhook effect to string")?; - Ok((response, webhook_effect, serialized_request)) + let business_profile = state + .store + .find_business_profile_by_profile_id( + key_manager_state, + merchant_context.get_merchant_key_store(), + profile_id, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::ProfileNotFound { + id: profile_id.get_string_repr().to_owned(), + })?; + + // If the incoming webhook is a relay webhook, then we need to trigger the relay webhook flow + let result_response = if is_relay_webhook { + let relay_webhook_response = Box::pin(relay_incoming_webhook_flow( + state.clone(), + merchant_context.clone(), + business_profile, + webhook_details, + event_type, + source_verified, + )) + .await + .attach_printable("Incoming webhook flow for relay failed"); + + // Using early return ensures unsupported webhooks are acknowledged to the connector + if let Some(errors::ApiErrorResponse::NotSupported { .. }) = relay_webhook_response + .as_ref() + .err() + .map(|a| a.current_context()) + { + logger::error!( + webhook_payload =? request_details.body, + "Failed while identifying the event type", + ); + + let _response = connector + .get_webhook_api_response(request_details, None) + .switch() + .attach_printable( + "Failed while early return in case of not supported event type in relay webhooks", + )?; + + return Ok(WebhookResponseTracker::NoEffect); + }; + + relay_webhook_response + } else { + let flow_type: api::WebhookFlow = event_type.into(); + match flow_type { + api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow( + state.clone(), + req_state, + merchant_context.clone(), + business_profile, + webhook_details, + source_verified, + connector, + request_details, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for payments failed"), + + api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow( + state.clone(), + merchant_context.clone(), + business_profile, + webhook_details, + connector_name, + source_verified, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for refunds failed"), + + api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow( + state.clone(), + merchant_context.clone(), + business_profile, + webhook_details, + source_verified, + connector, + request_details, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for disputes failed"), + + api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow( + state.clone(), + req_state, + merchant_context.clone(), + business_profile, + webhook_details, + source_verified, + )) + .await + .attach_printable("Incoming bank-transfer webhook flow failed"), + + api::WebhookFlow::ReturnResponse => Ok(WebhookResponseTracker::NoEffect), + + api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow( + state.clone(), + merchant_context.clone(), + business_profile, + webhook_details, + source_verified, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for mandates failed"), + + api::WebhookFlow::ExternalAuthentication => { + Box::pin(external_authentication_incoming_webhook_flow( + state.clone(), + req_state, + merchant_context.clone(), + source_verified, + event_type, + request_details, + connector, + object_ref_id, + business_profile, + merchant_connector_account, + )) + .await + .attach_printable("Incoming webhook flow for external authentication failed") + } + api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow( + state.clone(), + req_state, + merchant_context.clone(), + source_verified, + event_type, + object_ref_id, + business_profile, + )) + .await + .attach_printable("Incoming webhook flow for fraud check failed"), + + #[cfg(feature = "payouts")] + api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow( + state.clone(), + merchant_context.clone(), + business_profile, + webhook_details, + event_type, + source_verified, + )) + .await + .attach_printable("Incoming webhook flow for payouts failed"), + + _ => Err(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Unsupported Flow Type received in incoming webhooks"), + } + }; + + match result_response { + Ok(response) => Ok(response), + Err(error) => { + let result = + handle_incoming_webhook_error(error, connector, connector_name, request_details); + match result { + Ok((_, webhook_tracker, _)) => Ok(webhook_tracker), + Err(e) => Err(e), + } + } + } } fn handle_incoming_webhook_error( @@ -1346,7 +1530,7 @@ pub async fn get_or_update_dispute_object( Some(dispute) => { logger::info!("Dispute Already exists, Updating the dispute details"); metrics::INCOMING_DISPUTE_WEBHOOK_UPDATE_RECORD_METRIC.add(1, &[]); - crate::core::utils::validate_dispute_stage_and_dispute_status( + core_utils::validate_dispute_stage_and_dispute_status( dispute.dispute_stage, dispute.dispute_status, dispute_details.dispute_stage, @@ -1354,7 +1538,6 @@ pub async fn get_or_update_dispute_object( ) .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) .attach_printable("dispute stage and status validation failed")?; - let update_dispute = diesel_models::dispute::DisputeUpdate::Update { dispute_stage: dispute_details.dispute_stage, dispute_status,