From bab64eefa7ec12b7cf971b813c5ec1a524ef5c70 Mon Sep 17 00:00:00 2001 From: Chethan Rao <70657455+Chethan-rao@users.noreply.github.com> Date: Mon, 12 May 2025 19:16:08 +0530 Subject: [PATCH] refactor(open_router): call elimination routing of open router if enabled instead of dynamo (#7961) --- crates/api_models/src/routing.rs | 11 + crates/router/src/core/errors.rs | 4 +- crates/router/src/core/payments.rs | 4 +- .../payments/operations/payment_response.rs | 68 +++--- crates/router/src/core/payments/routing.rs | 221 +++++++++--------- crates/router/src/core/routing/helpers.rs | 68 ++++-- 6 files changed, 209 insertions(+), 167 deletions(-) diff --git a/crates/api_models/src/routing.rs b/crates/api_models/src/routing.rs index a683e3343a..fddeda6088 100644 --- a/crates/api_models/src/routing.rs +++ b/crates/api_models/src/routing.rs @@ -643,6 +643,17 @@ impl DynamicRoutingAlgorithmRef { self.dynamic_routing_volume_split = volume } + pub fn is_success_rate_routing_enabled(&self) -> bool { + self.success_based_algorithm + .as_ref() + .map(|success_based_routing| { + success_based_routing.enabled_feature + == DynamicRoutingFeatures::DynamicConnectorSelection + || success_based_routing.enabled_feature == DynamicRoutingFeatures::Metrics + }) + .unwrap_or_default() + } + pub fn is_elimination_enabled(&self) -> bool { self.elimination_routing_algorithm .as_ref() diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 7f3fd98d77..213cbfa66d 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -419,8 +419,8 @@ pub enum RoutingError { ContractRoutingClientInitializationError, #[error("Invalid contract based connector label received from dynamic routing service: '{0}'")] InvalidContractBasedConnectorLabel(String), - #[error("Failed to perform {algo} in open_router")] - OpenRouterCallFailed { algo: String }, + #[error("Failed to perform routing in open_router")] + OpenRouterCallFailed, #[error("Error from open_router: {0}")] OpenRouterError(String), } diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 1e4ad6da03..e8d8387d65 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -7243,7 +7243,7 @@ where if routing_choice.routing_type.is_dynamic_routing() { if state.conf.open_router.enabled { - routing::perform_open_routing( + routing::perform_dynamic_routing_with_open_router( state, connectors.clone(), business_profile, @@ -7285,7 +7285,7 @@ where .map(|card_isin| card_isin.to_string()), ); - routing::perform_dynamic_routing( + routing::perform_dynamic_routing_with_intelligent_router( state, connectors.clone(), business_profile, diff --git a/crates/router/src/core/payments/operations/payment_response.rs b/crates/router/src/core/payments/operations/payment_response.rs index 90b9889fff..0b182d249b 100644 --- a/crates/router/src/core/payments/operations/payment_response.rs +++ b/crates/router/src/core/payments/operations/payment_response.rs @@ -2148,36 +2148,49 @@ async fn payment_response_update_tracker( ); tokio::spawn( async move { - routing_helpers::push_metrics_with_update_window_for_success_based_routing( - &state, - &payment_attempt, - routable_connectors.clone(), - &profile_id, - dynamic_routing_algo_ref.clone(), - dynamic_routing_config_params_interpolator.clone(), - ) - .await - .map_err(|e| logger::error!(success_based_routing_metrics_error=?e)) - .ok(); + let should_route_to_open_router = state.conf.open_router.enabled; - if let Some(gsm_error_category) = gsm_error_category { - if gsm_error_category.should_perform_elimination_routing() { - logger::info!("Performing update window for elimination routing"); - routing_helpers::update_window_for_elimination_routing( - &state, - &payment_attempt, - &profile_id, - dynamic_routing_algo_ref.clone(), - dynamic_routing_config_params_interpolator.clone(), - gsm_error_category, - ) - .await - .map_err(|e| logger::error!(dynamic_routing_metrics_error=?e)) - .ok(); + if should_route_to_open_router { + routing_helpers::update_gateway_score_helper_with_open_router( + &state, + &payment_attempt, + &profile_id, + dynamic_routing_algo_ref.clone(), + ) + .await + .map_err(|e| logger::error!(open_router_update_gateway_score_err=?e)) + .ok(); + } else { + routing_helpers::push_metrics_with_update_window_for_success_based_routing( + &state, + &payment_attempt, + routable_connectors.clone(), + &profile_id, + dynamic_routing_algo_ref.clone(), + dynamic_routing_config_params_interpolator.clone(), + ) + .await + .map_err(|e| logger::error!(success_based_routing_metrics_error=?e)) + .ok(); + + if let Some(gsm_error_category) = gsm_error_category { + if gsm_error_category.should_perform_elimination_routing() { + logger::info!("Performing update window for elimination routing"); + routing_helpers::update_window_for_elimination_routing( + &state, + &payment_attempt, + &profile_id, + dynamic_routing_algo_ref.clone(), + dynamic_routing_config_params_interpolator.clone(), + gsm_error_category, + ) + .await + .map_err(|e| logger::error!(dynamic_routing_metrics_error=?e)) + .ok(); + }; }; - }; - routing_helpers::push_metrics_with_update_window_for_contract_based_routing( + routing_helpers::push_metrics_with_update_window_for_contract_based_routing( &state, &payment_attempt, routable_connectors, @@ -2188,6 +2201,7 @@ async fn payment_response_update_tracker( .await .map_err(|e| logger::error!(contract_based_routing_metrics_error=?e)) .ok(); + } } .in_current_span(), ); diff --git a/crates/router/src/core/payments/routing.rs b/crates/router/src/core/payments/routing.rs index 6bc5520361..0b5b7003c0 100644 --- a/crates/router/src/core/payments/routing.rs +++ b/crates/router/src/core/payments/routing.rs @@ -1491,7 +1491,7 @@ pub fn make_dsl_input_for_surcharge( } #[cfg(all(feature = "v1", feature = "dynamic_routing"))] -pub async fn perform_open_routing( +pub async fn perform_dynamic_routing_with_open_router( state: &SessionState, routable_connectors: Vec, profile: &domain::Profile, @@ -1516,48 +1516,50 @@ pub async fn perform_open_routing( profile.get_id().get_string_repr() ); + let is_success_rate_routing_enabled = + dynamic_routing_algo_ref.is_success_rate_routing_enabled(); let is_elimination_enabled = dynamic_routing_algo_ref.is_elimination_enabled(); - let connectors = dynamic_routing_algo_ref - .success_based_algorithm - .async_map(|algo| { - perform_success_based_routing_with_open_router( - state, - routable_connectors.clone(), - profile.get_id(), - algo, - &payment_data, - is_elimination_enabled, - ) - }) - .await - .transpose()? - .unwrap_or(routable_connectors); - if is_elimination_enabled { - // This will initiate the elimination process for the connector. - // Penalize the elimination score of the connector before making a payment. - // Once the payment is made, we will update the score based on the payment status - if let Some(connector) = connectors.first() { - logger::debug!( - "penalizing the elimination score of the gateway with id {} in open router for profile {}", + // Since success_based and elimination routing is being done in 1 api call, we call decide_gateway when either of it enabled + let connectors = if is_success_rate_routing_enabled || is_elimination_enabled { + let connectors = perform_decide_gateway_call_with_open_router( + state, + routable_connectors.clone(), + profile.get_id(), + &payment_data, + is_elimination_enabled, + ) + .await?; + + if is_elimination_enabled { + // This will initiate the elimination process for the connector. + // Penalize the elimination score of the connector before making a payment. + // Once the payment is made, we will update the score based on the payment status + if let Some(connector) = connectors.first() { + logger::debug!( + "penalizing the elimination score of the gateway with id {} in open_router for profile {}", connector, profile.get_id().get_string_repr() ); - update_success_rate_score_with_open_router( - state, - connector.clone(), - profile.get_id(), - &payment_data.payment_id, - common_enums::AttemptStatus::AuthenticationPending, - ) - .await? + update_gateway_score_with_open_router( + state, + connector.clone(), + profile.get_id(), + &payment_data.payment_id, + common_enums::AttemptStatus::AuthenticationPending, + ) + .await? + } } - } + connectors + } else { + routable_connectors + }; Ok(connectors) } #[cfg(all(feature = "v1", feature = "dynamic_routing"))] -pub async fn perform_dynamic_routing( +pub async fn perform_dynamic_routing_with_intelligent_router( state: &SessionState, routable_connectors: Vec, profile: &domain::Profile, @@ -1648,99 +1650,90 @@ pub async fn perform_dynamic_routing( #[cfg(all(feature = "v1", feature = "dynamic_routing"))] #[instrument(skip_all)] -pub async fn perform_success_based_routing_with_open_router( +pub async fn perform_decide_gateway_call_with_open_router( state: &SessionState, mut routable_connectors: Vec, profile_id: &common_utils::id_type::ProfileId, - success_based_algo_ref: api_routing::SuccessBasedAlgorithm, payment_attempt: &oss_storage::PaymentAttempt, is_elimination_enabled: bool, ) -> RoutingResult> { - if success_based_algo_ref.enabled_feature - == api_routing::DynamicRoutingFeatures::DynamicConnectorSelection - { - logger::debug!( - "performing success_based_routing with open_router for profile {}", - profile_id.get_string_repr() - ); + logger::debug!( + "performing decide_gateway call with open_router for profile {}", + profile_id.get_string_repr() + ); - let open_router_req_body = OpenRouterDecideGatewayRequest::construct_sr_request( - payment_attempt, - routable_connectors.clone(), - Some(or_types::RankingAlgorithm::SrBasedRouting), - is_elimination_enabled, - ); + let open_router_req_body = OpenRouterDecideGatewayRequest::construct_sr_request( + payment_attempt, + routable_connectors.clone(), + Some(or_types::RankingAlgorithm::SrBasedRouting), + is_elimination_enabled, + ); - let url = format!("{}/{}", &state.conf.open_router.url, "decide-gateway"); - let mut request = request::Request::new(services::Method::Post, &url); - request.add_header(headers::CONTENT_TYPE, "application/json".into()); - request.add_header( - headers::X_TENANT_ID, - state.tenant.tenant_id.get_string_repr().to_owned().into(), - ); - request.set_body(request::RequestContent::Json(Box::new( - open_router_req_body, - ))); + let url = format!("{}/{}", &state.conf.open_router.url, "decide-gateway"); + let mut request = request::Request::new(services::Method::Post, &url); + request.add_header(headers::CONTENT_TYPE, "application/json".into()); + request.add_header( + headers::X_TENANT_ID, + state.tenant.tenant_id.get_string_repr().to_owned().into(), + ); + request.set_body(request::RequestContent::Json(Box::new( + open_router_req_body, + ))); - let response = services::call_connector_api(state, request, "open_router_sr_call") - .await - .change_context(errors::RoutingError::OpenRouterCallFailed { - algo: "success_rate".into(), - })?; + let response = services::call_connector_api(state, request, "open_router_decide_gateway_call") + .await + .change_context(errors::RoutingError::OpenRouterCallFailed)?; - let sr_sorted_connectors = match response { - Ok(resp) => { - let decided_gateway: DecidedGateway = resp - .response - .parse_struct("DecidedGateway") - .change_context(errors::RoutingError::OpenRouterError( - "Failed to parse the response from open_router".into(), - ))?; + let sr_sorted_connectors = match response { + Ok(resp) => { + let decided_gateway: DecidedGateway = resp + .response + .parse_struct("DecidedGateway") + .change_context(errors::RoutingError::OpenRouterError( + "Failed to parse the response from open_router".into(), + ))?; - if let Some(gateway_priority_map) = decided_gateway.gateway_priority_map { - logger::debug!( - "Open router gateway_priority_map response: {:?}", - gateway_priority_map - ); - routable_connectors.sort_by(|connector_choice_a, connector_choice_b| { - let connector_choice_a_score = gateway_priority_map - .get(&connector_choice_a.connector.to_string()) - .copied() - .unwrap_or(0.0); - let connector_choice_b_score = gateway_priority_map - .get(&connector_choice_b.connector.to_string()) - .copied() - .unwrap_or(0.0); - connector_choice_b_score - .partial_cmp(&connector_choice_a_score) - .unwrap_or(std::cmp::Ordering::Equal) - }); - } - Ok(routable_connectors) + if let Some(gateway_priority_map) = decided_gateway.gateway_priority_map { + logger::debug!( + "open_router decide_gateway call response: {:?}", + gateway_priority_map + ); + routable_connectors.sort_by(|connector_choice_a, connector_choice_b| { + let connector_choice_a_score = gateway_priority_map + .get(&connector_choice_a.connector.to_string()) + .copied() + .unwrap_or(0.0); + let connector_choice_b_score = gateway_priority_map + .get(&connector_choice_b.connector.to_string()) + .copied() + .unwrap_or(0.0); + connector_choice_b_score + .partial_cmp(&connector_choice_a_score) + .unwrap_or(std::cmp::Ordering::Equal) + }); } - Err(err) => { - let err_resp: or_types::ErrorResponse = err - .response - .parse_struct("ErrorResponse") - .change_context(errors::RoutingError::OpenRouterError( - "Failed to parse the response from open_router".into(), - ))?; - logger::error!("open_router_error_response: {:?}", err_resp); - Err(errors::RoutingError::OpenRouterError( - "Failed to perform success based routing in open router".into(), - )) - } - }?; + Ok(routable_connectors) + } + Err(err) => { + let err_resp: or_types::ErrorResponse = err + .response + .parse_struct("ErrorResponse") + .change_context(errors::RoutingError::OpenRouterError( + "Failed to parse the response from open_router".into(), + ))?; + logger::error!("open_router_error_response: {:?}", err_resp); + Err(errors::RoutingError::OpenRouterError( + "Failed to perform decide_gateway call in open_router".into(), + )) + } + }?; - Ok(sr_sorted_connectors) - } else { - Ok(routable_connectors) - } + Ok(sr_sorted_connectors) } #[cfg(all(feature = "v1", feature = "dynamic_routing"))] #[instrument(skip_all)] -pub async fn update_success_rate_score_with_open_router( +pub async fn update_gateway_score_with_open_router( state: &SessionState, payment_connector: api_routing::RoutableConnectorChoice, profile_id: &common_utils::id_type::ProfileId, @@ -1768,9 +1761,7 @@ pub async fn update_success_rate_score_with_open_router( let response = services::call_connector_api(state, request, "open_router_update_gateway_score_call") .await - .change_context(errors::RoutingError::OpenRouterCallFailed { - algo: "success_rate".into(), - })?; + .change_context(errors::RoutingError::OpenRouterCallFailed)?; match response { Ok(resp) => { @@ -1781,7 +1772,7 @@ pub async fn update_success_rate_score_with_open_router( )?; logger::debug!( - "Open router update_gateway_score response for gateway with id {}: {:?}", + "open_router update_gateway_score response for gateway with id {}: {:?}", payment_connector, update_score_resp ); @@ -1795,9 +1786,9 @@ pub async fn update_success_rate_score_with_open_router( .change_context(errors::RoutingError::OpenRouterError( "Failed to parse the response from open_router".into(), ))?; - logger::error!("open_router_error_response: {:?}", err_resp); + logger::error!("open_router_update_gateway_score_error: {:?}", err_resp); Err(errors::RoutingError::OpenRouterError( - "Failed to update gateway score for success based routing in open router".into(), + "Failed to update gateway score in open_router".into(), )) } }?; diff --git a/crates/router/src/core/routing/helpers.rs b/crates/router/src/core/routing/helpers.rs index cf14c70ca3..dc594180e7 100644 --- a/crates/router/src/core/routing/helpers.rs +++ b/crates/router/src/core/routing/helpers.rs @@ -705,6 +705,53 @@ where } } +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +#[instrument(skip_all)] +pub async fn update_gateway_score_helper_with_open_router( + state: &SessionState, + payment_attempt: &storage::PaymentAttempt, + profile_id: &id_type::ProfileId, + dynamic_routing_algo_ref: routing_types::DynamicRoutingAlgorithmRef, +) -> RouterResult<()> { + let is_success_rate_routing_enabled = + dynamic_routing_algo_ref.is_success_rate_routing_enabled(); + let is_elimination_enabled = dynamic_routing_algo_ref.is_elimination_enabled(); + + if is_success_rate_routing_enabled || is_elimination_enabled { + let payment_connector = &payment_attempt.connector.clone().ok_or( + errors::ApiErrorResponse::GenericNotFoundError { + message: "unable to derive payment connector from payment attempt".to_string(), + }, + )?; + + let routable_connector = routing_types::RoutableConnectorChoice { + choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, + connector: common_enums::RoutableConnectors::from_str(payment_connector.as_str()) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to infer routable_connector from connector")?, + merchant_connector_id: payment_attempt.merchant_connector_id.clone(), + }; + + logger::debug!( + "performing update-gateway-score for gateway with id {} in open_router for profile: {}", + routable_connector, + profile_id.get_string_repr() + ); + routing::payments_routing::update_gateway_score_with_open_router( + state, + routable_connector.clone(), + profile_id, + &payment_attempt.payment_id, + payment_attempt.status, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("failed to update gateway score in open_router service")?; + } + + Ok(()) +} + /// metrics for success based dynamic routing #[cfg(all(feature = "v1", feature = "dynamic_routing"))] #[instrument(skip_all)] @@ -741,27 +788,6 @@ pub async fn push_metrics_with_update_window_for_success_based_routing( merchant_connector_id: payment_attempt.merchant_connector_id.clone(), }; - let should_route_to_open_router = state.conf.open_router.enabled; - - if should_route_to_open_router { - logger::debug!( - "performing update-gateway-score for gateway with id {} in open router for profile: {}", - routable_connector, profile_id.get_string_repr() - ); - routing::payments_routing::update_success_rate_score_with_open_router( - state, - routable_connector.clone(), - profile_id, - &payment_attempt.payment_id, - payment_attempt.status, - ) - .await - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("failed to update gateway score in open router service")?; - - return Ok(()); - } - let payment_status_attribute = get_desired_payment_status_for_dynamic_routing_metrics(payment_attempt.status);