mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-01 19:42:27 +08:00
chore(router): choose proxy based on config (#9821)
Co-authored-by: Amitsingh Tanwar <amitsingh.tanwar@juspay.in> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Jeeva Ramachandran <jeeva.ramachandran@juspay.in>
This commit is contained in:
@ -428,6 +428,15 @@ impl Default for Proxy {
|
||||
}
|
||||
}
|
||||
|
||||
/// Proxy override configuration for rollout-based proxy switching
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct ProxyOverride {
|
||||
/// Override HTTP proxy URL
|
||||
pub http_url: Option<String>,
|
||||
/// Override HTTPS proxy URL
|
||||
pub https_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Type alias for `ConnectorIntegrationV2<CreateConnectorCustomer, PaymentFlowData, ConnectorCustomerData, PaymentsResponseData>`
|
||||
pub type CreateCustomerTypeV2 = dyn ConnectorIntegrationV2<
|
||||
CreateConnectorCustomer,
|
||||
|
||||
@ -4280,7 +4280,7 @@ where
|
||||
dyn api::Connector:
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
let execution_path = should_call_unified_connector_service(
|
||||
let (execution_path, updated_state) = should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
@ -4294,7 +4294,7 @@ where
|
||||
// Process through UCS when system is UCS and not handling response or if it is a UCS webhook action
|
||||
ExecutionPath::UnifiedConnectorService => {
|
||||
process_through_ucs(
|
||||
state,
|
||||
&updated_state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
operation,
|
||||
@ -4316,7 +4316,7 @@ where
|
||||
// Process through Direct with Shadow UCS
|
||||
ExecutionPath::ShadowUnifiedConnectorService => {
|
||||
process_through_direct_with_shadow_unified_connector_service(
|
||||
state,
|
||||
&updated_state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
connector,
|
||||
@ -4413,8 +4413,9 @@ where
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>: Feature<F, RouterDReq> + Send,
|
||||
// To construct connector flow specific api
|
||||
dyn api::Connector:
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
dyn api::Connector: services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>
|
||||
+ Send
|
||||
+ Sync,
|
||||
{
|
||||
let add_access_token_result = router_data
|
||||
.add_access_token(
|
||||
@ -4811,7 +4812,7 @@ where
|
||||
.await?;
|
||||
|
||||
// do order creation
|
||||
let execution_path = should_call_unified_connector_service(
|
||||
let (execution_path, updated_state) = should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
@ -4954,7 +4955,7 @@ where
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
record_time_taken_with(|| async {
|
||||
let execution_path = should_call_unified_connector_service(
|
||||
let (execution, updated_state) = should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
@ -4962,7 +4963,7 @@ where
|
||||
call_connector_action.clone(),
|
||||
)
|
||||
.await?;
|
||||
if matches!(execution_path, ExecutionPath::UnifiedConnectorService) {
|
||||
if matches!(execution, ExecutionPath::UnifiedConnectorService) {
|
||||
router_env::logger::info!(
|
||||
"Executing payment through UCS gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
@ -5022,7 +5023,7 @@ where
|
||||
|
||||
Ok(router_data)
|
||||
} else {
|
||||
if matches!(execution_path, ExecutionPath::ShadowUnifiedConnectorService) {
|
||||
if matches!(execution, ExecutionPath::ShadowUnifiedConnectorService) {
|
||||
router_env::logger::info!(
|
||||
"Shadow UCS mode not implemented in v2, processing through direct path - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
@ -5036,8 +5037,15 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
let session_state = if matches!(execution, ExecutionPath::ShadowUnifiedConnectorService) {
|
||||
&updated_state
|
||||
} else {
|
||||
state
|
||||
};
|
||||
|
||||
call_connector_service(
|
||||
state,
|
||||
session_state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
connector,
|
||||
|
||||
@ -2126,34 +2126,145 @@ pub async fn is_ucs_enabled(state: &SessionState, config_key: &str) -> bool {
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct RolloutConfig {
|
||||
pub rollout_percent: f64,
|
||||
pub http_url: Option<String>,
|
||||
pub https_url: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for RolloutConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rollout_percent: 0.0,
|
||||
http_url: None,
|
||||
https_url: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-export ProxyOverride from hyperswitch_interfaces
|
||||
pub use hyperswitch_interfaces::types::ProxyOverride;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RolloutExecutionResult {
|
||||
pub should_execute: bool,
|
||||
pub proxy_override: Option<ProxyOverride>,
|
||||
}
|
||||
|
||||
/// Validates a proxy URL, filtering out invalid ones and logging warnings
|
||||
fn validate_proxy_url(url: Option<String>, url_type: &str) -> Option<String> {
|
||||
url.and_then(|url_str| {
|
||||
if url_str.trim().is_empty() || url::Url::parse(&url_str).is_err() {
|
||||
logger::warn!(
|
||||
invalid_url = %url_str,
|
||||
url_type = url_type,
|
||||
"Invalid proxy URL in rollout config, ignoring"
|
||||
);
|
||||
None
|
||||
} else {
|
||||
Some(url_str)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates proxy override with validated URLs and logging
|
||||
fn create_proxy_override(
|
||||
http_url: Option<String>,
|
||||
https_url: Option<String>,
|
||||
) -> Option<ProxyOverride> {
|
||||
let validated_http = validate_proxy_url(http_url, "HTTP");
|
||||
let validated_https = validate_proxy_url(https_url, "HTTPS");
|
||||
|
||||
if validated_http.is_some() || validated_https.is_some() {
|
||||
if let Some(ref http_url) = validated_http {
|
||||
logger::info!(http_url = %http_url, "Using validated HTTP proxy URL from rollout config");
|
||||
}
|
||||
if let Some(ref https_url) = validated_https {
|
||||
logger::info!(https_url = %https_url, "Using validated HTTPS proxy URL from rollout config");
|
||||
}
|
||||
Some(ProxyOverride {
|
||||
http_url: validated_http,
|
||||
https_url: validated_https,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn should_execute_based_on_rollout(
|
||||
state: &SessionState,
|
||||
config_key: &str,
|
||||
) -> RouterResult<bool> {
|
||||
) -> RouterResult<RolloutExecutionResult> {
|
||||
let db = state.store.as_ref();
|
||||
|
||||
match db.find_config_by_key(config_key).await {
|
||||
Ok(rollout_config) => match rollout_config.config.parse::<f64>() {
|
||||
Ok(rollout_percent) => {
|
||||
if !(0.0..=1.0).contains(&rollout_percent) {
|
||||
logger::warn!(
|
||||
rollout_percent,
|
||||
"Rollout percent out of bounds. Must be between 0.0 and 1.0"
|
||||
Ok(rollout_config) => {
|
||||
// Try to parse as JSON first (new format), fallback to float (legacy format)
|
||||
let config_result = match serde_json::from_str::<RolloutConfig>(&rollout_config.config)
|
||||
{
|
||||
Ok(config) => Ok(config),
|
||||
Err(err) => {
|
||||
logger::debug!(
|
||||
error = ?err,
|
||||
config = %rollout_config.config,
|
||||
"Config not in JSON format, trying legacy float format"
|
||||
);
|
||||
return Ok(false);
|
||||
// Fallback to legacy format (simple float)
|
||||
rollout_config.config.parse::<f64>()
|
||||
.map(|percent| RolloutConfig {
|
||||
rollout_percent: percent,
|
||||
http_url: None,
|
||||
https_url: None,
|
||||
})
|
||||
.map_err(|err| {
|
||||
logger::error!(error = ?err, "Failed to parse rollout config as either JSON or float");
|
||||
err
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
let sampled_value: f64 = rand::thread_rng().gen_range(0.0..1.0);
|
||||
Ok(sampled_value < rollout_percent)
|
||||
match config_result {
|
||||
Ok(config) => {
|
||||
if !(0.0..=1.0).contains(&config.rollout_percent) {
|
||||
logger::warn!(
|
||||
rollout_percent = config.rollout_percent,
|
||||
"Rollout percent out of bounds. Must be between 0.0 and 1.0"
|
||||
);
|
||||
let proxy_override =
|
||||
create_proxy_override(config.http_url, config.https_url);
|
||||
|
||||
return Ok(RolloutExecutionResult {
|
||||
should_execute: false,
|
||||
proxy_override,
|
||||
});
|
||||
}
|
||||
|
||||
let sampled_value: f64 = rand::thread_rng().gen_range(0.0..1.0);
|
||||
let should_execute = sampled_value < config.rollout_percent;
|
||||
|
||||
let proxy_override = create_proxy_override(config.http_url, config.https_url);
|
||||
|
||||
Ok(RolloutExecutionResult {
|
||||
should_execute,
|
||||
proxy_override,
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
logger::error!(error = ?err, "Failed to parse rollout config");
|
||||
Ok(RolloutExecutionResult {
|
||||
should_execute: false,
|
||||
proxy_override: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
logger::error!(error = ?err, "Failed to parse rollout percent");
|
||||
Ok(false)
|
||||
}
|
||||
},
|
||||
}
|
||||
Err(err) => {
|
||||
logger::error!(error = ?err, "Failed to fetch rollout config from DB");
|
||||
Ok(false)
|
||||
Ok(RolloutExecutionResult {
|
||||
should_execute: false,
|
||||
proxy_override: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,7 +51,8 @@ use crate::{
|
||||
errors::{self, RouterResult},
|
||||
payments::{
|
||||
helpers::{
|
||||
is_ucs_enabled, should_execute_based_on_rollout, MerchantConnectorAccountType,
|
||||
self, is_ucs_enabled, should_execute_based_on_rollout,
|
||||
MerchantConnectorAccountType, ProxyOverride,
|
||||
},
|
||||
OperationSessionGetters, OperationSessionSetters,
|
||||
},
|
||||
@ -77,6 +78,25 @@ type UnifiedConnectorServiceResult = CustomResult<
|
||||
UnifiedConnectorServiceError,
|
||||
>;
|
||||
|
||||
/// Gets the rollout percentage for a given config key
|
||||
async fn get_rollout_percentage(state: &SessionState, config_key: &str) -> Option<f64> {
|
||||
let db = state.store.as_ref();
|
||||
|
||||
match db.find_config_by_key(config_key).await {
|
||||
Ok(rollout_config) => {
|
||||
// Try to parse as JSON first (new format), fallback to float (legacy format)
|
||||
match serde_json::from_str::<helpers::RolloutConfig>(&rollout_config.config) {
|
||||
Ok(config) => Some(config.rollout_percent),
|
||||
Err(_) => {
|
||||
// Fallback to legacy format (simple float)
|
||||
rollout_config.config.parse::<f64>().ok()
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the Unified Connector Service (UCS) is available for use
|
||||
async fn check_ucs_availability(state: &SessionState) -> UcsAvailability {
|
||||
let is_client_available = state.grpc_client.unified_connector_service_client.is_some();
|
||||
@ -112,13 +132,13 @@ async fn determine_connector_integration_type(
|
||||
match state.conf.grpc_client.unified_connector_service.as_ref() {
|
||||
Some(ucs_config) => {
|
||||
let is_ucs_only = ucs_config.ucs_only_connectors.contains(&connector);
|
||||
let is_rollout_enabled = should_execute_based_on_rollout(state, config_key).await?;
|
||||
let rollout_result = should_execute_based_on_rollout(state, config_key).await?;
|
||||
|
||||
if is_ucs_only || is_rollout_enabled {
|
||||
if is_ucs_only || rollout_result.should_execute {
|
||||
router_env::logger::debug!(
|
||||
connector = ?connector,
|
||||
ucs_only_list = is_ucs_only,
|
||||
rollout_enabled = is_rollout_enabled,
|
||||
rollout_enabled = rollout_result.should_execute,
|
||||
"Using UcsConnector"
|
||||
);
|
||||
Ok(ConnectorIntegrationType::UcsConnector)
|
||||
@ -146,7 +166,7 @@ pub async fn should_call_unified_connector_service<F: Clone, T, D>(
|
||||
router_data: &RouterData<F, T, PaymentsResponseData>,
|
||||
payment_data: Option<&D>,
|
||||
call_connector_action: CallConnectorAction,
|
||||
) -> RouterResult<ExecutionPath>
|
||||
) -> RouterResult<(ExecutionPath, SessionState)>
|
||||
where
|
||||
D: OperationSessionGetters<F>,
|
||||
{
|
||||
@ -185,15 +205,36 @@ where
|
||||
let previous_gateway = payment_data.and_then(extract_gateway_system_from_payment_intent);
|
||||
let shadow_rollout_key = format!("{}_shadow", rollout_key);
|
||||
|
||||
// Check both rollout keys to determine priority based on shadow percentage
|
||||
let rollout_result = should_execute_based_on_rollout(state, &rollout_key).await?;
|
||||
let shadow_rollout_result = should_execute_based_on_rollout(state, &shadow_rollout_key).await?;
|
||||
|
||||
// Get shadow percentage to determine priority
|
||||
let shadow_percentage = get_rollout_percentage(state, &shadow_rollout_key)
|
||||
.await
|
||||
.unwrap_or(0.0);
|
||||
|
||||
let shadow_rollout_availability =
|
||||
if should_execute_based_on_rollout(state, &shadow_rollout_key).await? {
|
||||
if shadow_rollout_result.should_execute && shadow_percentage != 0.0 {
|
||||
// Shadow is present and percentage is non-zero, use shadow
|
||||
router_env::logger::debug!(
|
||||
shadow_percentage = shadow_percentage,
|
||||
"Shadow rollout is present with non-zero percentage, using shadow"
|
||||
);
|
||||
ShadowRolloutAvailability::IsAvailable
|
||||
} else if rollout_result.should_execute {
|
||||
// Either shadow is 0.0 or not present, use rollout if available
|
||||
router_env::logger::debug!(
|
||||
shadow_percentage = shadow_percentage,
|
||||
"Shadow rollout is 0.0 or not present, using rollout"
|
||||
);
|
||||
ShadowRolloutAvailability::IsAvailable
|
||||
} else {
|
||||
ShadowRolloutAvailability::NotAvailable
|
||||
};
|
||||
|
||||
// Single decision point using pattern matching
|
||||
let (gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled {
|
||||
let (_gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled {
|
||||
match call_connector_action {
|
||||
CallConnectorAction::UCSConsumeResponse(_)
|
||||
| CallConnectorAction::UCSHandleResponse(_) => {
|
||||
@ -237,17 +278,58 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
router_env::logger::info!(
|
||||
"Payment gateway decision: gateway={:?}, execution_path={:?} - merchant_id={}, connector={}, payment_method={}, flow={}",
|
||||
gateway_system,
|
||||
execution_path,
|
||||
merchant_id,
|
||||
connector_name,
|
||||
payment_method,
|
||||
flow_name
|
||||
);
|
||||
router_env::logger::info!( "Payment gateway decision: execution_path={:?} - merchant_id={}, connector={}, payment_method={}, flow={}", execution_path, merchant_id, connector_name, payment_method, flow_name );
|
||||
|
||||
Ok(execution_path)
|
||||
// Handle proxy configuration for Shadow UCS flows
|
||||
let session_state = match execution_path {
|
||||
ExecutionPath::ShadowUnifiedConnectorService => {
|
||||
// For shadow UCS, use rollout_result for proxy configuration since it takes priority
|
||||
match &rollout_result.proxy_override {
|
||||
Some(proxy_override) => {
|
||||
router_env::logger::debug!(
|
||||
proxy_override = ?proxy_override,
|
||||
"Creating updated session state with proxy configuration for Shadow UCS"
|
||||
);
|
||||
create_updated_session_state_with_proxy(state.clone(), proxy_override)
|
||||
}
|
||||
None => {
|
||||
router_env::logger::debug!(
|
||||
"No proxy override available for Shadow UCS, using original state"
|
||||
);
|
||||
state.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// For Direct and UCS flows, use original state
|
||||
state.clone()
|
||||
}
|
||||
};
|
||||
|
||||
Ok((execution_path, session_state))
|
||||
}
|
||||
|
||||
/// Creates a new SessionState with proxy configuration updated from the override
|
||||
fn create_updated_session_state_with_proxy(
|
||||
state: SessionState,
|
||||
proxy_override: &ProxyOverride,
|
||||
) -> SessionState {
|
||||
let mut updated_state = state;
|
||||
|
||||
// Create updated configuration with proxy overrides
|
||||
let mut updated_conf = (*updated_state.conf).clone();
|
||||
|
||||
// Update proxy URLs with overrides, falling back to existing values
|
||||
if let Some(ref http_url) = proxy_override.http_url {
|
||||
updated_conf.proxy.http_url = Some(http_url.clone());
|
||||
}
|
||||
if let Some(ref https_url) = proxy_override.https_url {
|
||||
updated_conf.proxy.https_url = Some(https_url.clone());
|
||||
}
|
||||
|
||||
updated_state.conf = std::sync::Arc::new(updated_conf);
|
||||
|
||||
updated_state
|
||||
}
|
||||
|
||||
fn decide_execution_path(
|
||||
@ -332,12 +414,23 @@ fn decide_execution_path(
|
||||
ExecutionPath::ShadowUnifiedConnectorService,
|
||||
)),
|
||||
|
||||
// Case 9: UcsConnector with no previous gateway (regardless of shadow rollout)
|
||||
// Case 9a: UcsConnector with no previous gateway and shadow rollout enabled
|
||||
// Fresh payment for UCS-enabled connector with shadow mode - use shadow UCS
|
||||
(ConnectorIntegrationType::UcsConnector, None, ShadowRolloutAvailability::IsAvailable) => {
|
||||
Ok((
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
ExecutionPath::ShadowUnifiedConnectorService,
|
||||
))
|
||||
}
|
||||
|
||||
// Case 9b: UcsConnector with no previous gateway and no shadow rollout
|
||||
// Fresh payment for a UCS-enabled connector - use UCS as primary
|
||||
(ConnectorIntegrationType::UcsConnector, None, _) => Ok((
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
ExecutionPath::UnifiedConnectorService,
|
||||
)),
|
||||
(ConnectorIntegrationType::UcsConnector, None, ShadowRolloutAvailability::NotAvailable) => {
|
||||
Ok((
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
ExecutionPath::UnifiedConnectorService,
|
||||
))
|
||||
}
|
||||
|
||||
// Case 10: UcsConnector previously used UCS (regardless of shadow rollout)
|
||||
// Continue using UCS for consistency in the payment flow
|
||||
@ -447,9 +540,9 @@ pub async fn should_call_unified_connector_service_for_webhooks(
|
||||
connector_name
|
||||
);
|
||||
|
||||
let should_execute = should_execute_based_on_rollout(state, &config_key).await?;
|
||||
let rollout_result = should_execute_based_on_rollout(state, &config_key).await?;
|
||||
|
||||
Ok(should_execute)
|
||||
Ok(rollout_result.should_execute)
|
||||
}
|
||||
|
||||
pub fn build_unified_connector_service_payment_method(
|
||||
|
||||
Reference in New Issue
Block a user