From faed8d352c347339e2e1cb6bdfcaa6e4be07c31a Mon Sep 17 00:00:00 2001 From: Saptak Dutta <138141940+Saptak88@users.noreply.github.com> Date: Mon, 27 Oct 2025 11:44:03 +0530 Subject: [PATCH] feat(core): Added two step payment webhooks processing for Hyperswitch <> UCS Integration (#9374) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- crates/common_enums/src/enums.rs | 1 + .../hyperswitch_interfaces/src/api_client.rs | 75 +----- .../src/unified_connector_service.rs | 34 ++- .../unified_connector_service/transformers.rs | 14 ++ crates/router/src/core/payments.rs | 23 +- crates/router/src/core/payments/flows.rs | 1 + .../src/core/payments/flows/authorize_flow.rs | 1 + .../payments/flows/complete_authorize_flow.rs | 1 + .../src/core/payments/flows/psync_flow.rs | 221 +++++++++++------- .../core/payments/flows/setup_mandate_flow.rs | 1 + crates/router/src/core/payments/helpers.rs | 7 +- .../src/core/unified_connector_service.rs | 56 ++++- .../unified_connector_service/transformers.rs | 49 +++- crates/router/src/core/webhooks/incoming.rs | 108 ++++++--- crates/router/src/services/api.rs | 2 +- 15 files changed, 377 insertions(+), 217 deletions(-) diff --git a/crates/common_enums/src/enums.rs b/crates/common_enums/src/enums.rs index cd3650c077..b8b6da0dcf 100644 --- a/crates/common_enums/src/enums.rs +++ b/crates/common_enums/src/enums.rs @@ -579,6 +579,7 @@ pub enum CallConnectorAction { error_message: Option, }, HandleResponse(Vec), + UCSConsumeResponse(Vec), UCSHandleResponse(Vec), } diff --git a/crates/hyperswitch_interfaces/src/api_client.rs b/crates/hyperswitch_interfaces/src/api_client.rs index b1aedabcbc..5c47dac6a5 100644 --- a/crates/hyperswitch_interfaces/src/api_client.rs +++ b/crates/hyperswitch_interfaces/src/api_client.rs @@ -19,7 +19,6 @@ use masking::Maskable; use reqwest::multipart::Form; use router_env::{instrument, logger, tracing, tracing_actix_web::RequestId}; use serde_json::json; -use unified_connector_service_masking::ExposeInterface; use crate::{ configs, @@ -32,7 +31,6 @@ use crate::{ events::connector_api_logs::ConnectorEvent, metrics, types, types::Proxy, - unified_connector_service, }; /// A trait representing a converter for connector names to their corresponding enum variants. @@ -166,8 +164,14 @@ where }; connector_integration.handle_response(req, None, response) } - common_enums::CallConnectorAction::UCSHandleResponse(transform_data_bytes) => { - handle_ucs_response(router_data, transform_data_bytes) + common_enums::CallConnectorAction::UCSConsumeResponse(_) + | common_enums::CallConnectorAction::UCSHandleResponse(_) => { + Err(ConnectorError::ProcessingStepFailed(Some( + "CallConnectorAction UCSHandleResponse/UCSConsumeResponse used in Direct gateway system flow. These actions are only valid in UCS gateway system" + .to_string() + .into(), + )) + .into()) } common_enums::CallConnectorAction::Avoid => Ok(router_data), common_enums::CallConnectorAction::StatusUpdate { @@ -445,69 +449,6 @@ where } } -/// Handle UCS webhook response processing -pub fn handle_ucs_response( - router_data: RouterData, - transform_data_bytes: Vec, -) -> CustomResult, ConnectorError> -where - T: Clone + Debug + 'static, - Req: Debug + Clone + 'static, - Resp: Debug + Clone + 'static, -{ - let webhook_transform_data: unified_connector_service::WebhookTransformData = - serde_json::from_slice(&transform_data_bytes) - .change_context(ConnectorError::ResponseDeserializationFailed) - .attach_printable("Failed to deserialize UCS webhook transform data")?; - - let webhook_content = webhook_transform_data - .webhook_content - .ok_or(ConnectorError::ResponseDeserializationFailed) - .attach_printable("UCS webhook transform data missing webhook_content")?; - - let payment_get_response = match webhook_content.content { - Some(unified_connector_service_client::payments::webhook_response_content::Content::PaymentsResponse(payments_response)) => { - Ok(payments_response) - }, - Some(unified_connector_service_client::payments::webhook_response_content::Content::RefundsResponse(_)) => { - Err(ConnectorError::ProcessingStepFailed(Some("UCS webhook contains refund response but payment processing was expected".to_string().into())).into()) - }, - Some(unified_connector_service_client::payments::webhook_response_content::Content::DisputesResponse(_)) => { - Err(ConnectorError::ProcessingStepFailed(Some("UCS webhook contains dispute response but payment processing was expected".to_string().into())).into()) - }, - Some(unified_connector_service_client::payments::webhook_response_content::Content::IncompleteTransformation(_)) => { - Err(ConnectorError::ProcessingStepFailed(Some("UCS webhook contains incomplete transformation but payment processing was expected".to_string().into())).into()) - }, - None => { - Err(ConnectorError::ResponseDeserializationFailed) - .attach_printable("UCS webhook content missing payments_response") - } - }?; - - let (router_data_response, status_code) = - unified_connector_service::handle_unified_connector_service_response_for_payment_get( - payment_get_response.clone(), - ) - .change_context(ConnectorError::ProcessingStepFailed(None)) - .attach_printable("Failed to process UCS webhook response using PSync handler")?; - - let mut updated_router_data = router_data; - let router_data_response = router_data_response.map(|(response, status)| { - updated_router_data.status = status; - response - }); - - let _ = router_data_response.map_err(|error_response| { - updated_router_data.response = Err(error_response); - }); - updated_router_data.raw_connector_response = payment_get_response - .raw_connector_response - .map(|raw_connector_response| raw_connector_response.expose().into()); - updated_router_data.connector_http_status_code = Some(status_code); - - Ok(updated_router_data) -} - /// Calls the connector API and handles the response #[instrument(skip_all)] pub async fn call_connector_api( diff --git a/crates/hyperswitch_interfaces/src/unified_connector_service.rs b/crates/hyperswitch_interfaces/src/unified_connector_service.rs index be8d6cc7c8..0547bd9980 100644 --- a/crates/hyperswitch_interfaces/src/unified_connector_service.rs +++ b/crates/hyperswitch_interfaces/src/unified_connector_service.rs @@ -1,5 +1,6 @@ use common_enums::AttemptStatus; use common_utils::errors::CustomResult; +use error_stack::ResultExt; use hyperswitch_domain_models::{ router_data::ErrorResponse, router_response_types::PaymentsResponseData, }; @@ -10,7 +11,9 @@ use crate::helpers::ForeignTryFrom; /// Unified Connector Service (UCS) related transformers pub mod transformers; -pub use transformers::WebhookTransformData; +pub use transformers::{ + UnifiedConnectorServiceError, WebhookTransformData, WebhookTransformationStatus, +}; /// Type alias for return type used by unified connector service response handlers type UnifiedConnectorServiceResult = CustomResult< @@ -18,7 +21,7 @@ type UnifiedConnectorServiceResult = CustomResult< Result<(PaymentsResponseData, AttemptStatus), ErrorResponse>, u16, ), - transformers::UnifiedConnectorServiceError, + UnifiedConnectorServiceError, >; #[allow(missing_docs)] @@ -32,3 +35,30 @@ pub fn handle_unified_connector_service_response_for_payment_get( Ok((router_data_response, status_code)) } + +/// Extracts the payments response from UCS webhook content +pub fn get_payments_response_from_ucs_webhook_content( + webhook_content: payments_grpc::WebhookResponseContent, +) -> CustomResult { + match webhook_content.content { + Some(unified_connector_service_client::payments::webhook_response_content::Content::PaymentsResponse(payments_response)) => { + Ok(payments_response) + }, + Some(unified_connector_service_client::payments::webhook_response_content::Content::RefundsResponse(_)) => { + Err(UnifiedConnectorServiceError::WebhookProcessingFailure) + .attach_printable("UCS webhook contains refunds response but payments response was expected")? + }, + Some(unified_connector_service_client::payments::webhook_response_content::Content::DisputesResponse(_)) => { + Err(UnifiedConnectorServiceError::WebhookProcessingFailure) + .attach_printable("UCS webhook contains disputes response but payments response was expected")? + }, + Some(unified_connector_service_client::payments::webhook_response_content::Content::IncompleteTransformation(_)) => { + Err(UnifiedConnectorServiceError::WebhookProcessingFailure) + .attach_printable("UCS webhook contains incomplete transformation but payments response was expected")? + }, + None => { + Err(UnifiedConnectorServiceError::WebhookProcessingFailure) + .attach_printable("Missing payments response in UCS webhook content")? + } + } +} diff --git a/crates/hyperswitch_interfaces/src/unified_connector_service/transformers.rs b/crates/hyperswitch_interfaces/src/unified_connector_service/transformers.rs index 128814f05a..1d9768fd53 100644 --- a/crates/hyperswitch_interfaces/src/unified_connector_service/transformers.rs +++ b/crates/hyperswitch_interfaces/src/unified_connector_service/transformers.rs @@ -16,6 +16,10 @@ pub enum UnifiedConnectorServiceError { #[error("Failed to encode unified connector service request")] RequestEncodingFailed, + /// Failed to process webhook from unified connector service. + #[error("Failed to process webhook from unified connector service")] + WebhookProcessingFailure, + /// Request encoding failed due to a specific reason. #[error("Request encoding failed : {0}")] RequestEncodingFailedWithReason(String), @@ -90,6 +94,15 @@ pub enum UnifiedConnectorServiceError { WebhookTransformFailure, } +/// UCS Webhook transformation status +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum WebhookTransformationStatus { + /// Transformation completed successfully, no further action needed + Complete, + /// Transformation incomplete, requires second call for final status + Incomplete, +} + #[allow(missing_docs)] /// Webhook transform data structure containing UCS response information #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -98,6 +111,7 @@ pub struct WebhookTransformData { pub source_verified: bool, pub webhook_content: Option, pub response_ref_id: Option, + pub webhook_transformation_status: WebhookTransformationStatus, } impl ForeignTryFrom diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 813075ead3..2f42eb33ad 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -4285,18 +4285,14 @@ where merchant_context, &router_data, Some(payment_data), + call_connector_action.clone(), ) .await?; - let is_handle_response_action = matches!( - call_connector_action, - CallConnectorAction::UCSHandleResponse(_) | CallConnectorAction::HandleResponse(_) - ); - record_time_taken_with(|| async { - match (execution_path, is_handle_response_action) { - // Process through UCS when system is UCS and not handling response - (ExecutionPath::UnifiedConnectorService, false) => { + match execution_path { + // Process through UCS when system is UCS and not handling response or if it is a UCS webhook action + ExecutionPath::UnifiedConnectorService => { process_through_ucs( state, req_state, @@ -4304,6 +4300,7 @@ where operation, payment_data, customer, + call_connector_action, validate_result, schedule_time, header_payload, @@ -4317,7 +4314,7 @@ where } // Process through Direct with Shadow UCS - (ExecutionPath::ShadowUnifiedConnectorService, false) => { + ExecutionPath::ShadowUnifiedConnectorService => { process_through_direct_with_shadow_unified_connector_service( state, req_state, @@ -4342,9 +4339,7 @@ where } // Process through Direct gateway - (ExecutionPath::Direct, _) - | (ExecutionPath::UnifiedConnectorService, true) - | (ExecutionPath::ShadowUnifiedConnectorService, true) => { + ExecutionPath::Direct => { process_through_direct( state, req_state, @@ -4821,6 +4816,7 @@ where merchant_context, &router_data, Some(payment_data), + call_connector_action.clone(), ) .await?; @@ -4906,6 +4902,7 @@ where &connector, ExecutionMode::Primary, // UCS is called in primary mode merchant_order_reference_id, + call_connector_action, ) .await?; @@ -4962,6 +4959,7 @@ where merchant_context, &router_data, Some(payment_data), + call_connector_action.clone(), ) .await?; if matches!(execution_path, ExecutionPath::UnifiedConnectorService) { @@ -5018,6 +5016,7 @@ where &connector, ExecutionMode::Primary, //UCS is called in primary mode merchant_order_reference_id, + call_connector_action, ) .await?; diff --git a/crates/router/src/core/payments/flows.rs b/crates/router/src/core/payments/flows.rs index 2a7fb594fc..d2575306c7 100644 --- a/crates/router/src/core/payments/flows.rs +++ b/crates/router/src/core/payments/flows.rs @@ -249,6 +249,7 @@ pub trait Feature { _connector_data: &api::ConnectorData, _unified_connector_service_execution_mode: ExecutionMode, _merchant_order_reference_id: Option, + _call_connector_action: common_enums::CallConnectorAction, ) -> RouterResult<()> where F: Clone, diff --git a/crates/router/src/core/payments/flows/authorize_flow.rs b/crates/router/src/core/payments/flows/authorize_flow.rs index e47b17a3bc..4a42366335 100644 --- a/crates/router/src/core/payments/flows/authorize_flow.rs +++ b/crates/router/src/core/payments/flows/authorize_flow.rs @@ -542,6 +542,7 @@ impl Feature for types::PaymentsAu connector_data: &api::ConnectorData, unified_connector_service_execution_mode: enums::ExecutionMode, merchant_order_reference_id: Option, + _call_connector_action: common_enums::CallConnectorAction, ) -> RouterResult<()> { if self.request.mandate_id.is_some() { Box::pin(call_unified_connector_service_repeat_payment( diff --git a/crates/router/src/core/payments/flows/complete_authorize_flow.rs b/crates/router/src/core/payments/flows/complete_authorize_flow.rs index 944eb2bccf..6f72dfbc96 100644 --- a/crates/router/src/core/payments/flows/complete_authorize_flow.rs +++ b/crates/router/src/core/payments/flows/complete_authorize_flow.rs @@ -283,6 +283,7 @@ impl Feature _connector_data: &api::ConnectorData, _unified_connector_service_execution_mode: common_enums::ExecutionMode, _merchant_order_reference_id: Option, + _call_connector_action: common_enums::CallConnectorAction, ) -> RouterResult<()> { // Call UCS for Authorize flow Ok(()) diff --git a/crates/router/src/core/payments/flows/psync_flow.rs b/crates/router/src/core/payments/flows/psync_flow.rs index 661e980e10..9605457546 100644 --- a/crates/router/src/core/payments/flows/psync_flow.rs +++ b/crates/router/src/core/payments/flows/psync_flow.rs @@ -2,11 +2,14 @@ use std::{collections::HashMap, str::FromStr}; use async_trait::async_trait; use common_enums::{self, enums}; -use common_utils::{id_type, ucs_types}; +use common_utils::{id_type, types::MinorUnit, ucs_types}; use error_stack::ResultExt; use external_services::grpc_client; use hyperswitch_domain_models::payments as domain_payments; -use hyperswitch_interfaces::unified_connector_service::handle_unified_connector_service_response_for_payment_get; +use hyperswitch_interfaces::unified_connector_service::{ + get_payments_response_from_ucs_webhook_content, + handle_unified_connector_service_response_for_payment_get, +}; use unified_connector_service_client::payments as payments_grpc; use unified_connector_service_masking::ExposeInterface; @@ -237,75 +240,21 @@ impl Feature _connector_data: &api::ConnectorData, unified_connector_service_execution_mode: enums::ExecutionMode, merchant_order_reference_id: Option, + call_connector_action: common_enums::CallConnectorAction, ) -> RouterResult<()> { - let connector_name = self.connector.clone(); - let connector_enum = common_enums::connector_enums::Connector::from_str(&connector_name) - .change_context(ApiErrorResponse::IncorrectConnectorNameGiven)?; + match call_connector_action { + common_enums::CallConnectorAction::UCSConsumeResponse(transform_data_bytes) => { + let webhook_content: payments_grpc::WebhookResponseContent = + serde_json::from_slice(&transform_data_bytes) + .change_context(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to deserialize UCS webhook transform data")?; - let is_ucs_psync_disabled = state - .conf - .grpc_client - .unified_connector_service - .as_ref() - .is_some_and(|config| { - config - .ucs_psync_disabled_connectors - .contains(&connector_enum) - }); - - if is_ucs_psync_disabled { - logger::info!( - "UCS PSync call disabled for connector: {}, skipping UCS call", - connector_name - ); - return Ok(()); - } - - let client = state - .grpc_client - .unified_connector_service_client - .clone() - .ok_or(ApiErrorResponse::InternalServerError) - .attach_printable("Failed to fetch Unified Connector Service client")?; - - let payment_get_request = payments_grpc::PaymentServiceGetRequest::foreign_try_from(&*self) - .change_context(ApiErrorResponse::InternalServerError) - .attach_printable("Failed to construct Payment Get Request")?; - - let connector_auth_metadata = build_unified_connector_service_auth_metadata( - merchant_connector_account, - merchant_context, - ) - .change_context(ApiErrorResponse::InternalServerError) - .attach_printable("Failed to construct request metadata")?; - let merchant_reference_id = header_payload - .x_reference_id - .clone() - .or(merchant_order_reference_id) - .map(|id| id_type::PaymentReferenceId::from_str(id.as_str())) - .transpose() - .inspect_err(|err| logger::warn!(error=?err, "Invalid Merchant ReferenceId found")) - .ok() - .flatten() - .map(ucs_types::UcsReferenceId::Payment); - let header_payload = state - .get_grpc_headers_ucs(unified_connector_service_execution_mode) - .external_vault_proxy_metadata(None) - .merchant_reference_id(merchant_reference_id) - .lineage_ids(lineage_ids); - let updated_router_data = Box::pin(ucs_logging_wrapper( - self.clone(), - state, - payment_get_request, - header_payload, - |mut router_data, payment_get_request, grpc_headers| async move { - let response = client - .payment_get(payment_get_request, connector_auth_metadata, grpc_headers) - .await - .change_context(ApiErrorResponse::InternalServerError) - .attach_printable("Failed to get payment")?; - - let payment_get_response = response.into_inner(); + let payment_get_response = + get_payments_response_from_ucs_webhook_content(webhook_content) + .change_context(ApiErrorResponse::WebhookProcessingFailure) + .attach_printable( + "Failed to construct payments response from UCS webhook content", + )?; let (router_data_response, status_code) = handle_unified_connector_service_response_for_payment_get( @@ -315,23 +264,137 @@ impl Feature .attach_printable("Failed to deserialize UCS response")?; let router_data_response = router_data_response.map(|(response, status)| { - router_data.status = status; + self.status = status; response }); - router_data.response = router_data_response; - router_data.raw_connector_response = payment_get_response + self.response = router_data_response; + self.amount_captured = payment_get_response.captured_amount; + self.minor_amount_captured = payment_get_response + .minor_captured_amount + .map(MinorUnit::new); + self.raw_connector_response = payment_get_response .raw_connector_response .clone() .map(|raw_connector_response| raw_connector_response.expose().into()); - router_data.connector_http_status_code = Some(status_code); + self.connector_http_status_code = Some(status_code); + } + common_enums::CallConnectorAction::UCSHandleResponse(_) + | common_enums::CallConnectorAction::Trigger => { + let connector_name = self.connector.clone(); + let connector_enum = + common_enums::connector_enums::Connector::from_str(&connector_name) + .change_context(ApiErrorResponse::IncorrectConnectorNameGiven)?; - Ok((router_data, payment_get_response)) - }, - )) - .await?; + let is_ucs_psync_disabled = state + .conf + .grpc_client + .unified_connector_service + .as_ref() + .is_some_and(|config| { + config + .ucs_psync_disabled_connectors + .contains(&connector_enum) + }); - // Copy back the updated data - *self = updated_router_data; + if is_ucs_psync_disabled { + logger::info!( + "UCS PSync call disabled for connector: {}, skipping UCS call", + connector_name + ); + return Ok(()); + } + + let client = state + .grpc_client + .unified_connector_service_client + .clone() + .ok_or(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to fetch Unified Connector Service client")?; + + let payment_get_request = + payments_grpc::PaymentServiceGetRequest::foreign_try_from(( + &*self, + call_connector_action, + )) + .change_context(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct Payment Get Request")?; + + let connector_auth_metadata = build_unified_connector_service_auth_metadata( + merchant_connector_account, + merchant_context, + ) + .change_context(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct request metadata")?; + let merchant_reference_id = header_payload + .x_reference_id + .clone() + .or(merchant_order_reference_id) + .map(|id| id_type::PaymentReferenceId::from_str(id.as_str())) + .transpose() + .inspect_err( + |err| logger::warn!(error=?err, "Invalid Merchant ReferenceId found"), + ) + .ok() + .flatten() + .map(ucs_types::UcsReferenceId::Payment); + let header_payload = state + .get_grpc_headers_ucs(unified_connector_service_execution_mode) + .external_vault_proxy_metadata(None) + .merchant_reference_id(merchant_reference_id) + .lineage_ids(lineage_ids); + let updated_router_data = Box::pin(ucs_logging_wrapper( + self.clone(), + state, + payment_get_request, + header_payload, + |mut router_data, payment_get_request, grpc_headers| async move { + let response = client + .payment_get(payment_get_request, connector_auth_metadata, grpc_headers) + .await + .change_context(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to get payment")?; + + let payment_get_response = response.into_inner(); + + let (router_data_response, status_code) = + handle_unified_connector_service_response_for_payment_get( + payment_get_response.clone(), + ) + .change_context(ApiErrorResponse::InternalServerError) + .attach_printable("Failed to deserialize UCS response")?; + + let router_data_response = + router_data_response.map(|(response, status)| { + router_data.status = status; + response + }); + router_data.response = router_data_response; + router_data.amount_captured = payment_get_response.captured_amount; + router_data.minor_amount_captured = payment_get_response + .minor_captured_amount + .map(MinorUnit::new); + router_data.raw_connector_response = payment_get_response + .raw_connector_response + .clone() + .map(|raw_connector_response| raw_connector_response.expose().into()); + router_data.connector_http_status_code = Some(status_code); + + Ok((router_data, payment_get_response)) + }, + )) + .await?; + + // Copy back the updated data + *self = updated_router_data; + } + common_enums::CallConnectorAction::HandleResponse(_) + | common_enums::CallConnectorAction::Avoid + | common_enums::CallConnectorAction::StatusUpdate { .. } => { + Err(ApiErrorResponse::InternalServerError).attach_printable( + "Invalid CallConnectorAction for payment sync via UCS Gateway system", + )? + } + } Ok(()) } } diff --git a/crates/router/src/core/payments/flows/setup_mandate_flow.rs b/crates/router/src/core/payments/flows/setup_mandate_flow.rs index 601cf14540..752441bc44 100644 --- a/crates/router/src/core/payments/flows/setup_mandate_flow.rs +++ b/crates/router/src/core/payments/flows/setup_mandate_flow.rs @@ -289,6 +289,7 @@ impl Feature for types::Setup _connector_data: &api::ConnectorData, unified_connector_service_execution_mode: enums::ExecutionMode, merchant_order_reference_id: Option, + _call_connector_action: common_enums::CallConnectorAction, ) -> RouterResult<()> { let client = state .grpc_client diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 29bb45be48..bc3ef871b0 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -7867,6 +7867,7 @@ pub async fn process_through_ucs<'a, F, RouterDReq, ApiRequest, D>( operation: &'a BoxedOperation<'a, F, ApiRequest, D>, payment_data: &'a mut D, customer: &Option, + call_connector_action: CallConnectorAction, validate_result: &'a OperationsValidateResult, schedule_time: Option, header_payload: domain_payments::HeaderPayload, @@ -7968,6 +7969,7 @@ where connector_data, ExecutionMode::Primary, // UCS is called in primary mode merchant_order_reference_id, + call_connector_action, ) .await?; } @@ -8125,7 +8127,7 @@ where operation, payment_data, customer, - call_connector_action, + call_connector_action.clone(), validate_result, schedule_time, header_payload, @@ -8152,6 +8154,7 @@ where &connector, unified_connector_service_merchant_context, unified_connector_service_merchant_order_reference_id, + call_connector_action, ) .await }); @@ -8173,6 +8176,7 @@ pub async fn execute_shadow_unified_connector_service_call( connector_data: &api::ConnectorData, merchant_context: domain::MerchantContext, merchant_order_reference_id: Option, + call_connector_action: CallConnectorAction, ) where F: Send + Clone + Sync + 'static, RouterDReq: Send + Sync + Clone + 'static + Serialize, @@ -8191,6 +8195,7 @@ pub async fn execute_shadow_unified_connector_service_call( connector_data, ExecutionMode::Shadow, // Shadow mode for UCS merchant_order_reference_id, + call_connector_action, ) .await .map_err(|e| router_env::logger::debug!("Shadow UCS call failed: {:?}", e)); diff --git a/crates/router/src/core/unified_connector_service.rs b/crates/router/src/core/unified_connector_service.rs index f964df75c1..32155400fd 100644 --- a/crates/router/src/core/unified_connector_service.rs +++ b/crates/router/src/core/unified_connector_service.rs @@ -4,8 +4,9 @@ use api_models::admin; #[cfg(feature = "v2")] use base64::Engine; use common_enums::{ - connector_enums::Connector, AttemptStatus, ConnectorIntegrationType, ExecutionMode, - ExecutionPath, GatewaySystem, PaymentMethodType, ShadowRolloutAvailability, UcsAvailability, + connector_enums::Connector, AttemptStatus, CallConnectorAction, ConnectorIntegrationType, + ExecutionMode, ExecutionPath, GatewaySystem, PaymentMethodType, ShadowRolloutAvailability, + UcsAvailability, }; #[cfg(feature = "v2")] use common_utils::consts::BASE64_ENGINE; @@ -65,7 +66,7 @@ use crate::{ pub mod transformers; // Re-export webhook transformer types for easier access -pub use transformers::WebhookTransformData; +pub use transformers::{WebhookTransformData, WebhookTransformationStatus}; /// Type alias for return type used by unified connector service response handlers type UnifiedConnectorServiceResult = CustomResult< @@ -144,6 +145,7 @@ pub async fn should_call_unified_connector_service( merchant_context: &MerchantContext, router_data: &RouterData, payment_data: Option<&D>, + call_connector_action: CallConnectorAction, ) -> RouterResult where D: OperationSessionGetters, @@ -192,15 +194,47 @@ where // Single decision point using pattern matching let (gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled { - router_env::logger::debug!("UCS is disabled, using Direct gateway"); - (GatewaySystem::Direct, ExecutionPath::Direct) + match call_connector_action { + CallConnectorAction::UCSConsumeResponse(_) + | CallConnectorAction::UCSHandleResponse(_) => { + Err(errors::ApiErrorResponse::InternalServerError) + .attach_printable("CallConnectorAction UCSHandleResponse/UCSConsumeResponse received but UCS is disabled. These actions are only valid in UCS gateway")? + } + CallConnectorAction::Avoid + | CallConnectorAction::Trigger + | CallConnectorAction::HandleResponse(_) + | CallConnectorAction::StatusUpdate { .. } => { + router_env::logger::debug!("UCS is disabled, using Direct gateway"); + (GatewaySystem::Direct, ExecutionPath::Direct) + } + } } else { - // UCS is enabled, call decide function - decide_execution_path( - connector_integration_type, - previous_gateway, - shadow_rollout_availability, - )? + match call_connector_action { + CallConnectorAction::UCSConsumeResponse(_) + | CallConnectorAction::UCSHandleResponse(_) => { + router_env::logger::info!("CallConnectorAction UCSHandleResponse/UCSConsumeResponse received, using UCS gateway"); + ( + GatewaySystem::UnifiedConnectorService, + ExecutionPath::UnifiedConnectorService, + ) + } + CallConnectorAction::HandleResponse(_) => { + router_env::logger::info!( + "CallConnectorAction HandleResponse received, using Direct gateway" + ); + (GatewaySystem::Direct, ExecutionPath::Direct) + } + CallConnectorAction::Trigger + | CallConnectorAction::Avoid + | CallConnectorAction::StatusUpdate { .. } => { + // UCS is enabled, call decide function + decide_execution_path( + connector_integration_type, + previous_gateway, + shadow_rollout_availability, + )? + } + } }; router_env::logger::info!( diff --git a/crates/router/src/core/unified_connector_service/transformers.rs b/crates/router/src/core/unified_connector_service/transformers.rs index 5957c3cc08..becd2b1a42 100644 --- a/crates/router/src/core/unified_connector_service/transformers.rs +++ b/crates/router/src/core/unified_connector_service/transformers.rs @@ -21,6 +21,7 @@ pub use hyperswitch_interfaces::{ helpers::ForeignTryFrom, unified_connector_service::{ transformers::convert_connector_service_status_code, WebhookTransformData, + WebhookTransformationStatus, }, }; use masking::{ExposeInterface, PeekInterface}; @@ -35,13 +36,19 @@ use crate::{ core::{errors, unified_connector_service}, types::transformers, }; -impl transformers::ForeignTryFrom<&RouterData> - for payments_grpc::PaymentServiceGetRequest +impl + transformers::ForeignTryFrom<( + &RouterData, + common_enums::CallConnectorAction, + )> for payments_grpc::PaymentServiceGetRequest { type Error = error_stack::Report; fn foreign_try_from( - router_data: &RouterData, + (router_data, call_connector_action): ( + &RouterData, + common_enums::CallConnectorAction, + ), ) -> Result { let connector_transaction_id = router_data .request @@ -79,12 +86,32 @@ impl transformers::ForeignTryFrom<&RouterData Some(res), + common_enums::CallConnectorAction::Trigger => None, + common_enums::CallConnectorAction::HandleResponse(_) + | common_enums::CallConnectorAction::UCSConsumeResponse(_) + | common_enums::CallConnectorAction::Avoid + | common_enums::CallConnectorAction::StatusUpdate { .. } => Err( + UnifiedConnectorServiceError::RequestEncodingFailedWithReason( + "Invalid CallConnectorAction for payment sync call via UCS Gateway system" + .to_string(), + ), + )?, + }; + + let capture_method = router_data + .request + .capture_method + .map(payments_grpc::CaptureMethod::foreign_try_from) + .transpose()?; + Ok(Self { transaction_id: connector_transaction_id.or(encoded_data), request_ref_id: connector_ref_id, + capture_method: capture_method.map(|capture_method| capture_method.into()), + handle_response, access_token: None, - capture_method: None, - handle_response: None, amount: router_data.request.amount.get_amount_as_i64(), currency: currency.into(), }) @@ -206,7 +233,7 @@ impl .collect::>() }) .unwrap_or_default(), - test_mode: None, + test_mode: router_data.test_mode, connector_customer_id: router_data.connector_customer.clone(), }) } @@ -1165,6 +1192,15 @@ pub fn transform_ucs_webhook_response( let event_type = api_models::webhooks::IncomingWebhookEvent::from_ucs_event_type(response.event_type); + let webhook_transformation_status = if matches!( + response.transformation_status(), + payments_grpc::WebhookTransformationStatus::Incomplete + ) { + WebhookTransformationStatus::Incomplete + } else { + WebhookTransformationStatus::Complete + }; + Ok(WebhookTransformData { event_type, source_verified: response.source_verified, @@ -1176,6 +1212,7 @@ pub fn transform_ucs_webhook_response( payments_grpc::identifier::IdType::NoResponseIdMarker(_) => None, }) }), + webhook_transformation_status, }) } diff --git a/crates/router/src/core/webhooks/incoming.rs b/crates/router/src/core/webhooks/incoming.rs index bc2e702867..65abf6b2f8 100644 --- a/crates/router/src/core/webhooks/incoming.rs +++ b/crates/router/src/core/webhooks/incoming.rs @@ -25,6 +25,7 @@ use hyperswitch_domain_models::{ use hyperswitch_interfaces::webhooks::{IncomingWebhookFlowError, IncomingWebhookRequestDetails}; use masking::{ExposeInterface, PeekInterface}; use router_env::{instrument, tracing, tracing_actix_web::RequestId}; +use unified_connector_service_client::payments as payments_grpc; use super::{types, utils, MERCHANT_ID}; use crate::{ @@ -492,25 +493,50 @@ async fn process_non_ucs_webhook( } } +/// Extract resource object from UCS WebhookResponseContent +fn get_ucs_webhook_resource_object( + webhook_response_content: &payments_grpc::WebhookResponseContent, +) -> errors::RouterResult> { + let resource_object = match &webhook_response_content.content { + Some(payments_grpc::webhook_response_content::Content::IncompleteTransformation( + incomplete_transformation_response, + )) => { + // Deserialize resource object + serde_json::from_slice::( + &incomplete_transformation_response.resource_object, + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to deserialize resource object from UCS webhook response")? + } + _ => { + // Convert UCS webhook content to appropriate format + serde_json::to_value(webhook_response_content) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to serialize UCS webhook content")? + } + }; + Ok(Box::new(resource_object)) +} + /// Extract webhook event object based on transform data availability fn extract_webhook_event_object( - transform_data: &Option>, + webhook_transform_data: &Option>, connector: &ConnectorEnum, request_details: &IncomingWebhookRequestDetails<'_>, ) -> errors::RouterResult> { - match transform_data { - Some(transform_data) => match &transform_data.webhook_content { - Some(webhook_content) => { - let serialized_value = serde_json::to_value(webhook_content) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to serialize UCS webhook content")?; - Ok(Box::new(serialized_value)) - } - None => connector - .get_webhook_resource_object(request_details) - .switch() - .attach_printable("Could not find resource object in incoming webhook body"), - }, + match webhook_transform_data { + Some(webhook_transform_data) => webhook_transform_data + .webhook_content + .as_ref() + .map(|webhook_response_content| { + get_ucs_webhook_resource_object(webhook_response_content) + }) + .unwrap_or_else(|| { + connector + .get_webhook_resource_object(request_details) + .switch() + .attach_printable("Could not find resource object in incoming webhook body") + }), None => connector .get_webhook_resource_object(request_details) .switch() @@ -633,30 +659,31 @@ async fn process_webhook_business_logic( logger::info!(source_verified=?source_verified); - let event_object: Box = - if let Some(transform_data) = webhook_transform_data { - // Use UCS transform data if available - if let Some(webhook_content) = &transform_data.webhook_content { - // Convert UCS webhook content to appropriate format - Box::new( - serde_json::to_value(webhook_content) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to serialize UCS webhook content")?, - ) - } else { - // Fall back to connector extraction - connector - .get_webhook_resource_object(request_details) - .switch() - .attach_printable("Could not find resource object in incoming webhook body")? - } - } else { + let event_object: Box = match webhook_transform_data { + Some(webhook_transform_data) => { + // Extract resource_object from UCS webhook content + webhook_transform_data + .webhook_content + .as_ref() + .map(|webhook_response_content| { + get_ucs_webhook_resource_object(webhook_response_content) + }) + .unwrap_or_else(|| { + // Fall back to connector extraction + connector + .get_webhook_resource_object(request_details) + .switch() + .attach_printable("Could not find resource object in incoming webhook body") + })? + } + None => { // Use traditional connector extraction connector .get_webhook_resource_object(request_details) .switch() .attach_printable("Could not find resource object in incoming webhook body")? - }; + } + }; let webhook_details = api::IncomingWebhookDetails { object_reference_id: object_ref_id.clone(), @@ -1006,11 +1033,16 @@ async fn payments_incoming_webhook_flow( match webhook_transform_data.as_ref() { Some(transform_data) => { - // Serialize the transform data to pass to UCS handler - let transform_data_bytes = serde_json::to_vec(transform_data.as_ref()) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to serialize UCS webhook transform data")?; - payments::CallConnectorAction::UCSHandleResponse(transform_data_bytes) + match transform_data.webhook_transformation_status { + unified_connector_service::WebhookTransformationStatus::Complete => { + // Consume response from UCS + payments::CallConnectorAction::UCSConsumeResponse(resource_object) + } + unified_connector_service::WebhookTransformationStatus::Incomplete => { + // Make a second call to UCS + payments::CallConnectorAction::UCSHandleResponse(resource_object) + } + } } None => payments::CallConnectorAction::HandleResponse(resource_object), } diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs index 50c7c6471f..c4cbb1ccdb 100644 --- a/crates/router/src/services/api.rs +++ b/crates/router/src/services/api.rs @@ -42,7 +42,7 @@ pub use hyperswitch_interfaces::{ }, api_client::{ call_connector_api, execute_connector_processing_step, handle_response, - handle_ucs_response, store_raw_connector_response_if_required, + store_raw_connector_response_if_required, }, connector_integration_v2::{ BoxedConnectorIntegrationV2, ConnectorIntegrationAnyV2, ConnectorIntegrationV2,