From 4aa9f44d4b1ffb726f60410226c8092d8a8bc11c Mon Sep 17 00:00:00 2001 From: Shivansh Mathur <104988143+su-shivanshmathur@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:35:17 +0530 Subject: [PATCH] chore(router): choose proxy based on config (#9821) Co-authored-by: Amitsingh Tanwar Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Jeeva Ramachandran --- crates/hyperswitch_interfaces/src/types.rs | 9 ++ crates/router/src/core/payments.rs | 28 ++-- crates/router/src/core/payments/helpers.rs | 143 ++++++++++++++++-- .../src/core/unified_connector_service.rs | 141 ++++++++++++++--- 4 files changed, 271 insertions(+), 50 deletions(-) diff --git a/crates/hyperswitch_interfaces/src/types.rs b/crates/hyperswitch_interfaces/src/types.rs index 82486f5801..7f59aa864b 100644 --- a/crates/hyperswitch_interfaces/src/types.rs +++ b/crates/hyperswitch_interfaces/src/types.rs @@ -428,6 +428,15 @@ impl Default for Proxy { } } +/// Proxy override configuration for rollout-based proxy switching +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ProxyOverride { + /// Override HTTP proxy URL + pub http_url: Option, + /// Override HTTPS proxy URL + pub https_url: Option, +} + /// Type alias for `ConnectorIntegrationV2` pub type CreateCustomerTypeV2 = dyn ConnectorIntegrationV2< CreateConnectorCustomer, diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index ba771c2e0f..f5e4884087 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -4280,7 +4280,7 @@ where dyn api::Connector: services::api::ConnectorIntegration, { - let execution_path = should_call_unified_connector_service( + let (execution_path, updated_state) = should_call_unified_connector_service( state, merchant_context, &router_data, @@ -4294,7 +4294,7 @@ where // Process through UCS when system is UCS and not handling response or if it is a UCS webhook action ExecutionPath::UnifiedConnectorService => { process_through_ucs( - state, + &updated_state, req_state, merchant_context, operation, @@ -4316,7 +4316,7 @@ where // Process through Direct with Shadow UCS ExecutionPath::ShadowUnifiedConnectorService => { process_through_direct_with_shadow_unified_connector_service( - state, + &updated_state, req_state, merchant_context, connector, @@ -4413,8 +4413,9 @@ where D: ConstructFlowSpecificData, RouterData: Feature + Send, // To construct connector flow specific api - dyn api::Connector: - services::api::ConnectorIntegration, + dyn api::Connector: services::api::ConnectorIntegration + + Send + + Sync, { let add_access_token_result = router_data .add_access_token( @@ -4811,7 +4812,7 @@ where .await?; // do order creation - let execution_path = should_call_unified_connector_service( + let (execution_path, updated_state) = should_call_unified_connector_service( state, merchant_context, &router_data, @@ -4954,7 +4955,7 @@ where services::api::ConnectorIntegration, { record_time_taken_with(|| async { - let execution_path = should_call_unified_connector_service( + let (execution, updated_state) = should_call_unified_connector_service( state, merchant_context, &router_data, @@ -4962,7 +4963,7 @@ where call_connector_action.clone(), ) .await?; - if matches!(execution_path, ExecutionPath::UnifiedConnectorService) { + if matches!(execution, ExecutionPath::UnifiedConnectorService) { router_env::logger::info!( "Executing payment through UCS gateway system - payment_id={}, attempt_id={}", payment_data.get_payment_intent().id.get_string_repr(), @@ -5022,7 +5023,7 @@ where Ok(router_data) } else { - if matches!(execution_path, ExecutionPath::ShadowUnifiedConnectorService) { + if matches!(execution, ExecutionPath::ShadowUnifiedConnectorService) { router_env::logger::info!( "Shadow UCS mode not implemented in v2, processing through direct path - payment_id={}, attempt_id={}", payment_data.get_payment_intent().id.get_string_repr(), @@ -5036,8 +5037,15 @@ where ); } + + let session_state = if matches!(execution, ExecutionPath::ShadowUnifiedConnectorService) { + &updated_state + } else { + state + }; + call_connector_service( - state, + session_state, req_state, merchant_context, connector, diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index bc3ef871b0..dc07123537 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -2126,34 +2126,145 @@ pub async fn is_ucs_enabled(state: &SessionState, config_key: &str) -> bool { .unwrap_or(false) } +#[derive(Debug, Clone, Deserialize)] +pub struct RolloutConfig { + pub rollout_percent: f64, + pub http_url: Option, + pub https_url: Option, +} + +impl Default for RolloutConfig { + fn default() -> Self { + Self { + rollout_percent: 0.0, + http_url: None, + https_url: None, + } + } +} + +// Re-export ProxyOverride from hyperswitch_interfaces +pub use hyperswitch_interfaces::types::ProxyOverride; + +#[derive(Debug, Clone)] +pub struct RolloutExecutionResult { + pub should_execute: bool, + pub proxy_override: Option, +} + +/// Validates a proxy URL, filtering out invalid ones and logging warnings +fn validate_proxy_url(url: Option, url_type: &str) -> Option { + url.and_then(|url_str| { + if url_str.trim().is_empty() || url::Url::parse(&url_str).is_err() { + logger::warn!( + invalid_url = %url_str, + url_type = url_type, + "Invalid proxy URL in rollout config, ignoring" + ); + None + } else { + Some(url_str) + } + }) +} + +/// Creates proxy override with validated URLs and logging +fn create_proxy_override( + http_url: Option, + https_url: Option, +) -> Option { + let validated_http = validate_proxy_url(http_url, "HTTP"); + let validated_https = validate_proxy_url(https_url, "HTTPS"); + + if validated_http.is_some() || validated_https.is_some() { + if let Some(ref http_url) = validated_http { + logger::info!(http_url = %http_url, "Using validated HTTP proxy URL from rollout config"); + } + if let Some(ref https_url) = validated_https { + logger::info!(https_url = %https_url, "Using validated HTTPS proxy URL from rollout config"); + } + Some(ProxyOverride { + http_url: validated_http, + https_url: validated_https, + }) + } else { + None + } +} + pub async fn should_execute_based_on_rollout( state: &SessionState, config_key: &str, -) -> RouterResult { +) -> RouterResult { let db = state.store.as_ref(); match db.find_config_by_key(config_key).await { - Ok(rollout_config) => match rollout_config.config.parse::() { - Ok(rollout_percent) => { - if !(0.0..=1.0).contains(&rollout_percent) { - logger::warn!( - rollout_percent, - "Rollout percent out of bounds. Must be between 0.0 and 1.0" + Ok(rollout_config) => { + // Try to parse as JSON first (new format), fallback to float (legacy format) + let config_result = match serde_json::from_str::(&rollout_config.config) + { + Ok(config) => Ok(config), + Err(err) => { + logger::debug!( + error = ?err, + config = %rollout_config.config, + "Config not in JSON format, trying legacy float format" ); - return Ok(false); + // Fallback to legacy format (simple float) + rollout_config.config.parse::() + .map(|percent| RolloutConfig { + rollout_percent: percent, + http_url: None, + https_url: None, + }) + .map_err(|err| { + logger::error!(error = ?err, "Failed to parse rollout config as either JSON or float"); + err + }) } + }; - let sampled_value: f64 = rand::thread_rng().gen_range(0.0..1.0); - Ok(sampled_value < rollout_percent) + match config_result { + Ok(config) => { + if !(0.0..=1.0).contains(&config.rollout_percent) { + logger::warn!( + rollout_percent = config.rollout_percent, + "Rollout percent out of bounds. Must be between 0.0 and 1.0" + ); + let proxy_override = + create_proxy_override(config.http_url, config.https_url); + + return Ok(RolloutExecutionResult { + should_execute: false, + proxy_override, + }); + } + + let sampled_value: f64 = rand::thread_rng().gen_range(0.0..1.0); + let should_execute = sampled_value < config.rollout_percent; + + let proxy_override = create_proxy_override(config.http_url, config.https_url); + + Ok(RolloutExecutionResult { + should_execute, + proxy_override, + }) + } + Err(err) => { + logger::error!(error = ?err, "Failed to parse rollout config"); + Ok(RolloutExecutionResult { + should_execute: false, + proxy_override: None, + }) + } } - Err(err) => { - logger::error!(error = ?err, "Failed to parse rollout percent"); - Ok(false) - } - }, + } Err(err) => { logger::error!(error = ?err, "Failed to fetch rollout config from DB"); - Ok(false) + Ok(RolloutExecutionResult { + should_execute: false, + proxy_override: None, + }) } } } diff --git a/crates/router/src/core/unified_connector_service.rs b/crates/router/src/core/unified_connector_service.rs index 5748e0ad6d..d8aef22f6d 100644 --- a/crates/router/src/core/unified_connector_service.rs +++ b/crates/router/src/core/unified_connector_service.rs @@ -51,7 +51,8 @@ use crate::{ errors::{self, RouterResult}, payments::{ helpers::{ - is_ucs_enabled, should_execute_based_on_rollout, MerchantConnectorAccountType, + self, is_ucs_enabled, should_execute_based_on_rollout, + MerchantConnectorAccountType, ProxyOverride, }, OperationSessionGetters, OperationSessionSetters, }, @@ -77,6 +78,25 @@ type UnifiedConnectorServiceResult = CustomResult< UnifiedConnectorServiceError, >; +/// Gets the rollout percentage for a given config key +async fn get_rollout_percentage(state: &SessionState, config_key: &str) -> Option { + let db = state.store.as_ref(); + + match db.find_config_by_key(config_key).await { + Ok(rollout_config) => { + // Try to parse as JSON first (new format), fallback to float (legacy format) + match serde_json::from_str::(&rollout_config.config) { + Ok(config) => Some(config.rollout_percent), + Err(_) => { + // Fallback to legacy format (simple float) + rollout_config.config.parse::().ok() + } + } + } + Err(_) => None, + } +} + /// Checks if the Unified Connector Service (UCS) is available for use async fn check_ucs_availability(state: &SessionState) -> UcsAvailability { let is_client_available = state.grpc_client.unified_connector_service_client.is_some(); @@ -112,13 +132,13 @@ async fn determine_connector_integration_type( match state.conf.grpc_client.unified_connector_service.as_ref() { Some(ucs_config) => { let is_ucs_only = ucs_config.ucs_only_connectors.contains(&connector); - let is_rollout_enabled = should_execute_based_on_rollout(state, config_key).await?; + let rollout_result = should_execute_based_on_rollout(state, config_key).await?; - if is_ucs_only || is_rollout_enabled { + if is_ucs_only || rollout_result.should_execute { router_env::logger::debug!( connector = ?connector, ucs_only_list = is_ucs_only, - rollout_enabled = is_rollout_enabled, + rollout_enabled = rollout_result.should_execute, "Using UcsConnector" ); Ok(ConnectorIntegrationType::UcsConnector) @@ -146,7 +166,7 @@ pub async fn should_call_unified_connector_service( router_data: &RouterData, payment_data: Option<&D>, call_connector_action: CallConnectorAction, -) -> RouterResult +) -> RouterResult<(ExecutionPath, SessionState)> where D: OperationSessionGetters, { @@ -185,15 +205,36 @@ where let previous_gateway = payment_data.and_then(extract_gateway_system_from_payment_intent); let shadow_rollout_key = format!("{}_shadow", rollout_key); + // Check both rollout keys to determine priority based on shadow percentage + let rollout_result = should_execute_based_on_rollout(state, &rollout_key).await?; + let shadow_rollout_result = should_execute_based_on_rollout(state, &shadow_rollout_key).await?; + + // Get shadow percentage to determine priority + let shadow_percentage = get_rollout_percentage(state, &shadow_rollout_key) + .await + .unwrap_or(0.0); + let shadow_rollout_availability = - if should_execute_based_on_rollout(state, &shadow_rollout_key).await? { + if shadow_rollout_result.should_execute && shadow_percentage != 0.0 { + // Shadow is present and percentage is non-zero, use shadow + router_env::logger::debug!( + shadow_percentage = shadow_percentage, + "Shadow rollout is present with non-zero percentage, using shadow" + ); + ShadowRolloutAvailability::IsAvailable + } else if rollout_result.should_execute { + // Either shadow is 0.0 or not present, use rollout if available + router_env::logger::debug!( + shadow_percentage = shadow_percentage, + "Shadow rollout is 0.0 or not present, using rollout" + ); ShadowRolloutAvailability::IsAvailable } else { ShadowRolloutAvailability::NotAvailable }; // Single decision point using pattern matching - let (gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled { + let (_gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled { match call_connector_action { CallConnectorAction::UCSConsumeResponse(_) | CallConnectorAction::UCSHandleResponse(_) => { @@ -237,17 +278,58 @@ where } }; - router_env::logger::info!( - "Payment gateway decision: gateway={:?}, execution_path={:?} - merchant_id={}, connector={}, payment_method={}, flow={}", - gateway_system, - execution_path, - merchant_id, - connector_name, - payment_method, - flow_name - ); + router_env::logger::info!( "Payment gateway decision: execution_path={:?} - merchant_id={}, connector={}, payment_method={}, flow={}", execution_path, merchant_id, connector_name, payment_method, flow_name ); - Ok(execution_path) + // Handle proxy configuration for Shadow UCS flows + let session_state = match execution_path { + ExecutionPath::ShadowUnifiedConnectorService => { + // For shadow UCS, use rollout_result for proxy configuration since it takes priority + match &rollout_result.proxy_override { + Some(proxy_override) => { + router_env::logger::debug!( + proxy_override = ?proxy_override, + "Creating updated session state with proxy configuration for Shadow UCS" + ); + create_updated_session_state_with_proxy(state.clone(), proxy_override) + } + None => { + router_env::logger::debug!( + "No proxy override available for Shadow UCS, using original state" + ); + state.clone() + } + } + } + _ => { + // For Direct and UCS flows, use original state + state.clone() + } + }; + + Ok((execution_path, session_state)) +} + +/// Creates a new SessionState with proxy configuration updated from the override +fn create_updated_session_state_with_proxy( + state: SessionState, + proxy_override: &ProxyOverride, +) -> SessionState { + let mut updated_state = state; + + // Create updated configuration with proxy overrides + let mut updated_conf = (*updated_state.conf).clone(); + + // Update proxy URLs with overrides, falling back to existing values + if let Some(ref http_url) = proxy_override.http_url { + updated_conf.proxy.http_url = Some(http_url.clone()); + } + if let Some(ref https_url) = proxy_override.https_url { + updated_conf.proxy.https_url = Some(https_url.clone()); + } + + updated_state.conf = std::sync::Arc::new(updated_conf); + + updated_state } fn decide_execution_path( @@ -332,12 +414,23 @@ fn decide_execution_path( ExecutionPath::ShadowUnifiedConnectorService, )), - // Case 9: UcsConnector with no previous gateway (regardless of shadow rollout) + // Case 9a: UcsConnector with no previous gateway and shadow rollout enabled + // Fresh payment for UCS-enabled connector with shadow mode - use shadow UCS + (ConnectorIntegrationType::UcsConnector, None, ShadowRolloutAvailability::IsAvailable) => { + Ok(( + GatewaySystem::UnifiedConnectorService, + ExecutionPath::ShadowUnifiedConnectorService, + )) + } + + // Case 9b: UcsConnector with no previous gateway and no shadow rollout // Fresh payment for a UCS-enabled connector - use UCS as primary - (ConnectorIntegrationType::UcsConnector, None, _) => Ok(( - GatewaySystem::UnifiedConnectorService, - ExecutionPath::UnifiedConnectorService, - )), + (ConnectorIntegrationType::UcsConnector, None, ShadowRolloutAvailability::NotAvailable) => { + Ok(( + GatewaySystem::UnifiedConnectorService, + ExecutionPath::UnifiedConnectorService, + )) + } // Case 10: UcsConnector previously used UCS (regardless of shadow rollout) // Continue using UCS for consistency in the payment flow @@ -447,9 +540,9 @@ pub async fn should_call_unified_connector_service_for_webhooks( connector_name ); - let should_execute = should_execute_based_on_rollout(state, &config_key).await?; + let rollout_result = should_execute_based_on_rollout(state, &config_key).await?; - Ok(should_execute) + Ok(rollout_result.should_execute) } pub fn build_unified_connector_service_payment_method(