From 8044aa1c48e46ac3cd3a7a1397e6eaecbe78b2df Mon Sep 17 00:00:00 2001 From: Amitsingh Tanwar <126856945+AmitsinghTanwar007@users.noreply.github.com> Date: Thu, 16 Oct 2025 17:30:56 +0530 Subject: [PATCH] =?UTF-8?q?refactor:=20Improve=20readability=20and=20maint?= =?UTF-8?q?ainability=20of=20Unified=20Connector=20Service=E2=80=93related?= =?UTF-8?q?=20code=20(#9820)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- config/development.toml | 2 +- crates/common_enums/src/enums.rs | 103 ++++ crates/router/src/core/payments.rs | 505 +++--------------- crates/router/src/core/payments/helpers.rs | 380 ++++++++++++- crates/router/src/core/payments/retry.rs | 8 +- .../src/core/unified_connector_service.rs | 416 +++++++-------- 6 files changed, 753 insertions(+), 661 deletions(-) diff --git a/config/development.toml b/config/development.toml index 38d54e4a1b..ddaacf2a27 100644 --- a/config/development.toml +++ b/config/development.toml @@ -1424,4 +1424,4 @@ token = "123456" # Superposition token org_id = "localorg" # Organization ID in Superposition workspace_id = "dev" # Workspace ID in Superposition polling_interval = 15 # Polling interval in seconds for configuration updates -# request_timeout = # Request timeout in seconds for Superposition API calls (optional, default: none) +# request_timeout = # Request timeout in seconds for Superposition API calls (optional, default: none) \ No newline at end of file diff --git a/crates/common_enums/src/enums.rs b/crates/common_enums/src/enums.rs index e74335fa73..ed01e66e4a 100644 --- a/crates/common_enums/src/enums.rs +++ b/crates/common_enums/src/enums.rs @@ -2301,9 +2301,87 @@ pub enum GatewaySystem { #[default] Direct, UnifiedConnectorService, +} + +#[derive( + Clone, + Copy, + Debug, + Default, + Eq, + PartialOrd, + Ord, + Hash, + PartialEq, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::VariantNames, + strum::EnumIter, + strum::EnumString, + ToSchema, +)] +#[router_derive::diesel_enum(storage_type = "text")] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +/// Indicates the execution path through which the payment is processed. +pub enum ExecutionPath { + #[default] + Direct, + UnifiedConnectorService, ShadowUnifiedConnectorService, } +#[derive( + Clone, + Copy, + Debug, + Eq, + PartialOrd, + Ord, + Hash, + PartialEq, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::VariantNames, + strum::EnumIter, + strum::EnumString, + ToSchema, +)] +#[router_derive::diesel_enum(storage_type = "text")] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum ShadowRolloutAvailability { + IsAvailable, + NotAvailable, +} + +#[derive( + Clone, + Copy, + Debug, + Eq, + PartialOrd, + Ord, + Hash, + PartialEq, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::VariantNames, + strum::EnumIter, + strum::EnumString, + ToSchema, +)] +#[router_derive::diesel_enum(storage_type = "text")] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum UcsAvailability { + Enabled, + Disabled, +} + #[derive( Clone, Copy, @@ -2330,6 +2408,31 @@ pub enum ExecutionMode { Shadow, } +#[derive( + Clone, + Copy, + Debug, + Eq, + PartialOrd, + Ord, + Hash, + PartialEq, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::VariantNames, + strum::EnumIter, + strum::EnumString, + ToSchema, +)] +#[router_derive::diesel_enum(storage_type = "text")] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum ConnectorIntegrationType { + UcsConnector, + DirectConnector, +} + /// The type of the payment that differentiates between normal and various types of mandate payments. Use 'setup_mandate' in case of zero auth flow. #[derive( Clone, diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 0fbf9927b3..7f80bcf050 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -22,6 +22,7 @@ use std::{ vec::IntoIter, }; +#[cfg(feature = "v2")] use external_services::grpc_client; #[cfg(feature = "v2")] @@ -36,7 +37,7 @@ use api_models::{ mandates::RecurringDetails, payments::{self as payments_api}, }; -pub use common_enums::enums::{CallConnectorAction, ExecutionMode, GatewaySystem}; +pub use common_enums::enums::{CallConnectorAction, ExecutionMode, ExecutionPath, GatewaySystem}; use common_types::payments as common_payments_types; use common_utils::{ ext_traits::{AsyncExt, StringExt}, @@ -93,15 +94,9 @@ use self::{ operations::{BoxedOperation, Operation, PaymentResponse}, routing::{self as self_routing, SessionFlowRoutingInput}, }; -#[cfg(feature = "v1")] -use super::unified_connector_service::update_gateway_system_in_feature_metadata; use super::{ - errors::StorageErrorExt, - payment_methods::surcharge_decision_configs, - routing::TransactionData, - unified_connector_service::{ - send_comparison_data, should_call_unified_connector_service, ComparisonData, - }, + errors::StorageErrorExt, payment_methods::surcharge_decision_configs, routing::TransactionData, + unified_connector_service::should_call_unified_connector_service, }; #[cfg(feature = "v1")] use crate::core::blocklist::utils as blocklist_utils; @@ -112,6 +107,11 @@ use crate::core::fraud_check as frm_core; #[cfg(feature = "v2")] use crate::core::payment_methods::vault; #[cfg(feature = "v1")] +use crate::core::payments::helpers::{ + process_through_direct, process_through_direct_with_shadow_unified_connector_service, + process_through_ucs, +}; +#[cfg(feature = "v1")] use crate::core::routing::helpers as routing_helpers; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] use crate::types::api::convert_connector_data_to_routable_connectors; @@ -126,7 +126,7 @@ use crate::{ payouts, routing::{self as core_routing}, unified_authentication_service::types::{ClickToPay, UnifiedAuthenticationService}, - utils::{self as core_utils}, + utils as core_utils, }, db::StorageInterface, logger, @@ -553,7 +553,7 @@ pub async fn payments_operation_core<'a, F, Req, Op, FData, D>( header_payload: HeaderPayload, ) -> RouterResult<(D, Req, Option, Option, Option)> where - F: Send + Clone + Sync + 'static + 'a, + F: Send + Clone + Sync + 'static, Req: Authenticate + Clone, Op: Operation + Send + Sync, D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, @@ -568,7 +568,7 @@ where // To perform router related operation for PaymentResponse PaymentResponse: Operation, - FData: Send + Sync + Clone + router_types::Capturable + 'static + 'a + serde::Serialize, + FData: Send + Sync + Clone + router_types::Capturable + 'static + serde::Serialize, { let operation: BoxedOperation<'_, F, Req, D> = Box::new(operation); @@ -2106,7 +2106,7 @@ pub async fn call_surcharge_decision_management_for_session_flow( #[cfg(feature = "v1")] #[allow(clippy::too_many_arguments)] -pub async fn payments_core<'a, F, Res, Req, Op, FData, D>( +pub async fn payments_core( state: SessionState, req_state: ReqState, merchant_context: domain::MerchantContext, @@ -2119,8 +2119,8 @@ pub async fn payments_core<'a, F, Res, Req, Op, FData, D>( header_payload: HeaderPayload, ) -> RouterResponse where - F: Send + Clone + Sync + 'static + 'a, - FData: Send + Sync + Clone + router_types::Capturable + 'static + 'a + serde::Serialize, + F: Send + Clone + Sync + 'static, + FData: Send + Sync + Clone + router_types::Capturable + 'static + serde::Serialize, Op: Operation + Send + Sync + Clone, Req: Debug + Authenticate + Clone, D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, @@ -4265,8 +4265,8 @@ pub async fn decide_unified_connector_service_call<'a, F, RouterDReq, ApiRequest helpers::MerchantConnectorAccountType, )> where - F: Send + Clone + Sync + 'static + 'a, - RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize, + F: Send + Clone + Sync + 'static, + RouterDReq: Send + Sync + Clone + 'static + serde::Serialize, // To create connector flow specific interface data D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, @@ -4293,7 +4293,7 @@ where record_time_taken_with(|| async { match (execution_path, is_handle_response_action) { // Process through UCS when system is UCS and not handling response - (GatewaySystem::UnifiedConnectorService, false) => { + (ExecutionPath::UnifiedConnectorService, false) => { process_through_ucs( state, req_state, @@ -4312,9 +4312,9 @@ where .await } - // Process through Direct gateway - (GatewaySystem::Direct, _) | (GatewaySystem::UnifiedConnectorService, true) => { - process_through_direct( + // Process through Direct with Shadow UCS + (ExecutionPath::ShadowUnifiedConnectorService, false) => { + process_through_direct_with_shadow_unified_connector_service( state, req_state, merchant_context, @@ -4337,9 +4337,11 @@ where .await } - // Process through Direct with Shadow UCS - (GatewaySystem::ShadowUnifiedConnectorService, _) => { - process_through_direct_with_shadow_unified_connector_service( + // Process through Direct gateway + (ExecutionPath::Direct, _) + | (ExecutionPath::UnifiedConnectorService, true) + | (ExecutionPath::ShadowUnifiedConnectorService, true) => { + process_through_direct( state, req_state, merchant_context, @@ -4366,374 +4368,6 @@ where .await } -#[cfg(feature = "v1")] -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -// Helper function to process through UCS -async fn process_through_ucs<'a, F, RouterDReq, ApiRequest, D>( - state: &'a SessionState, - req_state: ReqState, - merchant_context: &'a domain::MerchantContext, - operation: &'a BoxedOperation<'a, F, ApiRequest, D>, - payment_data: &'a mut D, - customer: &Option, - validate_result: &'a operations::ValidateResult, - schedule_time: Option, - header_payload: HeaderPayload, - frm_suggestion: Option, - business_profile: &'a domain::Profile, - merchant_connector_account: helpers::MerchantConnectorAccountType, - mut router_data: RouterData, -) -> RouterResult<( - RouterData, - helpers::MerchantConnectorAccountType, -)> -where - F: Send + Clone + Sync + 'static + 'a, - RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize, - D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, - D: ConstructFlowSpecificData, - RouterData: - Feature + Send + Clone + serde::Serialize, - dyn api::Connector: - services::api::ConnectorIntegration, -{ - router_env::logger::info!( - "Processing payment through UCS gateway system - payment_id={}, attempt_id={}", - payment_data - .get_payment_intent() - .payment_id - .get_string_repr(), - payment_data.get_payment_attempt().attempt_id - ); - - // Add task to process tracker if needed - if should_add_task_to_process_tracker(payment_data) { - operation - .to_domain()? - .add_task_to_process_tracker( - state, - payment_data.get_payment_attempt(), - validate_result.requeue, - schedule_time, - ) - .await - .map_err(|error| logger::error!(process_tracker_error=?error)) - .ok(); - } - - // Update feature metadata to track UCS usage for stickiness - update_gateway_system_in_feature_metadata( - payment_data, - GatewaySystem::UnifiedConnectorService, - )?; - - // Update trackers - (_, *payment_data) = operation - .to_update_tracker()? - .update_trackers( - state, - req_state, - payment_data.clone(), - customer.clone(), - merchant_context.get_merchant_account().storage_scheme, - None, - merchant_context.get_merchant_key_store(), - frm_suggestion, - header_payload.clone(), - ) - .await?; - - // Call UCS - let lineage_ids = grpc_client::LineageIds::new( - business_profile.merchant_id.clone(), - business_profile.get_id().clone(), - ); - - router_data - .call_unified_connector_service( - state, - &header_payload, - lineage_ids, - merchant_connector_account.clone(), - merchant_context, - ExecutionMode::Primary, // UCS is called in primary mode - ) - .await?; - - Ok((router_data, merchant_connector_account)) -} - -#[cfg(feature = "v1")] -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -// Helper function to process through Direct gateway -async fn process_through_direct<'a, F, RouterDReq, ApiRequest, D>( - state: &'a SessionState, - req_state: ReqState, - merchant_context: &'a domain::MerchantContext, - connector: api::ConnectorData, - operation: &'a BoxedOperation<'a, F, ApiRequest, D>, - payment_data: &'a mut D, - customer: &Option, - call_connector_action: CallConnectorAction, - validate_result: &'a operations::ValidateResult, - schedule_time: Option, - header_payload: HeaderPayload, - frm_suggestion: Option, - business_profile: &'a domain::Profile, - is_retry_payment: bool, - all_keys_required: Option, - merchant_connector_account: helpers::MerchantConnectorAccountType, - router_data: RouterData, - tokenization_action: TokenizationAction, -) -> RouterResult<( - RouterData, - helpers::MerchantConnectorAccountType, -)> -where - F: Send + Clone + Sync + 'static + 'a, - RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize, - D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, - D: ConstructFlowSpecificData, - RouterData: - Feature + Send + Clone + serde::Serialize, - dyn api::Connector: - services::api::ConnectorIntegration, -{ - router_env::logger::info!( - "Processing payment through Direct gateway system - payment_id={}, attempt_id={}", - payment_data - .get_payment_intent() - .payment_id - .get_string_repr(), - payment_data.get_payment_attempt().attempt_id - ); - - // Update feature metadata to track Direct routing usage for stickiness - update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?; - - call_connector_service( - state, - req_state, - merchant_context, - connector, - operation, - payment_data, - customer, - call_connector_action, - validate_result, - schedule_time, - header_payload, - frm_suggestion, - business_profile, - is_retry_payment, - all_keys_required, - merchant_connector_account, - router_data, - tokenization_action, - ) - .await -} - -#[cfg(feature = "v1")] -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -// Helper function to process through Direct with Shadow UCS -async fn process_through_direct_with_shadow_unified_connector_service< - 'a, - F, - RouterDReq, - ApiRequest, - D, ->( - state: &'a SessionState, - req_state: ReqState, - merchant_context: &'a domain::MerchantContext, - connector: api::ConnectorData, - operation: &'a BoxedOperation<'a, F, ApiRequest, D>, - payment_data: &'a mut D, - customer: &Option, - call_connector_action: CallConnectorAction, - validate_result: &'a operations::ValidateResult, - schedule_time: Option, - header_payload: HeaderPayload, - frm_suggestion: Option, - business_profile: &'a domain::Profile, - is_retry_payment: bool, - all_keys_required: Option, - merchant_connector_account: helpers::MerchantConnectorAccountType, - router_data: RouterData, - tokenization_action: TokenizationAction, -) -> RouterResult<( - RouterData, - helpers::MerchantConnectorAccountType, -)> -where - F: Send + Clone + Sync + 'static + 'a, - RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize, - D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, - D: ConstructFlowSpecificData, - RouterData: - Feature + Send + Clone + serde::Serialize, - dyn api::Connector: - services::api::ConnectorIntegration, -{ - router_env::logger::info!( - "Processing payment through Direct gateway system with UCS in shadow mode - payment_id={}, attempt_id={}", - payment_data.get_payment_intent().payment_id.get_string_repr(), - payment_data.get_payment_attempt().attempt_id - ); - - // Clone data needed for shadow UCS call - let unified_connector_service_router_data = router_data.clone(); - let unified_connector_service_merchant_connector_account = merchant_connector_account.clone(); - let unified_connector_service_merchant_context = merchant_context.clone(); - let unified_connector_service_header_payload = header_payload.clone(); - let unified_connector_service_state = state.clone(); - - let lineage_ids = grpc_client::LineageIds::new( - business_profile.merchant_id.clone(), - business_profile.get_id().clone(), - ); - - // Update feature metadata to track Direct routing usage for stickiness - update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?; - - // Call Direct connector service - let result = call_connector_service( - state, - req_state, - merchant_context, - connector, - operation, - payment_data, - customer, - call_connector_action, - validate_result, - schedule_time, - header_payload, - frm_suggestion, - business_profile, - is_retry_payment, - all_keys_required, - merchant_connector_account, - router_data, - tokenization_action, - ) - .await?; - - // Spawn shadow UCS call in background - let direct_router_data = result.0.clone(); - tokio::spawn(async move { - execute_shadow_unified_connector_service_call( - unified_connector_service_state, - unified_connector_service_router_data, - direct_router_data, - unified_connector_service_header_payload, - lineage_ids, - unified_connector_service_merchant_connector_account, - unified_connector_service_merchant_context, - ) - .await - }); - - Ok(result) -} - -#[cfg(feature = "v1")] -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -// Helper function to execute shadow UCS call -async fn execute_shadow_unified_connector_service_call( - state: SessionState, - mut unified_connector_service_router_data: RouterData< - F, - RouterDReq, - router_types::PaymentsResponseData, - >, - direct_router_data: RouterData, - header_payload: HeaderPayload, - lineage_ids: grpc_client::LineageIds, - merchant_connector_account: helpers::MerchantConnectorAccountType, - merchant_context: domain::MerchantContext, -) where - F: Send + Clone + Sync + 'static, - RouterDReq: Send + Sync + Clone + 'static + serde::Serialize, - RouterData: - Feature + Send + Clone + serde::Serialize, - dyn api::Connector: - services::api::ConnectorIntegration, -{ - // Call UCS in shadow mode - let _unified_connector_service_result = unified_connector_service_router_data - .call_unified_connector_service( - &state, - &header_payload, - lineage_ids, - merchant_connector_account, - &merchant_context, - ExecutionMode::Shadow, // Shadow mode for UCS - ) - .await - .map_err(|e| logger::debug!("Shadow UCS call failed: {:?}", e)); - - // Compare results - match serialize_router_data_and_send_to_comparison_service( - &state, - direct_router_data, - unified_connector_service_router_data, - ) - .await - { - Ok(_) => logger::debug!("Shadow UCS comparison completed successfully"), - Err(e) => logger::debug!("Shadow UCS comparison failed: {:?}", e), - } -} - -async fn serialize_router_data_and_send_to_comparison_service( - state: &SessionState, - hyperswitch_router_data: RouterData, - unified_connector_service_router_data: RouterData< - F, - RouterDReq, - router_types::PaymentsResponseData, - >, -) -> RouterResult<()> -where - F: Send + Clone + Sync + 'static, - RouterDReq: Send + Sync + Clone + 'static + serde::Serialize, -{ - logger::info!("Simulating UCS call for shadow mode comparison"); - let hyperswitch_data = match serde_json::to_value(hyperswitch_router_data) { - Ok(data) => Secret::new(data), - Err(_) => { - logger::debug!("Failed to serialize HS router data"); - return Ok(()); - } - }; - - let unified_connector_service_data = - match serde_json::to_value(unified_connector_service_router_data) { - Ok(data) => Secret::new(data), - Err(_) => { - logger::debug!("Failed to serialize UCS router data"); - return Ok(()); - } - }; - - let comparison_data = ComparisonData { - hyperswitch_data, - unified_connector_service_data, - }; - let _ = send_comparison_data(state, comparison_data) - .await - .map_err(|e| { - logger::debug!("Failed to send comparison data: {:?}", e); - }); - Ok(()) -} - async fn record_time_taken_with(f: F) -> RouterResult where F: FnOnce() -> Fut, @@ -5178,7 +4812,7 @@ where .await?; // do order creation - let should_call_unified_connector_service = should_call_unified_connector_service( + let execution_path = should_call_unified_connector_service( state, merchant_context, &router_data, @@ -5186,50 +4820,49 @@ where ) .await?; - let (connector_request, should_continue_further) = if matches!( - should_call_unified_connector_service, - GatewaySystem::Direct | GatewaySystem::ShadowUnifiedConnectorService - ) { - let mut should_continue_further = true; + let (connector_request, should_continue_further) = + if matches!(execution_path, ExecutionPath::Direct) { + let mut should_continue_further = true; - let should_continue = match router_data - .create_order_at_connector(state, &connector, should_continue_further) - .await? - { - Some(create_order_response) => { - if let Ok(order_id) = create_order_response.clone().create_order_result { - payment_data.set_connector_response_reference_id(Some(order_id)) + let should_continue = match router_data + .create_order_at_connector(state, &connector, should_continue_further) + .await? + { + Some(create_order_response) => { + if let Ok(order_id) = create_order_response.clone().create_order_result { + payment_data.set_connector_response_reference_id(Some(order_id)) + } + + // Set the response in routerdata response to carry forward + router_data.update_router_data_with_create_order_response( + create_order_response.clone(), + ); + create_order_response.create_order_result.ok().map(|_| ()) } + // If create order is not required, then we can proceed with further processing + None => Some(()), + }; - // Set the response in routerdata response to carry forward - router_data - .update_router_data_with_create_order_response(create_order_response.clone()); - create_order_response.create_order_result.ok().map(|_| ()) - } - // If create order is not required, then we can proceed with further processing - None => Some(()), + let should_continue: (Option, bool) = + match should_continue { + Some(_) => { + router_data + .build_flow_specific_connector_request( + state, + &connector, + call_connector_action.clone(), + ) + .await? + } + None => (None, false), + }; + should_continue + } else { + // If unified connector service is called, these values are not used + // as the request is built in the unified connector service call + (None, false) }; - let should_continue: (Option, bool) = match should_continue - { - Some(_) => { - router_data - .build_flow_specific_connector_request( - state, - &connector, - call_connector_action.clone(), - ) - .await? - } - None => (None, false), - }; - should_continue - } else { - // If unified connector service is called, these values are not used - // as the request is built in the unified connector service call - (None, false) - }; - (_, *payment_data) = operation .to_update_tracker()? .update_trackers( @@ -5246,7 +4879,7 @@ where .await?; record_time_taken_with(|| async { - if matches!(should_call_unified_connector_service, GatewaySystem::UnifiedConnectorService) { + if matches!(execution_path, ExecutionPath::UnifiedConnectorService) { router_env::logger::info!( "Processing payment through UCS gateway system- payment_id={}, attempt_id={}", payment_data.get_payment_intent().id.get_string_repr(), @@ -5313,14 +4946,14 @@ where services::api::ConnectorIntegration, { record_time_taken_with(|| async { - let execution = should_call_unified_connector_service( + let execution_path = should_call_unified_connector_service( state, merchant_context, &router_data, Some(payment_data), ) .await?; - if matches!(execution, GatewaySystem::UnifiedConnectorService) { + if matches!(execution_path, ExecutionPath::UnifiedConnectorService) { router_env::logger::info!( "Executing payment through UCS gateway system - payment_id={}, attempt_id={}", payment_data.get_payment_intent().id.get_string_repr(), @@ -5371,7 +5004,7 @@ where Ok(router_data) } else { - if matches!(execution, GatewaySystem::ShadowUnifiedConnectorService) { + if matches!(execution_path, ExecutionPath::ShadowUnifiedConnectorService) { router_env::logger::info!( "Shadow UCS mode not implemented in v2, processing through direct path - payment_id={}, attempt_id={}", payment_data.get_payment_intent().id.get_string_repr(), diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 84b8bed45b..3f799d9ade 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -11,6 +11,8 @@ use api_models::{ payments::{additional_info as payment_additional_types, RequestSurchargeDetails}, }; use base64::Engine; +#[cfg(feature = "v1")] +use common_enums::enums::{CallConnectorAction, ExecutionMode, GatewaySystem}; use common_enums::ConnectorType; #[cfg(feature = "v2")] use common_utils::id_type::GenerateId; @@ -29,6 +31,8 @@ use common_utils::{ use diesel_models::enums; // TODO : Evaluate all the helper functions () use error_stack::{report, ResultExt}; +#[cfg(feature = "v1")] +use external_services::grpc_client; use futures::future::Either; #[cfg(feature = "v1")] use hyperswitch_domain_models::payments::payment_intent::CustomerData; @@ -67,6 +71,21 @@ use super::{ operations::{BoxedOperation, Operation, PaymentResponse}, CustomerDetails, PaymentData, }; +#[cfg(feature = "v1")] +use crate::core::{ + payments::{ + call_connector_service, + flows::{ConstructFlowSpecificData, Feature}, + operations::ValidateResult as OperationsValidateResult, + should_add_task_to_process_tracker, OperationSessionGetters, OperationSessionSetters, + TokenizationAction, + }, + unified_connector_service::{ + send_comparison_data, update_gateway_system_in_feature_metadata, ComparisonData, + }, +}; +#[cfg(feature = "v1")] +use crate::routes; use crate::{ configs::settings::{ConnectorRequestReferenceIdConfig, TempLockerEnableConfig}, connector, @@ -5627,7 +5646,7 @@ pub async fn get_apple_pay_retryable_connectors( ) -> CustomResult>, errors::ApiErrorResponse> where F: Send + Clone, - D: payments::OperationSessionGetters + Send, + D: OperationSessionGetters + Send, { let profile_id = business_profile.get_id(); @@ -7327,7 +7346,7 @@ pub async fn override_setup_future_usage_to_on_session( ) -> CustomResult<(), errors::ApiErrorResponse> where F: Clone, - D: payments::OperationSessionGetters + payments::OperationSessionSetters + Send, + D: OperationSessionGetters + OperationSessionSetters + Send, { if payment_data.get_payment_intent().setup_future_usage == Some(enums::FutureUsage::OffSession) { @@ -7835,3 +7854,360 @@ pub fn is_stored_credential( is_stored_credential_prev } } + +#[cfg(feature = "v1")] +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +// Helper function to process through UCS gateway +pub async fn process_through_ucs<'a, F, RouterDReq, ApiRequest, D>( + state: &'a SessionState, + req_state: routes::app::ReqState, + merchant_context: &'a domain::MerchantContext, + operation: &'a BoxedOperation<'a, F, ApiRequest, D>, + payment_data: &'a mut D, + customer: &Option, + validate_result: &'a OperationsValidateResult, + schedule_time: Option, + header_payload: domain_payments::HeaderPayload, + frm_suggestion: Option, + business_profile: &'a domain::Profile, + merchant_connector_account: MerchantConnectorAccountType, + mut router_data: RouterData, +) -> RouterResult<( + RouterData, + MerchantConnectorAccountType, +)> +where + F: Send + Clone + Sync + 'static, + RouterDReq: Send + Sync + Clone + 'static + Serialize, + D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, + D: ConstructFlowSpecificData, + RouterData: + Feature + Send + Clone + Serialize, + dyn api::Connector: services::api::ConnectorIntegration, +{ + router_env::logger::info!( + "Processing payment through UCS gateway system - payment_id={}, attempt_id={}", + payment_data + .get_payment_intent() + .payment_id + .get_string_repr(), + payment_data.get_payment_attempt().attempt_id + ); + + // Add task to process tracker if needed + if should_add_task_to_process_tracker(payment_data) { + operation + .to_domain()? + .add_task_to_process_tracker( + state, + payment_data.get_payment_attempt(), + validate_result.requeue, + schedule_time, + ) + .await + .map_err(|error| router_env::logger::error!(process_tracker_error=?error)) + .ok(); + } + + // Update feature metadata to track UCS usage for stickiness + update_gateway_system_in_feature_metadata( + payment_data, + GatewaySystem::UnifiedConnectorService, + )?; + + // Update trackers + (_, *payment_data) = operation + .to_update_tracker()? + .update_trackers( + state, + req_state, + payment_data.clone(), + customer.clone(), + merchant_context.get_merchant_account().storage_scheme, + None, + merchant_context.get_merchant_key_store(), + frm_suggestion, + header_payload.clone(), + ) + .await?; + + // Call UCS + let lineage_ids = grpc_client::LineageIds::new( + business_profile.merchant_id.clone(), + business_profile.get_id().clone(), + ); + + router_data + .call_unified_connector_service( + state, + &header_payload, + lineage_ids, + merchant_connector_account.clone(), + merchant_context, + ExecutionMode::Primary, // UCS is called in primary mode + ) + .await?; + + Ok((router_data, merchant_connector_account)) +} + +#[cfg(feature = "v1")] +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +// Helper function to process through Direct gateway +pub async fn process_through_direct<'a, F, RouterDReq, ApiRequest, D>( + state: &'a SessionState, + req_state: routes::app::ReqState, + merchant_context: &'a domain::MerchantContext, + connector: api::ConnectorData, + operation: &'a BoxedOperation<'a, F, ApiRequest, D>, + payment_data: &'a mut D, + customer: &Option, + call_connector_action: CallConnectorAction, + validate_result: &'a OperationsValidateResult, + schedule_time: Option, + header_payload: domain_payments::HeaderPayload, + frm_suggestion: Option, + business_profile: &'a domain::Profile, + is_retry_payment: bool, + all_keys_required: Option, + merchant_connector_account: MerchantConnectorAccountType, + router_data: RouterData, + tokenization_action: TokenizationAction, +) -> RouterResult<( + RouterData, + MerchantConnectorAccountType, +)> +where + F: Send + Clone + Sync + 'static, + RouterDReq: Send + Sync + Clone + 'static + Serialize, + D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, + D: ConstructFlowSpecificData, + RouterData: + Feature + Send + Clone + Serialize, + dyn api::Connector: services::api::ConnectorIntegration, +{ + router_env::logger::info!( + "Processing payment through Direct gateway system - payment_id={}, attempt_id={}", + payment_data + .get_payment_intent() + .payment_id + .get_string_repr(), + payment_data.get_payment_attempt().attempt_id + ); + + // Update feature metadata to track Direct routing usage for stickiness + update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?; + + call_connector_service( + state, + req_state, + merchant_context, + connector, + operation, + payment_data, + customer, + call_connector_action, + validate_result, + schedule_time, + header_payload, + frm_suggestion, + business_profile, + is_retry_payment, + all_keys_required, + merchant_connector_account, + router_data, + tokenization_action, + ) + .await +} + +#[cfg(feature = "v1")] +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +// Helper function to process through Direct with Shadow UCS +pub async fn process_through_direct_with_shadow_unified_connector_service< + 'a, + F, + RouterDReq, + ApiRequest, + D, +>( + state: &'a SessionState, + req_state: routes::app::ReqState, + merchant_context: &'a domain::MerchantContext, + connector: api::ConnectorData, + operation: &'a BoxedOperation<'a, F, ApiRequest, D>, + payment_data: &'a mut D, + customer: &Option, + call_connector_action: CallConnectorAction, + validate_result: &'a OperationsValidateResult, + schedule_time: Option, + header_payload: domain_payments::HeaderPayload, + frm_suggestion: Option, + business_profile: &'a domain::Profile, + is_retry_payment: bool, + all_keys_required: Option, + merchant_connector_account: MerchantConnectorAccountType, + router_data: RouterData, + tokenization_action: TokenizationAction, +) -> RouterResult<( + RouterData, + MerchantConnectorAccountType, +)> +where + F: Send + Clone + Sync + 'static, + RouterDReq: Send + Sync + Clone + 'static + Serialize, + D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, + D: ConstructFlowSpecificData, + RouterData: + Feature + Send + Clone + Serialize, + dyn api::Connector: services::api::ConnectorIntegration, +{ + router_env::logger::info!( + "Processing payment through Direct gateway system with UCS in shadow mode - payment_id={}, attempt_id={}", + payment_data.get_payment_intent().payment_id.get_string_repr(), + payment_data.get_payment_attempt().attempt_id + ); + + // Clone data needed for shadow UCS call + let unified_connector_service_router_data = router_data.clone(); + let unified_connector_service_merchant_connector_account = merchant_connector_account.clone(); + let unified_connector_service_merchant_context = merchant_context.clone(); + let unified_connector_service_header_payload = header_payload.clone(); + let unified_connector_service_state = state.clone(); + + let lineage_ids = grpc_client::LineageIds::new( + business_profile.merchant_id.clone(), + business_profile.get_id().clone(), + ); + + // Update feature metadata to track Direct routing usage for stickiness + update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?; + + // Call Direct connector service + let result = call_connector_service( + state, + req_state, + merchant_context, + connector, + operation, + payment_data, + customer, + call_connector_action, + validate_result, + schedule_time, + header_payload, + frm_suggestion, + business_profile, + is_retry_payment, + all_keys_required, + merchant_connector_account, + router_data, + tokenization_action, + ) + .await?; + + // Spawn shadow UCS call in background + let direct_router_data = result.0.clone(); + tokio::spawn(async move { + execute_shadow_unified_connector_service_call( + unified_connector_service_state, + unified_connector_service_router_data, + direct_router_data, + unified_connector_service_header_payload, + lineage_ids, + unified_connector_service_merchant_connector_account, + unified_connector_service_merchant_context, + ) + .await + }); + + Ok(result) +} + +#[cfg(feature = "v1")] +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +// Helper function to execute shadow UCS call +pub async fn execute_shadow_unified_connector_service_call( + state: SessionState, + mut unified_connector_service_router_data: RouterData, + direct_router_data: RouterData, + header_payload: domain_payments::HeaderPayload, + lineage_ids: grpc_client::LineageIds, + merchant_connector_account: MerchantConnectorAccountType, + merchant_context: domain::MerchantContext, +) where + F: Send + Clone + Sync + 'static, + RouterDReq: Send + Sync + Clone + 'static + Serialize, + RouterData: + Feature + Send + Clone + Serialize, + dyn api::Connector: services::api::ConnectorIntegration, +{ + // Call UCS in shadow mode + let _unified_connector_service_result = unified_connector_service_router_data + .call_unified_connector_service( + &state, + &header_payload, + lineage_ids, + merchant_connector_account, + &merchant_context, + ExecutionMode::Shadow, // Shadow mode for UCS + ) + .await + .map_err(|e| router_env::logger::debug!("Shadow UCS call failed: {:?}", e)); + + // Compare results + match serialize_router_data_and_send_to_comparison_service( + &state, + direct_router_data, + unified_connector_service_router_data, + ) + .await + { + Ok(_) => router_env::logger::debug!("Shadow UCS comparison completed successfully"), + Err(e) => router_env::logger::debug!("Shadow UCS comparison failed: {:?}", e), + } +} + +#[cfg(feature = "v1")] +pub async fn serialize_router_data_and_send_to_comparison_service( + state: &SessionState, + hyperswitch_router_data: RouterData, + unified_connector_service_router_data: RouterData, +) -> RouterResult<()> +where + F: Send + Clone + Sync + 'static, + RouterDReq: Send + Sync + Clone + 'static + Serialize, +{ + router_env::logger::info!("Simulating UCS call for shadow mode comparison"); + let hyperswitch_data = match serde_json::to_value(hyperswitch_router_data) { + Ok(data) => masking::Secret::new(data), + Err(_) => { + router_env::logger::debug!("Failed to serialize HS router data"); + return Ok(()); + } + }; + + let unified_connector_service_data = + match serde_json::to_value(unified_connector_service_router_data) { + Ok(data) => masking::Secret::new(data), + Err(_) => { + router_env::logger::debug!("Failed to serialize UCS router data"); + return Ok(()); + } + }; + + let comparison_data = ComparisonData { + hyperswitch_data, + unified_connector_service_data, + }; + let _ = send_comparison_data(state, comparison_data) + .await + .map_err(|e| { + router_env::logger::debug!("Failed to send comparison data: {:?}", e); + }); + Ok(()) +} diff --git a/crates/router/src/core/payments/retry.rs b/crates/router/src/core/payments/retry.rs index 367abc4f23..5f9ff29e67 100644 --- a/crates/router/src/core/payments/retry.rs +++ b/crates/router/src/core/payments/retry.rs @@ -48,8 +48,8 @@ pub async fn do_gsm_actions<'a, F, ApiRequest, FData, D>( business_profile: &domain::Profile, ) -> RouterResult> where - F: Clone + Send + Sync + 'static + 'a, - FData: Send + Sync + types::Capturable + Clone + 'static + 'a + serde::Serialize, + F: Clone + Send + Sync + 'static, + FData: Send + Sync + types::Capturable + Clone + 'static + serde::Serialize, payments::PaymentResponse: operations::Operation, D: payments::OperationSessionGetters + payments::OperationSessionSetters @@ -356,8 +356,8 @@ pub async fn do_retry<'a, F, ApiRequest, FData, D>( routing_decision: Option, ) -> RouterResult> where - F: Clone + Send + Sync + 'static + 'a, - FData: Send + Sync + types::Capturable + Clone + 'static + 'a + serde::Serialize, + F: Clone + Send + Sync + 'static, + FData: Send + Sync + types::Capturable + Clone + 'static + serde::Serialize, payments::PaymentResponse: operations::Operation, D: payments::OperationSessionGetters + payments::OperationSessionSetters diff --git a/crates/router/src/core/unified_connector_service.rs b/crates/router/src/core/unified_connector_service.rs index 255256b89b..f964df75c1 100644 --- a/crates/router/src/core/unified_connector_service.rs +++ b/crates/router/src/core/unified_connector_service.rs @@ -4,7 +4,8 @@ use api_models::admin; #[cfg(feature = "v2")] use base64::Engine; use common_enums::{ - connector_enums::Connector, AttemptStatus, ExecutionMode, GatewaySystem, PaymentMethodType, + connector_enums::Connector, AttemptStatus, ConnectorIntegrationType, ExecutionMode, + ExecutionPath, GatewaySystem, PaymentMethodType, ShadowRolloutAvailability, UcsAvailability, }; #[cfg(feature = "v2")] use common_utils::consts::BASE64_ENGINE; @@ -75,12 +76,75 @@ type UnifiedConnectorServiceResult = CustomResult< UnifiedConnectorServiceError, >; +/// Checks if the Unified Connector Service (UCS) is available for use +async fn check_ucs_availability(state: &SessionState) -> UcsAvailability { + let is_client_available = state.grpc_client.unified_connector_service_client.is_some(); + + let is_enabled = is_ucs_enabled(state, consts::UCS_ENABLED).await; + + match (is_client_available, is_enabled) { + (true, true) => { + router_env::logger::debug!("UCS is available and enabled"); + UcsAvailability::Enabled + } + _ => { + router_env::logger::debug!( + "UCS client is {} and UCS is {} in configuration", + if is_client_available { + "available" + } else { + "not available" + }, + if is_enabled { "enabled" } else { "not enabled" } + ); + UcsAvailability::Disabled + } + } +} + +/// Determines the connector integration type based on UCS configuration or on both +async fn determine_connector_integration_type( + state: &SessionState, + connector: Connector, + config_key: &str, +) -> RouterResult { + match state.conf.grpc_client.unified_connector_service.as_ref() { + Some(ucs_config) => { + let is_ucs_only = ucs_config.ucs_only_connectors.contains(&connector); + let is_rollout_enabled = should_execute_based_on_rollout(state, config_key).await?; + + if is_ucs_only || is_rollout_enabled { + router_env::logger::debug!( + connector = ?connector, + ucs_only_list = is_ucs_only, + rollout_enabled = is_rollout_enabled, + "Using UcsConnector" + ); + Ok(ConnectorIntegrationType::UcsConnector) + } else { + router_env::logger::debug!( + connector = ?connector, + "Using DirectConnector - not in ucs_only_list and rollout not enabled" + ); + Ok(ConnectorIntegrationType::DirectConnector) + } + } + None => { + router_env::logger::debug!( + connector = ?connector, + "UCS config not present, using DirectConnector" + ); + Ok(ConnectorIntegrationType::DirectConnector) + } + } +} + pub async fn should_call_unified_connector_service( state: &SessionState, merchant_context: &MerchantContext, router_data: &RouterData, payment_data: Option<&D>, -) -> RouterResult +) -> RouterResult where D: OperationSessionGetters, { @@ -98,63 +162,10 @@ where let payment_method = router_data.payment_method.to_string(); let flow_name = get_flow_name::()?; - // Compute all relevant conditions - let ucs_client_available = state.grpc_client.unified_connector_service_client.is_some(); - let ucs_enabled = is_ucs_enabled(state, consts::UCS_ENABLED).await; - - // Log if UCS client is not available - if !ucs_client_available { - router_env::logger::debug!( - "UCS client not available - merchant_id={}, connector={}", - merchant_id, - connector_name - ); - } - - // Log if UCS is not enabled - if !ucs_enabled { - router_env::logger::debug!( - "UCS not enabled in configuration - merchant_id={}, connector={}", - merchant_id, - connector_name - ); - } - - let ucs_config = state.conf.grpc_client.unified_connector_service.as_ref(); - - // Log if UCS configuration is missing - if ucs_config.is_none() { - router_env::logger::debug!( - "UCS configuration not found - merchant_id={}, connector={}", - merchant_id, - connector_name - ); - } - - let ucs_only_connector = - ucs_config.is_some_and(|config| config.ucs_only_connectors.contains(&connector_enum)); - - let previous_gateway = payment_data.and_then(extract_gateway_system_from_payment_intent); - - // Log previous gateway state - match previous_gateway { - Some(gateway) => { - router_env::logger::debug!( - "Previous gateway system found: {:?} - merchant_id={}, connector={}", - gateway, - merchant_id, - connector_name - ); - } - None => { - router_env::logger::debug!( - "No previous gateway system found (new payment) - merchant_id={}, connector={}", - merchant_id, - connector_name - ); - } - } + // Check UCS availability using idiomatic helper + let ucs_availability = check_ucs_availability(state).await; + // Build rollout keys let rollout_key = format!( "{}_{}_{}_{}_{}", consts::UCS_ROLLOUT_PERCENT_CONFIG_PREFIX, @@ -163,179 +174,148 @@ where payment_method, flow_name ); + + // Determine connector integration type + let connector_integration_type = + determine_connector_integration_type(state, connector_enum, &rollout_key).await?; + + // Extract previous gateway from payment data + let previous_gateway = payment_data.and_then(extract_gateway_system_from_payment_intent); let shadow_rollout_key = format!("{}_shadow", rollout_key); - let rollout_enabled = should_execute_based_on_rollout(state, &rollout_key).await?; - let shadow_rollout_enabled = - should_execute_based_on_rollout(state, &shadow_rollout_key).await?; - - router_env::logger::debug!( - "Rollout status - rollout_enabled={}, shadow_rollout_enabled={}, rollout_key={}, merchant_id={}, connector={}", - rollout_enabled, - shadow_rollout_enabled, - rollout_key, - merchant_id, - connector_name - ); + let shadow_rollout_availability = + if should_execute_based_on_rollout(state, &shadow_rollout_key).await? { + ShadowRolloutAvailability::IsAvailable + } else { + ShadowRolloutAvailability::NotAvailable + }; // Single decision point using pattern matching - // Tuple structure: (ucs_infrastructure_ready, ucs_only_connector, previous_gateway, shadow_rollout_enabled, rollout_enabled) - let decision = match ( - ucs_client_available && ucs_enabled, - ucs_only_connector, - previous_gateway, - shadow_rollout_enabled, - rollout_enabled, - ) { - // ==================== DIRECT GATEWAY DECISIONS ==================== - // All patterns that result in Direct routing - - // UCS infrastructure not available (any configuration) - // - Client not available OR not enabled - // - Regardless of: ucs_only_connector, previous_gateway, rollouts - (false, _, _, _, _) | - - // UCS available but no rollouts active and no previous gateway - // - Infrastructure ready - // - Not a UCS-only connector - // - New payment (no previous gateway) - // - No shadow rollout - // - No full rollout - (true, false, None, false, false) | - - // UCS available, continuing with Direct, no rollouts - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was Direct - // - No shadow rollout - // - No full rollout - (true, false, Some(GatewaySystem::Direct), false, false) | - - // UCS available, previous Shadow but rollout ended - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was Shadow - // - No shadow rollout (ended) - // - No full rollout - (true, false, Some(GatewaySystem::ShadowUnifiedConnectorService), false, false) => { - router_env::logger::debug!( - "Routing to Direct: ucs_ready={}, ucs_only={}, previous={:?}, shadow_rollout={}, full_rollout={} - merchant_id={}, connector={}", - ucs_client_available && ucs_enabled, - ucs_only_connector, - previous_gateway, - shadow_rollout_enabled, - rollout_enabled, - merchant_id, - connector_name - ); - GatewaySystem::Direct - } - - // ==================== SHADOW UCS DECISIONS ==================== - // All patterns that result in Shadow UCS routing - - // Shadow rollout for new payment (no previous gateway) - // - Infrastructure ready - // - Not a UCS-only connector - // - No previous gateway - // - Shadow rollout enabled - // - Full rollout: any (false or true, shadow takes precedence) - (true, false, None, true, _) | - - // Shadow rollout with previous Direct gateway - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was Direct - // - Shadow rollout enabled - // - Full rollout: any (shadow takes precedence) - (true, false, Some(GatewaySystem::Direct), true, _) | - - // Shadow rollout with previous Shadow gateway (continuation) - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was Shadow - // - Shadow rollout enabled - // - Full rollout: any - (true, false, Some(GatewaySystem::ShadowUnifiedConnectorService), true, _) => { - router_env::logger::debug!( - "Routing to ShadowUnifiedConnectorService: ucs_ready={}, ucs_only={}, previous={:?}, shadow_rollout={}, full_rollout={} - merchant_id={}, connector={}", - ucs_client_available && ucs_enabled, - ucs_only_connector, - previous_gateway, - shadow_rollout_enabled, - rollout_enabled, - merchant_id, - connector_name - ); - GatewaySystem::ShadowUnifiedConnectorService - } - - // ==================== UNIFIED CONNECTOR SERVICE DECISIONS ==================== - // All patterns that result in UCS routing - - // UCS-only connector (mandatory UCS) - // - Infrastructure ready - // - UCS-only connector flag set - // - Any previous gateway - // - Any shadow rollout state - // - Any full rollout state - (true, true, _, _, _) | - - // Sticky routing: Continue with previous UCS - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was UCS - // - Any shadow rollout state (doesn't affect existing UCS) - // - Any full rollout state - (true, false, Some(GatewaySystem::UnifiedConnectorService), _, _) | - - // Full rollout: New payment - // - Infrastructure ready - // - Not a UCS-only connector - // - No previous gateway - // - No shadow rollout (shadow would take precedence) - // - Full rollout enabled - (true, false, None, false, true) | - - // Full rollout: Switch from Direct - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was Direct - // - No shadow rollout (shadow would take precedence) - // - Full rollout enabled - (true, false, Some(GatewaySystem::Direct), false, true) | - - // Full rollout: Promote from Shadow - // - Infrastructure ready - // - Not a UCS-only connector - // - Previous gateway was Shadow - // - Shadow rollout ended (now false) - // - Full rollout enabled - (true, false, Some(GatewaySystem::ShadowUnifiedConnectorService), false, true) => { - router_env::logger::debug!( - "Routing to UnifiedConnectorService: ucs_ready={}, ucs_only={}, previous={:?}, shadow_rollout={}, full_rollout={} - merchant_id={}, connector={}", - ucs_client_available && ucs_enabled, - ucs_only_connector, - previous_gateway, - shadow_rollout_enabled, - rollout_enabled, - merchant_id, - connector_name - ); - GatewaySystem::UnifiedConnectorService - } + let (gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled { + router_env::logger::debug!("UCS is disabled, using Direct gateway"); + (GatewaySystem::Direct, ExecutionPath::Direct) + } else { + // UCS is enabled, call decide function + decide_execution_path( + connector_integration_type, + previous_gateway, + shadow_rollout_availability, + )? }; router_env::logger::info!( - "Payment gateway system decision: {:?} - merchant_id={}, connector={}, payment_method={}, flow={}", - decision, + "Payment gateway decision: gateway={:?}, execution_path={:?} - merchant_id={}, connector={}, payment_method={}, flow={}", + gateway_system, + execution_path, merchant_id, connector_name, payment_method, flow_name ); - Ok(decision) + Ok(execution_path) +} + +fn decide_execution_path( + connector_type: ConnectorIntegrationType, + previous_gateway: Option, + shadow_rollout_enabled: ShadowRolloutAvailability, +) -> RouterResult<(GatewaySystem, ExecutionPath)> { + match (connector_type, previous_gateway, shadow_rollout_enabled) { + // Case 1: DirectConnector with no previous gateway and no shadow rollout + // This is a fresh payment request for a direct connector - use direct gateway + ( + ConnectorIntegrationType::DirectConnector, + None, + ShadowRolloutAvailability::NotAvailable, + ) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)), + + // Case 2: DirectConnector previously used Direct gateway, no shadow rollout + // Continue using the same direct gateway for consistency + ( + ConnectorIntegrationType::DirectConnector, + Some(GatewaySystem::Direct), + ShadowRolloutAvailability::NotAvailable, + ) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)), + + // Case 3: DirectConnector previously used UCS, but now switching back to Direct (no shadow) + // Migration scenario: UCS was used before, but now we're reverting to Direct + ( + ConnectorIntegrationType::DirectConnector, + Some(GatewaySystem::UnifiedConnectorService), + ShadowRolloutAvailability::NotAvailable, + ) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)), + + // Case 4: UcsConnector configuration, but previously used Direct gateway (no shadow) + // Maintain Direct for backward compatibility - don't switch mid-transaction + ( + ConnectorIntegrationType::UcsConnector, + Some(GatewaySystem::Direct), + ShadowRolloutAvailability::NotAvailable, + ) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)), + + // Case 5: DirectConnector with no previous gateway, shadow rollout enabled + // Use Direct as primary, but also execute UCS in shadow mode for comparison + ( + ConnectorIntegrationType::DirectConnector, + None, + ShadowRolloutAvailability::IsAvailable, + ) => Ok(( + GatewaySystem::Direct, + ExecutionPath::ShadowUnifiedConnectorService, + )), + + // Case 6: DirectConnector previously used Direct, shadow rollout enabled + // Continue with Direct as primary, execute UCS in shadow mode for testing + ( + ConnectorIntegrationType::DirectConnector, + Some(GatewaySystem::Direct), + ShadowRolloutAvailability::IsAvailable, + ) => Ok(( + GatewaySystem::Direct, + ExecutionPath::ShadowUnifiedConnectorService, + )), + + // Case 7: DirectConnector previously used UCS, shadow rollout enabled + // Revert to Direct as primary, but keep UCS in shadow mode for comparison + ( + ConnectorIntegrationType::DirectConnector, + Some(GatewaySystem::UnifiedConnectorService), + ShadowRolloutAvailability::IsAvailable, + ) => Ok(( + GatewaySystem::Direct, + ExecutionPath::ShadowUnifiedConnectorService, + )), + + // Case 8: UcsConnector configuration, previously used Direct, shadow rollout enabled + // Maintain Direct as primary for transaction consistency, shadow UCS for testing + ( + ConnectorIntegrationType::UcsConnector, + Some(GatewaySystem::Direct), + ShadowRolloutAvailability::IsAvailable, + ) => Ok(( + GatewaySystem::Direct, + ExecutionPath::ShadowUnifiedConnectorService, + )), + + // Case 9: UcsConnector with no previous gateway (regardless of shadow rollout) + // Fresh payment for a UCS-enabled connector - use UCS as primary + (ConnectorIntegrationType::UcsConnector, None, _) => Ok(( + GatewaySystem::UnifiedConnectorService, + ExecutionPath::UnifiedConnectorService, + )), + + // Case 10: UcsConnector previously used UCS (regardless of shadow rollout) + // Continue using UCS for consistency in the payment flow + ( + ConnectorIntegrationType::UcsConnector, + Some(GatewaySystem::UnifiedConnectorService), + _, + ) => Ok(( + GatewaySystem::UnifiedConnectorService, + ExecutionPath::UnifiedConnectorService, + )), + } } /// Extracts the gateway system from the payment intent's feature metadata