mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
refactor: Improve readability and maintainability of Unified Connector Service–related code (#9820)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -2301,9 +2301,87 @@ pub enum GatewaySystem {
|
||||
#[default]
|
||||
Direct,
|
||||
UnifiedConnectorService,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Copy,
|
||||
Debug,
|
||||
Default,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Hash,
|
||||
PartialEq,
|
||||
serde::Deserialize,
|
||||
serde::Serialize,
|
||||
strum::Display,
|
||||
strum::VariantNames,
|
||||
strum::EnumIter,
|
||||
strum::EnumString,
|
||||
ToSchema,
|
||||
)]
|
||||
#[router_derive::diesel_enum(storage_type = "text")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
/// Indicates the execution path through which the payment is processed.
|
||||
pub enum ExecutionPath {
|
||||
#[default]
|
||||
Direct,
|
||||
UnifiedConnectorService,
|
||||
ShadowUnifiedConnectorService,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Copy,
|
||||
Debug,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Hash,
|
||||
PartialEq,
|
||||
serde::Deserialize,
|
||||
serde::Serialize,
|
||||
strum::Display,
|
||||
strum::VariantNames,
|
||||
strum::EnumIter,
|
||||
strum::EnumString,
|
||||
ToSchema,
|
||||
)]
|
||||
#[router_derive::diesel_enum(storage_type = "text")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum ShadowRolloutAvailability {
|
||||
IsAvailable,
|
||||
NotAvailable,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Copy,
|
||||
Debug,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Hash,
|
||||
PartialEq,
|
||||
serde::Deserialize,
|
||||
serde::Serialize,
|
||||
strum::Display,
|
||||
strum::VariantNames,
|
||||
strum::EnumIter,
|
||||
strum::EnumString,
|
||||
ToSchema,
|
||||
)]
|
||||
#[router_derive::diesel_enum(storage_type = "text")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum UcsAvailability {
|
||||
Enabled,
|
||||
Disabled,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Copy,
|
||||
@ -2330,6 +2408,31 @@ pub enum ExecutionMode {
|
||||
Shadow,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Copy,
|
||||
Debug,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Hash,
|
||||
PartialEq,
|
||||
serde::Deserialize,
|
||||
serde::Serialize,
|
||||
strum::Display,
|
||||
strum::VariantNames,
|
||||
strum::EnumIter,
|
||||
strum::EnumString,
|
||||
ToSchema,
|
||||
)]
|
||||
#[router_derive::diesel_enum(storage_type = "text")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum ConnectorIntegrationType {
|
||||
UcsConnector,
|
||||
DirectConnector,
|
||||
}
|
||||
|
||||
/// The type of the payment that differentiates between normal and various types of mandate payments. Use 'setup_mandate' in case of zero auth flow.
|
||||
#[derive(
|
||||
Clone,
|
||||
|
||||
@ -22,6 +22,7 @@ use std::{
|
||||
vec::IntoIter,
|
||||
};
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
use external_services::grpc_client;
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
@ -36,7 +37,7 @@ use api_models::{
|
||||
mandates::RecurringDetails,
|
||||
payments::{self as payments_api},
|
||||
};
|
||||
pub use common_enums::enums::{CallConnectorAction, ExecutionMode, GatewaySystem};
|
||||
pub use common_enums::enums::{CallConnectorAction, ExecutionMode, ExecutionPath, GatewaySystem};
|
||||
use common_types::payments as common_payments_types;
|
||||
use common_utils::{
|
||||
ext_traits::{AsyncExt, StringExt},
|
||||
@ -93,15 +94,9 @@ use self::{
|
||||
operations::{BoxedOperation, Operation, PaymentResponse},
|
||||
routing::{self as self_routing, SessionFlowRoutingInput},
|
||||
};
|
||||
#[cfg(feature = "v1")]
|
||||
use super::unified_connector_service::update_gateway_system_in_feature_metadata;
|
||||
use super::{
|
||||
errors::StorageErrorExt,
|
||||
payment_methods::surcharge_decision_configs,
|
||||
routing::TransactionData,
|
||||
unified_connector_service::{
|
||||
send_comparison_data, should_call_unified_connector_service, ComparisonData,
|
||||
},
|
||||
errors::StorageErrorExt, payment_methods::surcharge_decision_configs, routing::TransactionData,
|
||||
unified_connector_service::should_call_unified_connector_service,
|
||||
};
|
||||
#[cfg(feature = "v1")]
|
||||
use crate::core::blocklist::utils as blocklist_utils;
|
||||
@ -112,6 +107,11 @@ use crate::core::fraud_check as frm_core;
|
||||
#[cfg(feature = "v2")]
|
||||
use crate::core::payment_methods::vault;
|
||||
#[cfg(feature = "v1")]
|
||||
use crate::core::payments::helpers::{
|
||||
process_through_direct, process_through_direct_with_shadow_unified_connector_service,
|
||||
process_through_ucs,
|
||||
};
|
||||
#[cfg(feature = "v1")]
|
||||
use crate::core::routing::helpers as routing_helpers;
|
||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||
use crate::types::api::convert_connector_data_to_routable_connectors;
|
||||
@ -126,7 +126,7 @@ use crate::{
|
||||
payouts,
|
||||
routing::{self as core_routing},
|
||||
unified_authentication_service::types::{ClickToPay, UnifiedAuthenticationService},
|
||||
utils::{self as core_utils},
|
||||
utils as core_utils,
|
||||
},
|
||||
db::StorageInterface,
|
||||
logger,
|
||||
@ -553,7 +553,7 @@ pub async fn payments_operation_core<'a, F, Req, Op, FData, D>(
|
||||
header_payload: HeaderPayload,
|
||||
) -> RouterResult<(D, Req, Option<domain::Customer>, Option<u16>, Option<u128>)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static + 'a,
|
||||
F: Send + Clone + Sync + 'static,
|
||||
Req: Authenticate + Clone,
|
||||
Op: Operation<F, Req, Data = D> + Send + Sync,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
@ -568,7 +568,7 @@ where
|
||||
|
||||
// To perform router related operation for PaymentResponse
|
||||
PaymentResponse: Operation<F, FData, Data = D>,
|
||||
FData: Send + Sync + Clone + router_types::Capturable + 'static + 'a + serde::Serialize,
|
||||
FData: Send + Sync + Clone + router_types::Capturable + 'static + serde::Serialize,
|
||||
{
|
||||
let operation: BoxedOperation<'_, F, Req, D> = Box::new(operation);
|
||||
|
||||
@ -2106,7 +2106,7 @@ pub async fn call_surcharge_decision_management_for_session_flow(
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn payments_core<'a, F, Res, Req, Op, FData, D>(
|
||||
pub async fn payments_core<F, Res, Req, Op, FData, D>(
|
||||
state: SessionState,
|
||||
req_state: ReqState,
|
||||
merchant_context: domain::MerchantContext,
|
||||
@ -2119,8 +2119,8 @@ pub async fn payments_core<'a, F, Res, Req, Op, FData, D>(
|
||||
header_payload: HeaderPayload,
|
||||
) -> RouterResponse<Res>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static + 'a,
|
||||
FData: Send + Sync + Clone + router_types::Capturable + 'static + 'a + serde::Serialize,
|
||||
F: Send + Clone + Sync + 'static,
|
||||
FData: Send + Sync + Clone + router_types::Capturable + 'static + serde::Serialize,
|
||||
Op: Operation<F, Req, Data = D> + Send + Sync + Clone,
|
||||
Req: Debug + Authenticate + Clone,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
@ -4265,8 +4265,8 @@ pub async fn decide_unified_connector_service_call<'a, F, RouterDReq, ApiRequest
|
||||
helpers::MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static + 'a,
|
||||
RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize,
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + serde::Serialize,
|
||||
|
||||
// To create connector flow specific interface data
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
@ -4293,7 +4293,7 @@ where
|
||||
record_time_taken_with(|| async {
|
||||
match (execution_path, is_handle_response_action) {
|
||||
// Process through UCS when system is UCS and not handling response
|
||||
(GatewaySystem::UnifiedConnectorService, false) => {
|
||||
(ExecutionPath::UnifiedConnectorService, false) => {
|
||||
process_through_ucs(
|
||||
state,
|
||||
req_state,
|
||||
@ -4312,9 +4312,9 @@ where
|
||||
.await
|
||||
}
|
||||
|
||||
// Process through Direct gateway
|
||||
(GatewaySystem::Direct, _) | (GatewaySystem::UnifiedConnectorService, true) => {
|
||||
process_through_direct(
|
||||
// Process through Direct with Shadow UCS
|
||||
(ExecutionPath::ShadowUnifiedConnectorService, false) => {
|
||||
process_through_direct_with_shadow_unified_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
@ -4337,9 +4337,11 @@ where
|
||||
.await
|
||||
}
|
||||
|
||||
// Process through Direct with Shadow UCS
|
||||
(GatewaySystem::ShadowUnifiedConnectorService, _) => {
|
||||
process_through_direct_with_shadow_unified_connector_service(
|
||||
// Process through Direct gateway
|
||||
(ExecutionPath::Direct, _)
|
||||
| (ExecutionPath::UnifiedConnectorService, true)
|
||||
| (ExecutionPath::ShadowUnifiedConnectorService, true) => {
|
||||
process_through_direct(
|
||||
state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
@ -4366,374 +4368,6 @@ where
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to process through UCS
|
||||
async fn process_through_ucs<'a, F, RouterDReq, ApiRequest, D>(
|
||||
state: &'a SessionState,
|
||||
req_state: ReqState,
|
||||
merchant_context: &'a domain::MerchantContext,
|
||||
operation: &'a BoxedOperation<'a, F, ApiRequest, D>,
|
||||
payment_data: &'a mut D,
|
||||
customer: &Option<domain::Customer>,
|
||||
validate_result: &'a operations::ValidateResult,
|
||||
schedule_time: Option<time::PrimitiveDateTime>,
|
||||
header_payload: HeaderPayload,
|
||||
frm_suggestion: Option<storage_enums::FrmSuggestion>,
|
||||
business_profile: &'a domain::Profile,
|
||||
merchant_connector_account: helpers::MerchantConnectorAccountType,
|
||||
mut router_data: RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
) -> RouterResult<(
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
helpers::MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static + 'a,
|
||||
RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + serde::Serialize,
|
||||
dyn api::Connector:
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through UCS gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.payment_id
|
||||
.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Add task to process tracker if needed
|
||||
if should_add_task_to_process_tracker(payment_data) {
|
||||
operation
|
||||
.to_domain()?
|
||||
.add_task_to_process_tracker(
|
||||
state,
|
||||
payment_data.get_payment_attempt(),
|
||||
validate_result.requeue,
|
||||
schedule_time,
|
||||
)
|
||||
.await
|
||||
.map_err(|error| logger::error!(process_tracker_error=?error))
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Update feature metadata to track UCS usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(
|
||||
payment_data,
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
)?;
|
||||
|
||||
// Update trackers
|
||||
(_, *payment_data) = operation
|
||||
.to_update_tracker()?
|
||||
.update_trackers(
|
||||
state,
|
||||
req_state,
|
||||
payment_data.clone(),
|
||||
customer.clone(),
|
||||
merchant_context.get_merchant_account().storage_scheme,
|
||||
None,
|
||||
merchant_context.get_merchant_key_store(),
|
||||
frm_suggestion,
|
||||
header_payload.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Call UCS
|
||||
let lineage_ids = grpc_client::LineageIds::new(
|
||||
business_profile.merchant_id.clone(),
|
||||
business_profile.get_id().clone(),
|
||||
);
|
||||
|
||||
router_data
|
||||
.call_unified_connector_service(
|
||||
state,
|
||||
&header_payload,
|
||||
lineage_ids,
|
||||
merchant_connector_account.clone(),
|
||||
merchant_context,
|
||||
ExecutionMode::Primary, // UCS is called in primary mode
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok((router_data, merchant_connector_account))
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to process through Direct gateway
|
||||
async fn process_through_direct<'a, F, RouterDReq, ApiRequest, D>(
|
||||
state: &'a SessionState,
|
||||
req_state: ReqState,
|
||||
merchant_context: &'a domain::MerchantContext,
|
||||
connector: api::ConnectorData,
|
||||
operation: &'a BoxedOperation<'a, F, ApiRequest, D>,
|
||||
payment_data: &'a mut D,
|
||||
customer: &Option<domain::Customer>,
|
||||
call_connector_action: CallConnectorAction,
|
||||
validate_result: &'a operations::ValidateResult,
|
||||
schedule_time: Option<time::PrimitiveDateTime>,
|
||||
header_payload: HeaderPayload,
|
||||
frm_suggestion: Option<storage_enums::FrmSuggestion>,
|
||||
business_profile: &'a domain::Profile,
|
||||
is_retry_payment: bool,
|
||||
all_keys_required: Option<bool>,
|
||||
merchant_connector_account: helpers::MerchantConnectorAccountType,
|
||||
router_data: RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
tokenization_action: TokenizationAction,
|
||||
) -> RouterResult<(
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
helpers::MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static + 'a,
|
||||
RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + serde::Serialize,
|
||||
dyn api::Connector:
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through Direct gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.payment_id
|
||||
.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Update feature metadata to track Direct routing usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?;
|
||||
|
||||
call_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
connector,
|
||||
operation,
|
||||
payment_data,
|
||||
customer,
|
||||
call_connector_action,
|
||||
validate_result,
|
||||
schedule_time,
|
||||
header_payload,
|
||||
frm_suggestion,
|
||||
business_profile,
|
||||
is_retry_payment,
|
||||
all_keys_required,
|
||||
merchant_connector_account,
|
||||
router_data,
|
||||
tokenization_action,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to process through Direct with Shadow UCS
|
||||
async fn process_through_direct_with_shadow_unified_connector_service<
|
||||
'a,
|
||||
F,
|
||||
RouterDReq,
|
||||
ApiRequest,
|
||||
D,
|
||||
>(
|
||||
state: &'a SessionState,
|
||||
req_state: ReqState,
|
||||
merchant_context: &'a domain::MerchantContext,
|
||||
connector: api::ConnectorData,
|
||||
operation: &'a BoxedOperation<'a, F, ApiRequest, D>,
|
||||
payment_data: &'a mut D,
|
||||
customer: &Option<domain::Customer>,
|
||||
call_connector_action: CallConnectorAction,
|
||||
validate_result: &'a operations::ValidateResult,
|
||||
schedule_time: Option<time::PrimitiveDateTime>,
|
||||
header_payload: HeaderPayload,
|
||||
frm_suggestion: Option<storage_enums::FrmSuggestion>,
|
||||
business_profile: &'a domain::Profile,
|
||||
is_retry_payment: bool,
|
||||
all_keys_required: Option<bool>,
|
||||
merchant_connector_account: helpers::MerchantConnectorAccountType,
|
||||
router_data: RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
tokenization_action: TokenizationAction,
|
||||
) -> RouterResult<(
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
helpers::MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static + 'a,
|
||||
RouterDReq: Send + Sync + Clone + 'static + 'a + serde::Serialize,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + serde::Serialize,
|
||||
dyn api::Connector:
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through Direct gateway system with UCS in shadow mode - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().payment_id.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Clone data needed for shadow UCS call
|
||||
let unified_connector_service_router_data = router_data.clone();
|
||||
let unified_connector_service_merchant_connector_account = merchant_connector_account.clone();
|
||||
let unified_connector_service_merchant_context = merchant_context.clone();
|
||||
let unified_connector_service_header_payload = header_payload.clone();
|
||||
let unified_connector_service_state = state.clone();
|
||||
|
||||
let lineage_ids = grpc_client::LineageIds::new(
|
||||
business_profile.merchant_id.clone(),
|
||||
business_profile.get_id().clone(),
|
||||
);
|
||||
|
||||
// Update feature metadata to track Direct routing usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?;
|
||||
|
||||
// Call Direct connector service
|
||||
let result = call_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
connector,
|
||||
operation,
|
||||
payment_data,
|
||||
customer,
|
||||
call_connector_action,
|
||||
validate_result,
|
||||
schedule_time,
|
||||
header_payload,
|
||||
frm_suggestion,
|
||||
business_profile,
|
||||
is_retry_payment,
|
||||
all_keys_required,
|
||||
merchant_connector_account,
|
||||
router_data,
|
||||
tokenization_action,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Spawn shadow UCS call in background
|
||||
let direct_router_data = result.0.clone();
|
||||
tokio::spawn(async move {
|
||||
execute_shadow_unified_connector_service_call(
|
||||
unified_connector_service_state,
|
||||
unified_connector_service_router_data,
|
||||
direct_router_data,
|
||||
unified_connector_service_header_payload,
|
||||
lineage_ids,
|
||||
unified_connector_service_merchant_connector_account,
|
||||
unified_connector_service_merchant_context,
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to execute shadow UCS call
|
||||
async fn execute_shadow_unified_connector_service_call<F, RouterDReq>(
|
||||
state: SessionState,
|
||||
mut unified_connector_service_router_data: RouterData<
|
||||
F,
|
||||
RouterDReq,
|
||||
router_types::PaymentsResponseData,
|
||||
>,
|
||||
direct_router_data: RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
header_payload: HeaderPayload,
|
||||
lineage_ids: grpc_client::LineageIds,
|
||||
merchant_connector_account: helpers::MerchantConnectorAccountType,
|
||||
merchant_context: domain::MerchantContext,
|
||||
) where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + serde::Serialize,
|
||||
RouterData<F, RouterDReq, router_types::PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + serde::Serialize,
|
||||
dyn api::Connector:
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
// Call UCS in shadow mode
|
||||
let _unified_connector_service_result = unified_connector_service_router_data
|
||||
.call_unified_connector_service(
|
||||
&state,
|
||||
&header_payload,
|
||||
lineage_ids,
|
||||
merchant_connector_account,
|
||||
&merchant_context,
|
||||
ExecutionMode::Shadow, // Shadow mode for UCS
|
||||
)
|
||||
.await
|
||||
.map_err(|e| logger::debug!("Shadow UCS call failed: {:?}", e));
|
||||
|
||||
// Compare results
|
||||
match serialize_router_data_and_send_to_comparison_service(
|
||||
&state,
|
||||
direct_router_data,
|
||||
unified_connector_service_router_data,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => logger::debug!("Shadow UCS comparison completed successfully"),
|
||||
Err(e) => logger::debug!("Shadow UCS comparison failed: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
async fn serialize_router_data_and_send_to_comparison_service<F, RouterDReq>(
|
||||
state: &SessionState,
|
||||
hyperswitch_router_data: RouterData<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
unified_connector_service_router_data: RouterData<
|
||||
F,
|
||||
RouterDReq,
|
||||
router_types::PaymentsResponseData,
|
||||
>,
|
||||
) -> RouterResult<()>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + serde::Serialize,
|
||||
{
|
||||
logger::info!("Simulating UCS call for shadow mode comparison");
|
||||
let hyperswitch_data = match serde_json::to_value(hyperswitch_router_data) {
|
||||
Ok(data) => Secret::new(data),
|
||||
Err(_) => {
|
||||
logger::debug!("Failed to serialize HS router data");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let unified_connector_service_data =
|
||||
match serde_json::to_value(unified_connector_service_router_data) {
|
||||
Ok(data) => Secret::new(data),
|
||||
Err(_) => {
|
||||
logger::debug!("Failed to serialize UCS router data");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let comparison_data = ComparisonData {
|
||||
hyperswitch_data,
|
||||
unified_connector_service_data,
|
||||
};
|
||||
let _ = send_comparison_data(state, comparison_data)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
logger::debug!("Failed to send comparison data: {:?}", e);
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn record_time_taken_with<F, Fut, R>(f: F) -> RouterResult<R>
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
@ -5178,7 +4812,7 @@ where
|
||||
.await?;
|
||||
|
||||
// do order creation
|
||||
let should_call_unified_connector_service = should_call_unified_connector_service(
|
||||
let execution_path = should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
@ -5186,10 +4820,8 @@ where
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (connector_request, should_continue_further) = if matches!(
|
||||
should_call_unified_connector_service,
|
||||
GatewaySystem::Direct | GatewaySystem::ShadowUnifiedConnectorService
|
||||
) {
|
||||
let (connector_request, should_continue_further) =
|
||||
if matches!(execution_path, ExecutionPath::Direct) {
|
||||
let mut should_continue_further = true;
|
||||
|
||||
let should_continue = match router_data
|
||||
@ -5202,16 +4834,17 @@ where
|
||||
}
|
||||
|
||||
// Set the response in routerdata response to carry forward
|
||||
router_data
|
||||
.update_router_data_with_create_order_response(create_order_response.clone());
|
||||
router_data.update_router_data_with_create_order_response(
|
||||
create_order_response.clone(),
|
||||
);
|
||||
create_order_response.create_order_result.ok().map(|_| ())
|
||||
}
|
||||
// If create order is not required, then we can proceed with further processing
|
||||
None => Some(()),
|
||||
};
|
||||
|
||||
let should_continue: (Option<common_utils::request::Request>, bool) = match should_continue
|
||||
{
|
||||
let should_continue: (Option<common_utils::request::Request>, bool) =
|
||||
match should_continue {
|
||||
Some(_) => {
|
||||
router_data
|
||||
.build_flow_specific_connector_request(
|
||||
@ -5246,7 +4879,7 @@ where
|
||||
.await?;
|
||||
|
||||
record_time_taken_with(|| async {
|
||||
if matches!(should_call_unified_connector_service, GatewaySystem::UnifiedConnectorService) {
|
||||
if matches!(execution_path, ExecutionPath::UnifiedConnectorService) {
|
||||
router_env::logger::info!(
|
||||
"Processing payment through UCS gateway system- payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
@ -5313,14 +4946,14 @@ where
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
record_time_taken_with(|| async {
|
||||
let execution = should_call_unified_connector_service(
|
||||
let execution_path = should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
Some(payment_data),
|
||||
)
|
||||
.await?;
|
||||
if matches!(execution, GatewaySystem::UnifiedConnectorService) {
|
||||
if matches!(execution_path, ExecutionPath::UnifiedConnectorService) {
|
||||
router_env::logger::info!(
|
||||
"Executing payment through UCS gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
@ -5371,7 +5004,7 @@ where
|
||||
|
||||
Ok(router_data)
|
||||
} else {
|
||||
if matches!(execution, GatewaySystem::ShadowUnifiedConnectorService) {
|
||||
if matches!(execution_path, ExecutionPath::ShadowUnifiedConnectorService) {
|
||||
router_env::logger::info!(
|
||||
"Shadow UCS mode not implemented in v2, processing through direct path - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
|
||||
@ -11,6 +11,8 @@ use api_models::{
|
||||
payments::{additional_info as payment_additional_types, RequestSurchargeDetails},
|
||||
};
|
||||
use base64::Engine;
|
||||
#[cfg(feature = "v1")]
|
||||
use common_enums::enums::{CallConnectorAction, ExecutionMode, GatewaySystem};
|
||||
use common_enums::ConnectorType;
|
||||
#[cfg(feature = "v2")]
|
||||
use common_utils::id_type::GenerateId;
|
||||
@ -29,6 +31,8 @@ use common_utils::{
|
||||
use diesel_models::enums;
|
||||
// TODO : Evaluate all the helper functions ()
|
||||
use error_stack::{report, ResultExt};
|
||||
#[cfg(feature = "v1")]
|
||||
use external_services::grpc_client;
|
||||
use futures::future::Either;
|
||||
#[cfg(feature = "v1")]
|
||||
use hyperswitch_domain_models::payments::payment_intent::CustomerData;
|
||||
@ -67,6 +71,21 @@ use super::{
|
||||
operations::{BoxedOperation, Operation, PaymentResponse},
|
||||
CustomerDetails, PaymentData,
|
||||
};
|
||||
#[cfg(feature = "v1")]
|
||||
use crate::core::{
|
||||
payments::{
|
||||
call_connector_service,
|
||||
flows::{ConstructFlowSpecificData, Feature},
|
||||
operations::ValidateResult as OperationsValidateResult,
|
||||
should_add_task_to_process_tracker, OperationSessionGetters, OperationSessionSetters,
|
||||
TokenizationAction,
|
||||
},
|
||||
unified_connector_service::{
|
||||
send_comparison_data, update_gateway_system_in_feature_metadata, ComparisonData,
|
||||
},
|
||||
};
|
||||
#[cfg(feature = "v1")]
|
||||
use crate::routes;
|
||||
use crate::{
|
||||
configs::settings::{ConnectorRequestReferenceIdConfig, TempLockerEnableConfig},
|
||||
connector,
|
||||
@ -5627,7 +5646,7 @@ pub async fn get_apple_pay_retryable_connectors<F, D>(
|
||||
) -> CustomResult<Option<Vec<api::ConnectorRoutingData>>, errors::ApiErrorResponse>
|
||||
where
|
||||
F: Send + Clone,
|
||||
D: payments::OperationSessionGetters<F> + Send,
|
||||
D: OperationSessionGetters<F> + Send,
|
||||
{
|
||||
let profile_id = business_profile.get_id();
|
||||
|
||||
@ -7327,7 +7346,7 @@ pub async fn override_setup_future_usage_to_on_session<F, D>(
|
||||
) -> CustomResult<(), errors::ApiErrorResponse>
|
||||
where
|
||||
F: Clone,
|
||||
D: payments::OperationSessionGetters<F> + payments::OperationSessionSetters<F> + Send,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send,
|
||||
{
|
||||
if payment_data.get_payment_intent().setup_future_usage == Some(enums::FutureUsage::OffSession)
|
||||
{
|
||||
@ -7835,3 +7854,360 @@ pub fn is_stored_credential(
|
||||
is_stored_credential_prev
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to process through UCS gateway
|
||||
pub async fn process_through_ucs<'a, F, RouterDReq, ApiRequest, D>(
|
||||
state: &'a SessionState,
|
||||
req_state: routes::app::ReqState,
|
||||
merchant_context: &'a domain::MerchantContext,
|
||||
operation: &'a BoxedOperation<'a, F, ApiRequest, D>,
|
||||
payment_data: &'a mut D,
|
||||
customer: &Option<domain::Customer>,
|
||||
validate_result: &'a OperationsValidateResult,
|
||||
schedule_time: Option<time::PrimitiveDateTime>,
|
||||
header_payload: domain_payments::HeaderPayload,
|
||||
frm_suggestion: Option<enums::FrmSuggestion>,
|
||||
business_profile: &'a domain::Profile,
|
||||
merchant_connector_account: MerchantConnectorAccountType,
|
||||
mut router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
) -> RouterResult<(
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + Serialize,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + Serialize,
|
||||
dyn api::Connector: services::api::ConnectorIntegration<F, RouterDReq, PaymentsResponseData>,
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through UCS gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.payment_id
|
||||
.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Add task to process tracker if needed
|
||||
if should_add_task_to_process_tracker(payment_data) {
|
||||
operation
|
||||
.to_domain()?
|
||||
.add_task_to_process_tracker(
|
||||
state,
|
||||
payment_data.get_payment_attempt(),
|
||||
validate_result.requeue,
|
||||
schedule_time,
|
||||
)
|
||||
.await
|
||||
.map_err(|error| router_env::logger::error!(process_tracker_error=?error))
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Update feature metadata to track UCS usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(
|
||||
payment_data,
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
)?;
|
||||
|
||||
// Update trackers
|
||||
(_, *payment_data) = operation
|
||||
.to_update_tracker()?
|
||||
.update_trackers(
|
||||
state,
|
||||
req_state,
|
||||
payment_data.clone(),
|
||||
customer.clone(),
|
||||
merchant_context.get_merchant_account().storage_scheme,
|
||||
None,
|
||||
merchant_context.get_merchant_key_store(),
|
||||
frm_suggestion,
|
||||
header_payload.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Call UCS
|
||||
let lineage_ids = grpc_client::LineageIds::new(
|
||||
business_profile.merchant_id.clone(),
|
||||
business_profile.get_id().clone(),
|
||||
);
|
||||
|
||||
router_data
|
||||
.call_unified_connector_service(
|
||||
state,
|
||||
&header_payload,
|
||||
lineage_ids,
|
||||
merchant_connector_account.clone(),
|
||||
merchant_context,
|
||||
ExecutionMode::Primary, // UCS is called in primary mode
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok((router_data, merchant_connector_account))
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to process through Direct gateway
|
||||
pub async fn process_through_direct<'a, F, RouterDReq, ApiRequest, D>(
|
||||
state: &'a SessionState,
|
||||
req_state: routes::app::ReqState,
|
||||
merchant_context: &'a domain::MerchantContext,
|
||||
connector: api::ConnectorData,
|
||||
operation: &'a BoxedOperation<'a, F, ApiRequest, D>,
|
||||
payment_data: &'a mut D,
|
||||
customer: &Option<domain::Customer>,
|
||||
call_connector_action: CallConnectorAction,
|
||||
validate_result: &'a OperationsValidateResult,
|
||||
schedule_time: Option<time::PrimitiveDateTime>,
|
||||
header_payload: domain_payments::HeaderPayload,
|
||||
frm_suggestion: Option<enums::FrmSuggestion>,
|
||||
business_profile: &'a domain::Profile,
|
||||
is_retry_payment: bool,
|
||||
all_keys_required: Option<bool>,
|
||||
merchant_connector_account: MerchantConnectorAccountType,
|
||||
router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
tokenization_action: TokenizationAction,
|
||||
) -> RouterResult<(
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + Serialize,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + Serialize,
|
||||
dyn api::Connector: services::api::ConnectorIntegration<F, RouterDReq, PaymentsResponseData>,
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through Direct gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.payment_id
|
||||
.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Update feature metadata to track Direct routing usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?;
|
||||
|
||||
call_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
connector,
|
||||
operation,
|
||||
payment_data,
|
||||
customer,
|
||||
call_connector_action,
|
||||
validate_result,
|
||||
schedule_time,
|
||||
header_payload,
|
||||
frm_suggestion,
|
||||
business_profile,
|
||||
is_retry_payment,
|
||||
all_keys_required,
|
||||
merchant_connector_account,
|
||||
router_data,
|
||||
tokenization_action,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to process through Direct with Shadow UCS
|
||||
pub async fn process_through_direct_with_shadow_unified_connector_service<
|
||||
'a,
|
||||
F,
|
||||
RouterDReq,
|
||||
ApiRequest,
|
||||
D,
|
||||
>(
|
||||
state: &'a SessionState,
|
||||
req_state: routes::app::ReqState,
|
||||
merchant_context: &'a domain::MerchantContext,
|
||||
connector: api::ConnectorData,
|
||||
operation: &'a BoxedOperation<'a, F, ApiRequest, D>,
|
||||
payment_data: &'a mut D,
|
||||
customer: &Option<domain::Customer>,
|
||||
call_connector_action: CallConnectorAction,
|
||||
validate_result: &'a OperationsValidateResult,
|
||||
schedule_time: Option<time::PrimitiveDateTime>,
|
||||
header_payload: domain_payments::HeaderPayload,
|
||||
frm_suggestion: Option<enums::FrmSuggestion>,
|
||||
business_profile: &'a domain::Profile,
|
||||
is_retry_payment: bool,
|
||||
all_keys_required: Option<bool>,
|
||||
merchant_connector_account: MerchantConnectorAccountType,
|
||||
router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
tokenization_action: TokenizationAction,
|
||||
) -> RouterResult<(
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
MerchantConnectorAccountType,
|
||||
)>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + Serialize,
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F> + Send + Sync + Clone,
|
||||
D: ConstructFlowSpecificData<F, RouterDReq, PaymentsResponseData>,
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + Serialize,
|
||||
dyn api::Connector: services::api::ConnectorIntegration<F, RouterDReq, PaymentsResponseData>,
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through Direct gateway system with UCS in shadow mode - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().payment_id.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Clone data needed for shadow UCS call
|
||||
let unified_connector_service_router_data = router_data.clone();
|
||||
let unified_connector_service_merchant_connector_account = merchant_connector_account.clone();
|
||||
let unified_connector_service_merchant_context = merchant_context.clone();
|
||||
let unified_connector_service_header_payload = header_payload.clone();
|
||||
let unified_connector_service_state = state.clone();
|
||||
|
||||
let lineage_ids = grpc_client::LineageIds::new(
|
||||
business_profile.merchant_id.clone(),
|
||||
business_profile.get_id().clone(),
|
||||
);
|
||||
|
||||
// Update feature metadata to track Direct routing usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?;
|
||||
|
||||
// Call Direct connector service
|
||||
let result = call_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
merchant_context,
|
||||
connector,
|
||||
operation,
|
||||
payment_data,
|
||||
customer,
|
||||
call_connector_action,
|
||||
validate_result,
|
||||
schedule_time,
|
||||
header_payload,
|
||||
frm_suggestion,
|
||||
business_profile,
|
||||
is_retry_payment,
|
||||
all_keys_required,
|
||||
merchant_connector_account,
|
||||
router_data,
|
||||
tokenization_action,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Spawn shadow UCS call in background
|
||||
let direct_router_data = result.0.clone();
|
||||
tokio::spawn(async move {
|
||||
execute_shadow_unified_connector_service_call(
|
||||
unified_connector_service_state,
|
||||
unified_connector_service_router_data,
|
||||
direct_router_data,
|
||||
unified_connector_service_header_payload,
|
||||
lineage_ids,
|
||||
unified_connector_service_merchant_connector_account,
|
||||
unified_connector_service_merchant_context,
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
// Helper function to execute shadow UCS call
|
||||
pub async fn execute_shadow_unified_connector_service_call<F, RouterDReq>(
|
||||
state: SessionState,
|
||||
mut unified_connector_service_router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
direct_router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
header_payload: domain_payments::HeaderPayload,
|
||||
lineage_ids: grpc_client::LineageIds,
|
||||
merchant_connector_account: MerchantConnectorAccountType,
|
||||
merchant_context: domain::MerchantContext,
|
||||
) where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + Serialize,
|
||||
RouterData<F, RouterDReq, PaymentsResponseData>:
|
||||
Feature<F, RouterDReq> + Send + Clone + Serialize,
|
||||
dyn api::Connector: services::api::ConnectorIntegration<F, RouterDReq, PaymentsResponseData>,
|
||||
{
|
||||
// Call UCS in shadow mode
|
||||
let _unified_connector_service_result = unified_connector_service_router_data
|
||||
.call_unified_connector_service(
|
||||
&state,
|
||||
&header_payload,
|
||||
lineage_ids,
|
||||
merchant_connector_account,
|
||||
&merchant_context,
|
||||
ExecutionMode::Shadow, // Shadow mode for UCS
|
||||
)
|
||||
.await
|
||||
.map_err(|e| router_env::logger::debug!("Shadow UCS call failed: {:?}", e));
|
||||
|
||||
// Compare results
|
||||
match serialize_router_data_and_send_to_comparison_service(
|
||||
&state,
|
||||
direct_router_data,
|
||||
unified_connector_service_router_data,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => router_env::logger::debug!("Shadow UCS comparison completed successfully"),
|
||||
Err(e) => router_env::logger::debug!("Shadow UCS comparison failed: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
pub async fn serialize_router_data_and_send_to_comparison_service<F, RouterDReq>(
|
||||
state: &SessionState,
|
||||
hyperswitch_router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
unified_connector_service_router_data: RouterData<F, RouterDReq, PaymentsResponseData>,
|
||||
) -> RouterResult<()>
|
||||
where
|
||||
F: Send + Clone + Sync + 'static,
|
||||
RouterDReq: Send + Sync + Clone + 'static + Serialize,
|
||||
{
|
||||
router_env::logger::info!("Simulating UCS call for shadow mode comparison");
|
||||
let hyperswitch_data = match serde_json::to_value(hyperswitch_router_data) {
|
||||
Ok(data) => masking::Secret::new(data),
|
||||
Err(_) => {
|
||||
router_env::logger::debug!("Failed to serialize HS router data");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let unified_connector_service_data =
|
||||
match serde_json::to_value(unified_connector_service_router_data) {
|
||||
Ok(data) => masking::Secret::new(data),
|
||||
Err(_) => {
|
||||
router_env::logger::debug!("Failed to serialize UCS router data");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let comparison_data = ComparisonData {
|
||||
hyperswitch_data,
|
||||
unified_connector_service_data,
|
||||
};
|
||||
let _ = send_comparison_data(state, comparison_data)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
router_env::logger::debug!("Failed to send comparison data: {:?}", e);
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -48,8 +48,8 @@ pub async fn do_gsm_actions<'a, F, ApiRequest, FData, D>(
|
||||
business_profile: &domain::Profile,
|
||||
) -> RouterResult<types::RouterData<F, FData, types::PaymentsResponseData>>
|
||||
where
|
||||
F: Clone + Send + Sync + 'static + 'a,
|
||||
FData: Send + Sync + types::Capturable + Clone + 'static + 'a + serde::Serialize,
|
||||
F: Clone + Send + Sync + 'static,
|
||||
FData: Send + Sync + types::Capturable + Clone + 'static + serde::Serialize,
|
||||
payments::PaymentResponse: operations::Operation<F, FData>,
|
||||
D: payments::OperationSessionGetters<F>
|
||||
+ payments::OperationSessionSetters<F>
|
||||
@ -356,8 +356,8 @@ pub async fn do_retry<'a, F, ApiRequest, FData, D>(
|
||||
routing_decision: Option<routing_helpers::RoutingDecisionData>,
|
||||
) -> RouterResult<types::RouterData<F, FData, types::PaymentsResponseData>>
|
||||
where
|
||||
F: Clone + Send + Sync + 'static + 'a,
|
||||
FData: Send + Sync + types::Capturable + Clone + 'static + 'a + serde::Serialize,
|
||||
F: Clone + Send + Sync + 'static,
|
||||
FData: Send + Sync + types::Capturable + Clone + 'static + serde::Serialize,
|
||||
payments::PaymentResponse: operations::Operation<F, FData>,
|
||||
D: payments::OperationSessionGetters<F>
|
||||
+ payments::OperationSessionSetters<F>
|
||||
|
||||
@ -4,7 +4,8 @@ use api_models::admin;
|
||||
#[cfg(feature = "v2")]
|
||||
use base64::Engine;
|
||||
use common_enums::{
|
||||
connector_enums::Connector, AttemptStatus, ExecutionMode, GatewaySystem, PaymentMethodType,
|
||||
connector_enums::Connector, AttemptStatus, ConnectorIntegrationType, ExecutionMode,
|
||||
ExecutionPath, GatewaySystem, PaymentMethodType, ShadowRolloutAvailability, UcsAvailability,
|
||||
};
|
||||
#[cfg(feature = "v2")]
|
||||
use common_utils::consts::BASE64_ENGINE;
|
||||
@ -75,12 +76,75 @@ type UnifiedConnectorServiceResult = CustomResult<
|
||||
UnifiedConnectorServiceError,
|
||||
>;
|
||||
|
||||
/// Checks if the Unified Connector Service (UCS) is available for use
|
||||
async fn check_ucs_availability(state: &SessionState) -> UcsAvailability {
|
||||
let is_client_available = state.grpc_client.unified_connector_service_client.is_some();
|
||||
|
||||
let is_enabled = is_ucs_enabled(state, consts::UCS_ENABLED).await;
|
||||
|
||||
match (is_client_available, is_enabled) {
|
||||
(true, true) => {
|
||||
router_env::logger::debug!("UCS is available and enabled");
|
||||
UcsAvailability::Enabled
|
||||
}
|
||||
_ => {
|
||||
router_env::logger::debug!(
|
||||
"UCS client is {} and UCS is {} in configuration",
|
||||
if is_client_available {
|
||||
"available"
|
||||
} else {
|
||||
"not available"
|
||||
},
|
||||
if is_enabled { "enabled" } else { "not enabled" }
|
||||
);
|
||||
UcsAvailability::Disabled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Determines the connector integration type based on UCS configuration or on both
|
||||
async fn determine_connector_integration_type(
|
||||
state: &SessionState,
|
||||
connector: Connector,
|
||||
config_key: &str,
|
||||
) -> RouterResult<ConnectorIntegrationType> {
|
||||
match state.conf.grpc_client.unified_connector_service.as_ref() {
|
||||
Some(ucs_config) => {
|
||||
let is_ucs_only = ucs_config.ucs_only_connectors.contains(&connector);
|
||||
let is_rollout_enabled = should_execute_based_on_rollout(state, config_key).await?;
|
||||
|
||||
if is_ucs_only || is_rollout_enabled {
|
||||
router_env::logger::debug!(
|
||||
connector = ?connector,
|
||||
ucs_only_list = is_ucs_only,
|
||||
rollout_enabled = is_rollout_enabled,
|
||||
"Using UcsConnector"
|
||||
);
|
||||
Ok(ConnectorIntegrationType::UcsConnector)
|
||||
} else {
|
||||
router_env::logger::debug!(
|
||||
connector = ?connector,
|
||||
"Using DirectConnector - not in ucs_only_list and rollout not enabled"
|
||||
);
|
||||
Ok(ConnectorIntegrationType::DirectConnector)
|
||||
}
|
||||
}
|
||||
None => {
|
||||
router_env::logger::debug!(
|
||||
connector = ?connector,
|
||||
"UCS config not present, using DirectConnector"
|
||||
);
|
||||
Ok(ConnectorIntegrationType::DirectConnector)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn should_call_unified_connector_service<F: Clone, T, D>(
|
||||
state: &SessionState,
|
||||
merchant_context: &MerchantContext,
|
||||
router_data: &RouterData<F, T, PaymentsResponseData>,
|
||||
payment_data: Option<&D>,
|
||||
) -> RouterResult<GatewaySystem>
|
||||
) -> RouterResult<ExecutionPath>
|
||||
where
|
||||
D: OperationSessionGetters<F>,
|
||||
{
|
||||
@ -98,63 +162,10 @@ where
|
||||
let payment_method = router_data.payment_method.to_string();
|
||||
let flow_name = get_flow_name::<F>()?;
|
||||
|
||||
// Compute all relevant conditions
|
||||
let ucs_client_available = state.grpc_client.unified_connector_service_client.is_some();
|
||||
let ucs_enabled = is_ucs_enabled(state, consts::UCS_ENABLED).await;
|
||||
|
||||
// Log if UCS client is not available
|
||||
if !ucs_client_available {
|
||||
router_env::logger::debug!(
|
||||
"UCS client not available - merchant_id={}, connector={}",
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
}
|
||||
|
||||
// Log if UCS is not enabled
|
||||
if !ucs_enabled {
|
||||
router_env::logger::debug!(
|
||||
"UCS not enabled in configuration - merchant_id={}, connector={}",
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
}
|
||||
|
||||
let ucs_config = state.conf.grpc_client.unified_connector_service.as_ref();
|
||||
|
||||
// Log if UCS configuration is missing
|
||||
if ucs_config.is_none() {
|
||||
router_env::logger::debug!(
|
||||
"UCS configuration not found - merchant_id={}, connector={}",
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
}
|
||||
|
||||
let ucs_only_connector =
|
||||
ucs_config.is_some_and(|config| config.ucs_only_connectors.contains(&connector_enum));
|
||||
|
||||
let previous_gateway = payment_data.and_then(extract_gateway_system_from_payment_intent);
|
||||
|
||||
// Log previous gateway state
|
||||
match previous_gateway {
|
||||
Some(gateway) => {
|
||||
router_env::logger::debug!(
|
||||
"Previous gateway system found: {:?} - merchant_id={}, connector={}",
|
||||
gateway,
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
}
|
||||
None => {
|
||||
router_env::logger::debug!(
|
||||
"No previous gateway system found (new payment) - merchant_id={}, connector={}",
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
}
|
||||
}
|
||||
// Check UCS availability using idiomatic helper
|
||||
let ucs_availability = check_ucs_availability(state).await;
|
||||
|
||||
// Build rollout keys
|
||||
let rollout_key = format!(
|
||||
"{}_{}_{}_{}_{}",
|
||||
consts::UCS_ROLLOUT_PERCENT_CONFIG_PREFIX,
|
||||
@ -163,179 +174,148 @@ where
|
||||
payment_method,
|
||||
flow_name
|
||||
);
|
||||
|
||||
// Determine connector integration type
|
||||
let connector_integration_type =
|
||||
determine_connector_integration_type(state, connector_enum, &rollout_key).await?;
|
||||
|
||||
// Extract previous gateway from payment data
|
||||
let previous_gateway = payment_data.and_then(extract_gateway_system_from_payment_intent);
|
||||
let shadow_rollout_key = format!("{}_shadow", rollout_key);
|
||||
|
||||
let rollout_enabled = should_execute_based_on_rollout(state, &rollout_key).await?;
|
||||
let shadow_rollout_enabled =
|
||||
should_execute_based_on_rollout(state, &shadow_rollout_key).await?;
|
||||
|
||||
router_env::logger::debug!(
|
||||
"Rollout status - rollout_enabled={}, shadow_rollout_enabled={}, rollout_key={}, merchant_id={}, connector={}",
|
||||
rollout_enabled,
|
||||
shadow_rollout_enabled,
|
||||
rollout_key,
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
let shadow_rollout_availability =
|
||||
if should_execute_based_on_rollout(state, &shadow_rollout_key).await? {
|
||||
ShadowRolloutAvailability::IsAvailable
|
||||
} else {
|
||||
ShadowRolloutAvailability::NotAvailable
|
||||
};
|
||||
|
||||
// Single decision point using pattern matching
|
||||
// Tuple structure: (ucs_infrastructure_ready, ucs_only_connector, previous_gateway, shadow_rollout_enabled, rollout_enabled)
|
||||
let decision = match (
|
||||
ucs_client_available && ucs_enabled,
|
||||
ucs_only_connector,
|
||||
let (gateway_system, execution_path) = if ucs_availability == UcsAvailability::Disabled {
|
||||
router_env::logger::debug!("UCS is disabled, using Direct gateway");
|
||||
(GatewaySystem::Direct, ExecutionPath::Direct)
|
||||
} else {
|
||||
// UCS is enabled, call decide function
|
||||
decide_execution_path(
|
||||
connector_integration_type,
|
||||
previous_gateway,
|
||||
shadow_rollout_enabled,
|
||||
rollout_enabled,
|
||||
) {
|
||||
// ==================== DIRECT GATEWAY DECISIONS ====================
|
||||
// All patterns that result in Direct routing
|
||||
|
||||
// UCS infrastructure not available (any configuration)
|
||||
// - Client not available OR not enabled
|
||||
// - Regardless of: ucs_only_connector, previous_gateway, rollouts
|
||||
(false, _, _, _, _) |
|
||||
|
||||
// UCS available but no rollouts active and no previous gateway
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - New payment (no previous gateway)
|
||||
// - No shadow rollout
|
||||
// - No full rollout
|
||||
(true, false, None, false, false) |
|
||||
|
||||
// UCS available, continuing with Direct, no rollouts
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was Direct
|
||||
// - No shadow rollout
|
||||
// - No full rollout
|
||||
(true, false, Some(GatewaySystem::Direct), false, false) |
|
||||
|
||||
// UCS available, previous Shadow but rollout ended
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was Shadow
|
||||
// - No shadow rollout (ended)
|
||||
// - No full rollout
|
||||
(true, false, Some(GatewaySystem::ShadowUnifiedConnectorService), false, false) => {
|
||||
router_env::logger::debug!(
|
||||
"Routing to Direct: ucs_ready={}, ucs_only={}, previous={:?}, shadow_rollout={}, full_rollout={} - merchant_id={}, connector={}",
|
||||
ucs_client_available && ucs_enabled,
|
||||
ucs_only_connector,
|
||||
previous_gateway,
|
||||
shadow_rollout_enabled,
|
||||
rollout_enabled,
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
GatewaySystem::Direct
|
||||
}
|
||||
|
||||
// ==================== SHADOW UCS DECISIONS ====================
|
||||
// All patterns that result in Shadow UCS routing
|
||||
|
||||
// Shadow rollout for new payment (no previous gateway)
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - No previous gateway
|
||||
// - Shadow rollout enabled
|
||||
// - Full rollout: any (false or true, shadow takes precedence)
|
||||
(true, false, None, true, _) |
|
||||
|
||||
// Shadow rollout with previous Direct gateway
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was Direct
|
||||
// - Shadow rollout enabled
|
||||
// - Full rollout: any (shadow takes precedence)
|
||||
(true, false, Some(GatewaySystem::Direct), true, _) |
|
||||
|
||||
// Shadow rollout with previous Shadow gateway (continuation)
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was Shadow
|
||||
// - Shadow rollout enabled
|
||||
// - Full rollout: any
|
||||
(true, false, Some(GatewaySystem::ShadowUnifiedConnectorService), true, _) => {
|
||||
router_env::logger::debug!(
|
||||
"Routing to ShadowUnifiedConnectorService: ucs_ready={}, ucs_only={}, previous={:?}, shadow_rollout={}, full_rollout={} - merchant_id={}, connector={}",
|
||||
ucs_client_available && ucs_enabled,
|
||||
ucs_only_connector,
|
||||
previous_gateway,
|
||||
shadow_rollout_enabled,
|
||||
rollout_enabled,
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
GatewaySystem::ShadowUnifiedConnectorService
|
||||
}
|
||||
|
||||
// ==================== UNIFIED CONNECTOR SERVICE DECISIONS ====================
|
||||
// All patterns that result in UCS routing
|
||||
|
||||
// UCS-only connector (mandatory UCS)
|
||||
// - Infrastructure ready
|
||||
// - UCS-only connector flag set
|
||||
// - Any previous gateway
|
||||
// - Any shadow rollout state
|
||||
// - Any full rollout state
|
||||
(true, true, _, _, _) |
|
||||
|
||||
// Sticky routing: Continue with previous UCS
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was UCS
|
||||
// - Any shadow rollout state (doesn't affect existing UCS)
|
||||
// - Any full rollout state
|
||||
(true, false, Some(GatewaySystem::UnifiedConnectorService), _, _) |
|
||||
|
||||
// Full rollout: New payment
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - No previous gateway
|
||||
// - No shadow rollout (shadow would take precedence)
|
||||
// - Full rollout enabled
|
||||
(true, false, None, false, true) |
|
||||
|
||||
// Full rollout: Switch from Direct
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was Direct
|
||||
// - No shadow rollout (shadow would take precedence)
|
||||
// - Full rollout enabled
|
||||
(true, false, Some(GatewaySystem::Direct), false, true) |
|
||||
|
||||
// Full rollout: Promote from Shadow
|
||||
// - Infrastructure ready
|
||||
// - Not a UCS-only connector
|
||||
// - Previous gateway was Shadow
|
||||
// - Shadow rollout ended (now false)
|
||||
// - Full rollout enabled
|
||||
(true, false, Some(GatewaySystem::ShadowUnifiedConnectorService), false, true) => {
|
||||
router_env::logger::debug!(
|
||||
"Routing to UnifiedConnectorService: ucs_ready={}, ucs_only={}, previous={:?}, shadow_rollout={}, full_rollout={} - merchant_id={}, connector={}",
|
||||
ucs_client_available && ucs_enabled,
|
||||
ucs_only_connector,
|
||||
previous_gateway,
|
||||
shadow_rollout_enabled,
|
||||
rollout_enabled,
|
||||
merchant_id,
|
||||
connector_name
|
||||
);
|
||||
GatewaySystem::UnifiedConnectorService
|
||||
}
|
||||
shadow_rollout_availability,
|
||||
)?
|
||||
};
|
||||
|
||||
router_env::logger::info!(
|
||||
"Payment gateway system decision: {:?} - merchant_id={}, connector={}, payment_method={}, flow={}",
|
||||
decision,
|
||||
"Payment gateway decision: gateway={:?}, execution_path={:?} - merchant_id={}, connector={}, payment_method={}, flow={}",
|
||||
gateway_system,
|
||||
execution_path,
|
||||
merchant_id,
|
||||
connector_name,
|
||||
payment_method,
|
||||
flow_name
|
||||
);
|
||||
|
||||
Ok(decision)
|
||||
Ok(execution_path)
|
||||
}
|
||||
|
||||
fn decide_execution_path(
|
||||
connector_type: ConnectorIntegrationType,
|
||||
previous_gateway: Option<GatewaySystem>,
|
||||
shadow_rollout_enabled: ShadowRolloutAvailability,
|
||||
) -> RouterResult<(GatewaySystem, ExecutionPath)> {
|
||||
match (connector_type, previous_gateway, shadow_rollout_enabled) {
|
||||
// Case 1: DirectConnector with no previous gateway and no shadow rollout
|
||||
// This is a fresh payment request for a direct connector - use direct gateway
|
||||
(
|
||||
ConnectorIntegrationType::DirectConnector,
|
||||
None,
|
||||
ShadowRolloutAvailability::NotAvailable,
|
||||
) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)),
|
||||
|
||||
// Case 2: DirectConnector previously used Direct gateway, no shadow rollout
|
||||
// Continue using the same direct gateway for consistency
|
||||
(
|
||||
ConnectorIntegrationType::DirectConnector,
|
||||
Some(GatewaySystem::Direct),
|
||||
ShadowRolloutAvailability::NotAvailable,
|
||||
) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)),
|
||||
|
||||
// Case 3: DirectConnector previously used UCS, but now switching back to Direct (no shadow)
|
||||
// Migration scenario: UCS was used before, but now we're reverting to Direct
|
||||
(
|
||||
ConnectorIntegrationType::DirectConnector,
|
||||
Some(GatewaySystem::UnifiedConnectorService),
|
||||
ShadowRolloutAvailability::NotAvailable,
|
||||
) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)),
|
||||
|
||||
// Case 4: UcsConnector configuration, but previously used Direct gateway (no shadow)
|
||||
// Maintain Direct for backward compatibility - don't switch mid-transaction
|
||||
(
|
||||
ConnectorIntegrationType::UcsConnector,
|
||||
Some(GatewaySystem::Direct),
|
||||
ShadowRolloutAvailability::NotAvailable,
|
||||
) => Ok((GatewaySystem::Direct, ExecutionPath::Direct)),
|
||||
|
||||
// Case 5: DirectConnector with no previous gateway, shadow rollout enabled
|
||||
// Use Direct as primary, but also execute UCS in shadow mode for comparison
|
||||
(
|
||||
ConnectorIntegrationType::DirectConnector,
|
||||
None,
|
||||
ShadowRolloutAvailability::IsAvailable,
|
||||
) => Ok((
|
||||
GatewaySystem::Direct,
|
||||
ExecutionPath::ShadowUnifiedConnectorService,
|
||||
)),
|
||||
|
||||
// Case 6: DirectConnector previously used Direct, shadow rollout enabled
|
||||
// Continue with Direct as primary, execute UCS in shadow mode for testing
|
||||
(
|
||||
ConnectorIntegrationType::DirectConnector,
|
||||
Some(GatewaySystem::Direct),
|
||||
ShadowRolloutAvailability::IsAvailable,
|
||||
) => Ok((
|
||||
GatewaySystem::Direct,
|
||||
ExecutionPath::ShadowUnifiedConnectorService,
|
||||
)),
|
||||
|
||||
// Case 7: DirectConnector previously used UCS, shadow rollout enabled
|
||||
// Revert to Direct as primary, but keep UCS in shadow mode for comparison
|
||||
(
|
||||
ConnectorIntegrationType::DirectConnector,
|
||||
Some(GatewaySystem::UnifiedConnectorService),
|
||||
ShadowRolloutAvailability::IsAvailable,
|
||||
) => Ok((
|
||||
GatewaySystem::Direct,
|
||||
ExecutionPath::ShadowUnifiedConnectorService,
|
||||
)),
|
||||
|
||||
// Case 8: UcsConnector configuration, previously used Direct, shadow rollout enabled
|
||||
// Maintain Direct as primary for transaction consistency, shadow UCS for testing
|
||||
(
|
||||
ConnectorIntegrationType::UcsConnector,
|
||||
Some(GatewaySystem::Direct),
|
||||
ShadowRolloutAvailability::IsAvailable,
|
||||
) => Ok((
|
||||
GatewaySystem::Direct,
|
||||
ExecutionPath::ShadowUnifiedConnectorService,
|
||||
)),
|
||||
|
||||
// Case 9: UcsConnector with no previous gateway (regardless of shadow rollout)
|
||||
// Fresh payment for a UCS-enabled connector - use UCS as primary
|
||||
(ConnectorIntegrationType::UcsConnector, None, _) => Ok((
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
ExecutionPath::UnifiedConnectorService,
|
||||
)),
|
||||
|
||||
// Case 10: UcsConnector previously used UCS (regardless of shadow rollout)
|
||||
// Continue using UCS for consistency in the payment flow
|
||||
(
|
||||
ConnectorIntegrationType::UcsConnector,
|
||||
Some(GatewaySystem::UnifiedConnectorService),
|
||||
_,
|
||||
) => Ok((
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
ExecutionPath::UnifiedConnectorService,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the gateway system from the payment intent's feature metadata
|
||||
|
||||
Reference in New Issue
Block a user