mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
feat(ucs): add gateway system {Direct | UnifiedConnectorSystem} in feature metadata for v1 (#8854)
This commit is contained in:
@ -34,7 +34,7 @@ use api_models::{
|
||||
mandates::RecurringDetails,
|
||||
payments::{self as payments_api},
|
||||
};
|
||||
pub use common_enums::enums::CallConnectorAction;
|
||||
pub use common_enums::enums::{CallConnectorAction, GatewaySystem};
|
||||
use common_types::payments as common_payments_types;
|
||||
use common_utils::{
|
||||
ext_traits::{AsyncExt, StringExt},
|
||||
@ -91,6 +91,8 @@ use self::{
|
||||
operations::{BoxedOperation, Operation, PaymentResponse},
|
||||
routing::{self as self_routing, SessionFlowRoutingInput},
|
||||
};
|
||||
#[cfg(feature = "v1")]
|
||||
use super::unified_connector_service::update_gateway_system_in_feature_metadata;
|
||||
use super::{
|
||||
errors::StorageErrorExt, payment_methods::surcharge_decision_configs, routing::TransactionData,
|
||||
unified_connector_service::should_call_unified_connector_service,
|
||||
@ -4043,7 +4045,22 @@ where
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
record_time_taken_with(|| async {
|
||||
if should_call_unified_connector_service(state, merchant_context, &router_data).await? {
|
||||
if should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
Some(payment_data),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Processing payment through UCS gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.payment_id
|
||||
.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
if should_add_task_to_process_tracker(payment_data) {
|
||||
operation
|
||||
.to_domain()?
|
||||
@ -4058,6 +4075,12 @@ where
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Update feature metadata to track UCS usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(
|
||||
payment_data,
|
||||
GatewaySystem::UnifiedConnectorService,
|
||||
)?;
|
||||
|
||||
(_, *payment_data) = operation
|
||||
.to_update_tracker()?
|
||||
.update_trackers(
|
||||
@ -4083,6 +4106,18 @@ where
|
||||
|
||||
Ok((router_data, merchant_connector_account))
|
||||
} else {
|
||||
router_env::logger::info!(
|
||||
"Processing payment through Direct gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.payment_id
|
||||
.get_string_repr(),
|
||||
payment_data.get_payment_attempt().attempt_id
|
||||
);
|
||||
|
||||
// Update feature metadata to track Direct routing usage for stickiness
|
||||
update_gateway_system_in_feature_metadata(payment_data, GatewaySystem::Direct)?;
|
||||
|
||||
call_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
@ -4440,8 +4475,13 @@ where
|
||||
.await?;
|
||||
|
||||
// do order creation
|
||||
let should_call_unified_connector_service =
|
||||
should_call_unified_connector_service(state, merchant_context, &router_data).await?;
|
||||
let should_call_unified_connector_service = should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
Some(payment_data),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (connector_request, should_continue_further) = if !should_call_unified_connector_service {
|
||||
let mut should_continue_further = true;
|
||||
@ -4501,6 +4541,12 @@ where
|
||||
|
||||
record_time_taken_with(|| async {
|
||||
if should_call_unified_connector_service {
|
||||
router_env::logger::info!(
|
||||
"Processing payment through UCS gateway system- payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
payment_data.get_payment_attempt().id.get_string_repr()
|
||||
);
|
||||
|
||||
router_data
|
||||
.call_unified_connector_service(
|
||||
state,
|
||||
@ -4556,7 +4602,19 @@ where
|
||||
services::api::ConnectorIntegration<F, RouterDReq, router_types::PaymentsResponseData>,
|
||||
{
|
||||
record_time_taken_with(|| async {
|
||||
if should_call_unified_connector_service(state, merchant_context, &router_data).await? {
|
||||
if should_call_unified_connector_service(
|
||||
state,
|
||||
merchant_context,
|
||||
&router_data,
|
||||
Some(payment_data),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
router_env::logger::info!(
|
||||
"Executing payment through UCS gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
payment_data.get_payment_attempt().id.get_string_repr()
|
||||
);
|
||||
if should_add_task_to_process_tracker(payment_data) {
|
||||
operation
|
||||
.to_domain()?
|
||||
@ -4596,6 +4654,12 @@ where
|
||||
|
||||
Ok(router_data)
|
||||
} else {
|
||||
router_env::logger::info!(
|
||||
"Processing payment through Direct gateway system - payment_id={}, attempt_id={}",
|
||||
payment_data.get_payment_intent().id.get_string_repr(),
|
||||
payment_data.get_payment_attempt().id.get_string_repr()
|
||||
);
|
||||
|
||||
call_connector_service(
|
||||
state,
|
||||
req_state,
|
||||
|
||||
@ -256,6 +256,11 @@ impl<F: Clone + Sync> UpdateTracker<F, PaymentData<F>, api::PaymentsCancelReques
|
||||
status: enums::IntentStatus::Cancelled,
|
||||
updated_by: storage_scheme.to_string(),
|
||||
incremental_authorization_allowed: None,
|
||||
feature_metadata: payment_data
|
||||
.payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.map(masking::Secret::new),
|
||||
};
|
||||
(Some(payment_intent_update), enums::AttemptStatus::Voided)
|
||||
} else {
|
||||
|
||||
@ -2056,6 +2056,11 @@ impl<F: Clone + Sync> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for
|
||||
.is_iframe_redirection_enabled,
|
||||
is_confirm_operation: true, // Indicates that this is a confirm operation
|
||||
payment_channel: payment_data.payment_intent.payment_channel,
|
||||
feature_metadata: payment_data
|
||||
.payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.map(masking::Secret::new),
|
||||
tax_status: payment_data.payment_intent.tax_status,
|
||||
discount_amount: payment_data.payment_intent.discount_amount,
|
||||
order_date: payment_data.payment_intent.order_date,
|
||||
|
||||
@ -2113,6 +2113,11 @@ async fn payment_response_update_tracker<F: Clone, T: types::Capturable>(
|
||||
updated_by: storage_scheme.to_string(),
|
||||
// make this false only if initial payment fails, if incremental authorization call fails don't make it false
|
||||
incremental_authorization_allowed: Some(false),
|
||||
feature_metadata: payment_data
|
||||
.payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.map(masking::Secret::new),
|
||||
},
|
||||
Ok(_) => storage::PaymentIntentUpdate::ResponseUpdate {
|
||||
status: api_models::enums::IntentStatus::foreign_from(
|
||||
@ -2124,6 +2129,11 @@ async fn payment_response_update_tracker<F: Clone, T: types::Capturable>(
|
||||
incremental_authorization_allowed: payment_data
|
||||
.payment_intent
|
||||
.incremental_authorization_allowed,
|
||||
feature_metadata: payment_data
|
||||
.payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.map(masking::Secret::new),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@ -953,6 +953,11 @@ impl<F: Clone + Sync> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for
|
||||
.is_iframe_redirection_enabled,
|
||||
is_confirm_operation: false, // this is not a confirm operation
|
||||
payment_channel: payment_data.payment_intent.payment_channel,
|
||||
feature_metadata: payment_data
|
||||
.payment_intent
|
||||
.feature_metadata
|
||||
.clone()
|
||||
.map(masking::Secret::new),
|
||||
tax_status: payment_data.payment_intent.tax_status,
|
||||
discount_amount: payment_data.payment_intent.discount_amount,
|
||||
order_date: payment_data.payment_intent.order_date,
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
use api_models::admin;
|
||||
use common_enums::{AttemptStatus, PaymentMethodType};
|
||||
use common_enums::{AttemptStatus, GatewaySystem, PaymentMethodType};
|
||||
use common_utils::{errors::CustomResult, ext_traits::ValueExt};
|
||||
use diesel_models::types::FeatureMetadata;
|
||||
use error_stack::ResultExt;
|
||||
use external_services::grpc_client::unified_connector_service::{
|
||||
ConnectorAuthMetadata, UnifiedConnectorServiceError,
|
||||
@ -23,9 +24,12 @@ use unified_connector_service_client::payments::{
|
||||
use crate::{
|
||||
consts,
|
||||
core::{
|
||||
errors::{ApiErrorResponse, RouterResult},
|
||||
payments::helpers::{
|
||||
is_ucs_enabled, should_execute_based_on_rollout, MerchantConnectorAccountType,
|
||||
errors::{self, RouterResult},
|
||||
payments::{
|
||||
helpers::{
|
||||
is_ucs_enabled, should_execute_based_on_rollout, MerchantConnectorAccountType,
|
||||
},
|
||||
OperationSessionGetters, OperationSessionSetters,
|
||||
},
|
||||
utils::get_flow_name,
|
||||
},
|
||||
@ -39,21 +43,62 @@ pub mod transformers;
|
||||
// Re-export webhook transformer types for easier access
|
||||
pub use transformers::WebhookTransformData;
|
||||
|
||||
pub async fn should_call_unified_connector_service<F: Clone, T>(
|
||||
/// Generic version of should_call_unified_connector_service that works with any type
|
||||
/// implementing OperationSessionGetters trait
|
||||
pub async fn should_call_unified_connector_service<F: Clone, T, D>(
|
||||
state: &SessionState,
|
||||
merchant_context: &MerchantContext,
|
||||
router_data: &RouterData<F, T, PaymentsResponseData>,
|
||||
) -> RouterResult<bool> {
|
||||
payment_data: Option<&D>,
|
||||
) -> RouterResult<bool>
|
||||
where
|
||||
D: OperationSessionGetters<F>,
|
||||
{
|
||||
// Check basic UCS availability first
|
||||
if state.grpc_client.unified_connector_service_client.is_none() {
|
||||
router_env::logger::debug!(
|
||||
"Unified Connector Service client is not available, skipping UCS decision"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let ucs_config_key = consts::UCS_ENABLED;
|
||||
|
||||
if !is_ucs_enabled(state, ucs_config_key).await {
|
||||
router_env::logger::debug!(
|
||||
"Unified Connector Service is not enabled, skipping UCS decision"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Apply stickiness logic if payment_data is available
|
||||
if let Some(payment_data) = payment_data {
|
||||
let previous_gateway_system = extract_gateway_system_from_payment_intent(payment_data);
|
||||
|
||||
match previous_gateway_system {
|
||||
Some(GatewaySystem::UnifiedConnectorService) => {
|
||||
// Payment intent previously used UCS, maintain stickiness to UCS
|
||||
router_env::logger::info!(
|
||||
"Payment gateway system decision: UCS (sticky) - payment intent previously used UCS"
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
Some(GatewaySystem::Direct) => {
|
||||
// Payment intent previously used Direct, maintain stickiness to Direct (return false for UCS)
|
||||
router_env::logger::info!(
|
||||
"Payment gateway system decision: Direct (sticky) - payment intent previously used Direct"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
None => {
|
||||
// No previous gateway system set, continue with normal gateway system logic
|
||||
router_env::logger::debug!(
|
||||
"UCS stickiness: No previous gateway system set, applying normal gateway system logic"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Continue with normal UCS gateway system logic
|
||||
let merchant_id = merchant_context
|
||||
.get_merchant_account()
|
||||
.get_id()
|
||||
@ -71,8 +116,13 @@ pub async fn should_call_unified_connector_service<F: Clone, T>(
|
||||
.is_some_and(|config| config.ucs_only_connectors.contains(&connector_name));
|
||||
|
||||
if is_ucs_only_connector {
|
||||
router_env::logger::info!(
|
||||
"Payment gateway system decision: UCS (forced) - merchant_id={}, connector={}, payment_method={}, flow={}",
|
||||
merchant_id, connector_name, payment_method, flow_name
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let config_key = format!(
|
||||
"{}_{}_{}_{}_{}",
|
||||
consts::UCS_ROLLOUT_PERCENT_CONFIG_PREFIX,
|
||||
@ -83,9 +133,87 @@ pub async fn should_call_unified_connector_service<F: Clone, T>(
|
||||
);
|
||||
|
||||
let should_execute = should_execute_based_on_rollout(state, &config_key).await?;
|
||||
|
||||
// Log gateway system decision
|
||||
if should_execute {
|
||||
router_env::logger::info!(
|
||||
"Payment gateway system decision: UCS - merchant_id={}, connector={}, payment_method={}, flow={}",
|
||||
merchant_id, connector_name, payment_method, flow_name
|
||||
);
|
||||
} else {
|
||||
router_env::logger::info!(
|
||||
"Payment gateway system decision: Direct - merchant_id={}, connector={}, payment_method={}, flow={}",
|
||||
merchant_id, connector_name, payment_method, flow_name
|
||||
);
|
||||
}
|
||||
|
||||
Ok(should_execute)
|
||||
}
|
||||
|
||||
/// Extracts the gateway system from the payment intent's feature metadata
|
||||
/// Returns None if metadata is missing, corrupted, or doesn't contain gateway_system
|
||||
fn extract_gateway_system_from_payment_intent<F: Clone, D>(
|
||||
payment_data: &D,
|
||||
) -> Option<GatewaySystem>
|
||||
where
|
||||
D: OperationSessionGetters<F>,
|
||||
{
|
||||
#[cfg(feature = "v1")]
|
||||
{
|
||||
payment_data
|
||||
.get_payment_intent()
|
||||
.feature_metadata
|
||||
.as_ref()
|
||||
.and_then(|metadata| {
|
||||
// Try to parse the JSON value as FeatureMetadata
|
||||
// Log errors but don't fail the flow for corrupted metadata
|
||||
match serde_json::from_value::<FeatureMetadata>(metadata.clone()) {
|
||||
Ok(feature_metadata) => feature_metadata.gateway_system,
|
||||
Err(err) => {
|
||||
router_env::logger::warn!(
|
||||
"Failed to parse feature_metadata for gateway_system extraction: {}",
|
||||
err
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "v2")]
|
||||
{
|
||||
None // V2 does not use feature metadata for gateway system tracking
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the payment intent's feature metadata to track the gateway system being used
|
||||
#[cfg(feature = "v1")]
|
||||
pub fn update_gateway_system_in_feature_metadata<F: Clone, D>(
|
||||
payment_data: &mut D,
|
||||
gateway_system: GatewaySystem,
|
||||
) -> RouterResult<()>
|
||||
where
|
||||
D: OperationSessionGetters<F> + OperationSessionSetters<F>,
|
||||
{
|
||||
let mut payment_intent = payment_data.get_payment_intent().clone();
|
||||
|
||||
let existing_metadata = payment_intent.feature_metadata.as_ref();
|
||||
|
||||
let mut feature_metadata = existing_metadata
|
||||
.and_then(|metadata| serde_json::from_value::<FeatureMetadata>(metadata.clone()).ok())
|
||||
.unwrap_or_default();
|
||||
|
||||
feature_metadata.gateway_system = Some(gateway_system);
|
||||
|
||||
let updated_metadata = serde_json::to_value(feature_metadata)
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to serialize feature metadata")?;
|
||||
|
||||
payment_intent.feature_metadata = Some(updated_metadata.clone());
|
||||
payment_data.set_payment_intent(payment_intent);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn should_call_unified_connector_service_for_webhooks(
|
||||
state: &SessionState,
|
||||
merchant_context: &MerchantContext,
|
||||
@ -422,7 +550,7 @@ pub async fn call_unified_connector_service_for_webhook(
|
||||
.unified_connector_service_client
|
||||
.as_ref()
|
||||
.ok_or_else(|| {
|
||||
error_stack::report!(ApiErrorResponse::WebhookProcessingFailure)
|
||||
error_stack::report!(errors::ApiErrorResponse::WebhookProcessingFailure)
|
||||
.attach_printable("UCS client is not available for webhook processing")
|
||||
})?;
|
||||
|
||||
@ -472,10 +600,10 @@ pub async fn call_unified_connector_service_for_webhook(
|
||||
build_unified_connector_service_auth_metadata(mca_type, merchant_context)
|
||||
})
|
||||
.transpose()
|
||||
.change_context(ApiErrorResponse::InternalServerError)
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to build UCS auth metadata")?
|
||||
.ok_or_else(|| {
|
||||
error_stack::report!(ApiErrorResponse::InternalServerError).attach_printable(
|
||||
error_stack::report!(errors::ApiErrorResponse::InternalServerError).attach_printable(
|
||||
"Missing merchant connector account for UCS webhook transformation",
|
||||
)
|
||||
})?;
|
||||
@ -504,7 +632,7 @@ pub async fn call_unified_connector_service_for_webhook(
|
||||
}
|
||||
Err(err) => {
|
||||
// When UCS is configured, we don't fall back to direct connector processing
|
||||
Err(ApiErrorResponse::WebhookProcessingFailure)
|
||||
Err(errors::ApiErrorResponse::WebhookProcessingFailure)
|
||||
.attach_printable(format!("UCS webhook processing failed: {err}"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,7 +139,7 @@ impl ProcessTrackerWorkflow<SessionState> for PaymentsSyncWorkflow {
|
||||
.as_ref()
|
||||
.is_none()
|
||||
{
|
||||
let payment_intent_update = hyperswitch_domain_models::payments::payment_intent::PaymentIntentUpdate::PGStatusUpdate { status: api_models::enums::IntentStatus::Failed,updated_by: merchant_account.storage_scheme.to_string(), incremental_authorization_allowed: Some(false) };
|
||||
let payment_intent_update = hyperswitch_domain_models::payments::payment_intent::PaymentIntentUpdate::PGStatusUpdate { status: api_models::enums::IntentStatus::Failed,updated_by: merchant_account.storage_scheme.to_string(), incremental_authorization_allowed: Some(false), feature_metadata: payment_data.payment_intent.feature_metadata.clone().map(masking::Secret::new), };
|
||||
let payment_attempt_update =
|
||||
hyperswitch_domain_models::payments::payment_attempt::PaymentAttemptUpdate::ErrorUpdate {
|
||||
connector: None,
|
||||
|
||||
Reference in New Issue
Block a user