feat(dynamic_routing): add open router integration for success based routing (#7795)

Co-authored-by: Sarthak Soni <sarthak.soni@sarthaksoni-KTKQRLPVC6.local>
Co-authored-by: Sarthak Soni <sarthakasoni@gmail.com>
This commit is contained in:
Chethan Rao
2025-04-21 19:30:19 +05:30
committed by GitHub
parent eabef328c6
commit a51c9f039f
12 changed files with 477 additions and 43 deletions

View File

@ -987,3 +987,7 @@ background_color = "#FFFFFF"
[billing_connectors_payment_sync]
billing_connectors_which_require_payment_sync = "stripebilling, recurly" # List of billing connectors which has payment sync api call
[open_router]
enabled = true # Enable or disable Open Router
url = "http://localhost:8080" # Open Router URL

View File

@ -1031,3 +1031,7 @@ background_color = "#FFFFFF"
[platform]
enabled = true
[open_router]
enabled = false
url = "http://localhost:8080"

View File

@ -899,3 +899,6 @@ background_color = "#FFFFFF"
[platform]
enabled = true
[open_router]
enabled = false

View File

@ -23,6 +23,8 @@ pub mod gsm;
pub mod health_check;
pub mod locker_migration;
pub mod mandates;
#[cfg(feature = "dynamic_routing")]
pub mod open_router;
pub mod organization;
pub mod payment_methods;
pub mod payments;

View File

@ -0,0 +1,122 @@
use std::{collections::HashMap, fmt::Debug};
use common_utils::{id_type, types::MinorUnit};
pub use euclid::{
dssa::types::EuclidAnalysable,
frontend::{
ast,
dir::{DirKeyKind, EuclidDirFilter},
},
};
use serde::{Deserialize, Serialize};
use crate::enums::{Currency, PaymentMethod, RoutableConnectors};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenRouterDecideGatewayRequest {
pub payment_info: PaymentInfo,
pub merchant_id: id_type::ProfileId,
pub eligible_gateway_list: Option<Vec<RoutableConnectors>>,
pub ranking_algorithm: Option<RankingAlgorithm>,
pub elimination_enabled: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum RankingAlgorithm {
SrBasedRouting,
PlBasedRouting,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PaymentInfo {
pub payment_id: id_type::PaymentId,
pub amount: MinorUnit,
pub currency: Currency,
// customerId: Option<ETCu::CustomerId>,
// preferredGateway: Option<ETG::Gateway>,
pub payment_type: String,
// metadata: Option<String>,
// internalMetadata: Option<String>,
// isEmi: Option<bool>,
// emiBank: Option<String>,
// emiTenure: Option<i32>,
pub payment_method_type: String,
pub payment_method: PaymentMethod,
// paymentSource: Option<String>,
// authType: Option<ETCa::txn_card_info::AuthType>,
// cardIssuerBankName: Option<String>,
// cardIsin: Option<String>,
// cardType: Option<ETCa::card_type::CardType>,
// cardSwitchProvider: Option<Secret<String>>,
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct DecidedGateway {
pub gateway_priority_map: Option<HashMap<String, f64>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub status: String,
pub error_code: String,
pub error_message: String,
pub priority_logic_tag: Option<String>,
pub filter_wise_gateways: Option<serde_json::Value>,
pub error_info: UnifiedError,
pub is_dynamic_mga_enabled: bool,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UnifiedError {
pub code: String,
pub user_message: String,
pub developer_message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct UpdateScorePayload {
pub merchant_id: id_type::ProfileId,
pub gateway: RoutableConnectors,
pub status: TxnStatus,
pub payment_id: id_type::PaymentId,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TxnStatus {
Started,
AuthenticationFailed,
JuspayDeclined,
PendingVBV,
VBVSuccessful,
Authorized,
AuthorizationFailed,
Charged,
Authorizing,
CODInitiated,
Voided,
VoidInitiated,
Nop,
CaptureInitiated,
CaptureFailed,
VoidFailed,
AutoRefunded,
PartialCharged,
ToBeCharged,
Pending,
Failure,
Declined,
}
impl From<bool> for TxnStatus {
fn from(value: bool) -> Self {
match value {
true => Self::Charged,
_ => Self::Failure,
}
}
}

View File

@ -532,5 +532,6 @@ pub(crate) async fn fetch_raw_secrets(
network_tokenization_supported_connectors: conf.network_tokenization_supported_connectors,
theme: conf.theme,
platform: conf.platform,
open_router: conf.open_router,
}
}

View File

@ -145,8 +145,14 @@ pub struct Settings<S: SecretState> {
pub network_tokenization_supported_connectors: NetworkTokenizationSupportedConnectors,
pub theme: ThemeSettings,
pub platform: Platform,
pub open_router: OpenRouter,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct OpenRouter {
pub enabled: bool,
pub url: String,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Platform {
pub enabled: bool,

View File

@ -407,6 +407,10 @@ pub enum RoutingError {
ContractRoutingClientInitializationError,
#[error("Invalid contract based connector label received from dynamic routing service: '{0}'")]
InvalidContractBasedConnectorLabel(String),
#[error("Failed to perform {algo} in open_router")]
OpenRouterCallFailed { algo: String },
#[error("Error from open_router: {0}")]
OpenRouterError(String),
}
#[derive(Debug, Clone, thiserror::Error)]

View File

@ -7266,6 +7266,9 @@ where
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
let payment_attempt = transaction_data.payment_attempt.clone();
let connectors = routing::perform_eligibility_analysis_with_fallback(
&state.clone(),
key_store,
@ -7280,8 +7283,7 @@ where
// dynamic success based connector selection
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
let connectors = {
if let Some(algo) = business_profile.dynamic_routing_algorithm.clone() {
let connectors = if let Some(algo) = business_profile.dynamic_routing_algorithm.clone() {
let dynamic_routing_config: api_models::routing::DynamicRoutingAlgorithmRef = algo
.parse_value("DynamicRoutingAlgorithmRef")
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -7301,12 +7303,22 @@ where
.unwrap_or_default(),
};
let volume_split_vec = vec![dynamic_split, static_split];
let routing_choice =
routing::perform_dynamic_routing_volume_split(volume_split_vec, None)
let routing_choice = routing::perform_dynamic_routing_volume_split(volume_split_vec, None)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("failed to perform volume split on routing type")?;
if routing_choice.routing_type.is_dynamic_routing() {
if state.conf.open_router.enabled {
routing::perform_open_routing(
state,
connectors.clone(),
business_profile,
payment_attempt,
)
.await
.map_err(|e| logger::error!(open_routing_error=?e))
.unwrap_or(connectors)
} else {
let dynamic_routing_config_params_interpolator =
routing_helpers::DynamicRoutingConfigParamsInterpolator::new(
payment_data.get_payment_attempt().payment_method,
@ -7348,12 +7360,12 @@ where
.await
.map_err(|e| logger::error!(dynamic_routing_error=?e))
.unwrap_or(connectors)
}
} else {
connectors
}
} else {
connectors
}
};
let connector_data = connectors

View File

@ -6,15 +6,21 @@ use std::collections::hash_map;
use std::hash::{Hash, Hasher};
use std::{collections::HashMap, str::FromStr, sync::Arc};
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use api_models::routing as api_routing;
use api_models::{
admin as admin_api,
enums::{self as api_enums, CountryAlpha2},
routing::ConnectorSelection,
};
#[cfg(feature = "dynamic_routing")]
use common_utils::ext_traits::AsyncExt;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use api_models::{
open_router::{self as or_types, DecidedGateway, OpenRouterDecideGatewayRequest},
routing as api_routing,
};
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use common_utils::{
ext_traits::{AsyncExt, BytesExt},
request,
};
use diesel_models::enums as storage_enums;
use error_stack::ResultExt;
use euclid::{
@ -48,11 +54,10 @@ use storage_impl::redis::cache::{CacheKey, CGRAPH_CACHE, ROUTING_CACHE};
use crate::core::admin;
#[cfg(feature = "payouts")]
use crate::core::payouts;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use crate::{core::routing::transformers::OpenRouterDecideGatewayRequestExt, headers, services};
use crate::{
core::{
errors, errors as oss_errors,
routing::{self},
},
core::{errors, errors as oss_errors, routing},
logger,
types::{
api::{self, routing as routing_types},
@ -1484,6 +1489,53 @@ pub fn make_dsl_input_for_surcharge(
Ok(backend_input)
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
pub async fn perform_open_routing(
state: &SessionState,
routable_connectors: Vec<api_routing::RoutableConnectorChoice>,
profile: &domain::Profile,
payment_data: oss_storage::PaymentAttempt,
) -> RoutingResult<Vec<api_routing::RoutableConnectorChoice>> {
let dynamic_routing_algo_ref: api_routing::DynamicRoutingAlgorithmRef = profile
.dynamic_routing_algorithm
.clone()
.map(|val| val.parse_value("DynamicRoutingAlgorithmRef"))
.transpose()
.change_context(errors::RoutingError::DeserializationError {
from: "JSON".to_string(),
to: "DynamicRoutingAlgorithmRef".to_string(),
})
.attach_printable("unable to deserialize DynamicRoutingAlgorithmRef from JSON")?
.ok_or(errors::RoutingError::GenericNotFoundError {
field: "dynamic_routing_algorithm".to_string(),
})?;
logger::debug!(
"performing dynamic_routing with open_router for profile {}",
profile.get_id().get_string_repr()
);
let connectors = dynamic_routing_algo_ref
.success_based_algorithm
.async_map(|algo| {
perform_success_based_routing_with_open_router(
state,
routable_connectors.clone(),
profile.get_id(),
algo,
payment_data,
)
})
.await
.transpose()
.inspect_err(|e| logger::error!(dynamic_routing_error=?e))
.ok()
.flatten()
.unwrap_or(routable_connectors);
Ok(connectors)
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
pub async fn perform_dynamic_routing(
state: &SessionState,
@ -1555,6 +1607,165 @@ pub async fn perform_dynamic_routing(
Ok(connector_list)
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
#[instrument(skip_all)]
pub async fn perform_success_based_routing_with_open_router(
state: &SessionState,
mut routable_connectors: Vec<api_routing::RoutableConnectorChoice>,
profile_id: &common_utils::id_type::ProfileId,
success_based_algo_ref: api_routing::SuccessBasedAlgorithm,
payment_attempt: oss_storage::PaymentAttempt,
) -> RoutingResult<Vec<api_routing::RoutableConnectorChoice>> {
if success_based_algo_ref.enabled_feature
== api_routing::DynamicRoutingFeatures::DynamicConnectorSelection
{
logger::debug!(
"performing success_based_routing with open_router for profile {}",
profile_id.get_string_repr()
);
let open_router_req_body = OpenRouterDecideGatewayRequest::construct_sr_request(
payment_attempt,
routable_connectors
.iter()
.map(|gateway| gateway.connector)
.collect::<Vec<_>>(),
Some(or_types::RankingAlgorithm::SrBasedRouting),
);
let url = format!("{}/{}", &state.conf.open_router.url, "decide-gateway");
let mut request = request::Request::new(services::Method::Post, &url);
request.add_header(headers::CONTENT_TYPE, "application/json".into());
request.add_header(
headers::X_TENANT_ID,
state.tenant.tenant_id.get_string_repr().to_owned().into(),
);
request.set_body(request::RequestContent::Json(Box::new(
open_router_req_body,
)));
let response = services::call_connector_api(state, request, "open_router_sr_call")
.await
.change_context(errors::RoutingError::OpenRouterCallFailed {
algo: "success_rate".into(),
})?;
let sr_sorted_connectors = match response {
Ok(resp) => {
let decided_gateway: DecidedGateway = resp
.response
.parse_struct("DecidedGateway")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
if let Some(gateway_priority_map) = decided_gateway.gateway_priority_map {
logger::debug!(
"Open router gateway_priority_map response: {:?}",
gateway_priority_map
);
routable_connectors.sort_by(|connector_choice_a, connector_choice_b| {
let connector_choice_a_score = gateway_priority_map
.get(&connector_choice_a.connector.to_string())
.copied()
.unwrap_or(0.0);
let connector_choice_b_score = gateway_priority_map
.get(&connector_choice_b.connector.to_string())
.copied()
.unwrap_or(0.0);
connector_choice_b_score
.partial_cmp(&connector_choice_a_score)
.unwrap_or(std::cmp::Ordering::Equal)
});
}
Ok(routable_connectors)
}
Err(err) => {
let err_resp: or_types::ErrorResponse = err
.response
.parse_struct("ErrorResponse")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
logger::error!("open_router_error_response: {:?}", err_resp);
Err(errors::RoutingError::OpenRouterError(
"Failed to perform success based routing in open router".into(),
))
}
}?;
Ok(sr_sorted_connectors)
} else {
Ok(routable_connectors)
}
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
#[instrument(skip_all)]
pub async fn update_success_rate_score_with_open_router(
state: &SessionState,
payment_connector: common_enums::RoutableConnectors,
profile_id: &common_utils::id_type::ProfileId,
payment_id: &common_utils::id_type::PaymentId,
payment_status: bool,
) -> RoutingResult<()> {
let open_router_req_body = or_types::UpdateScorePayload {
merchant_id: profile_id.clone(),
gateway: payment_connector,
status: payment_status.into(),
payment_id: payment_id.clone(),
};
let url = format!("{}/{}", &state.conf.open_router.url, "update-gateway-score");
let mut request = request::Request::new(services::Method::Post, &url);
request.add_header(headers::CONTENT_TYPE, "application/json".into());
request.add_header(
headers::X_TENANT_ID,
state.tenant.tenant_id.get_string_repr().to_owned().into(),
);
request.set_body(request::RequestContent::Json(Box::new(
open_router_req_body,
)));
let response =
services::call_connector_api(state, request, "open_router_update_gateway_score_call")
.await
.change_context(errors::RoutingError::OpenRouterCallFailed {
algo: "success_rate".into(),
})?;
match response {
Ok(resp) => {
let update_score_resp = String::from_utf8(resp.response.to_vec()).change_context(
errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
),
)?;
logger::debug!(
"Open router update_gateway_score response: {:?}",
update_score_resp
);
Ok(())
}
Err(err) => {
let err_resp: or_types::ErrorResponse = err
.response
.parse_struct("ErrorResponse")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
logger::error!("open_router_error_response: {:?}", err_resp);
Err(errors::RoutingError::OpenRouterError(
"Failed to update gateway score for success based routing in open router".into(),
))
}
}?;
Ok(())
}
/// success based dynamic routing
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
#[instrument(skip_all)]

View File

@ -46,7 +46,10 @@ use crate::{
utils::StringExt,
};
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
use crate::{core::metrics as core_metrics, types::transformers::ForeignInto};
use crate::{
core::{metrics as core_metrics, routing},
types::transformers::ForeignInto,
};
pub const SUCCESS_BASED_DYNAMIC_ROUTING_ALGORITHM: &str =
"Success rate based dynamic routing algorithm";
pub const ELIMINATION_BASED_DYNAMIC_ROUTING_ALGORITHM: &str =
@ -688,6 +691,28 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
},
)?;
let payment_status_attribute =
get_desired_payment_status_for_dynamic_routing_metrics(payment_attempt.status);
let should_route_to_open_router = state.conf.open_router.enabled;
if should_route_to_open_router {
routing::payments_routing::update_success_rate_score_with_open_router(
state,
common_enums::RoutableConnectors::from_str(payment_connector.as_str())
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unable to infer routable_connector from connector")?,
profile_id,
&payment_attempt.payment_id,
payment_status_attribute == common_enums::AttemptStatus::Charged,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("failed to update gateway score in open router service")?;
return Ok(());
}
let success_based_routing_configs = fetch_dynamic_routing_configs::<
routing_types::SuccessBasedRoutingConfig,
>(
@ -732,9 +757,6 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
"unable to calculate/fetch success rate from dynamic routing service",
)?;
let payment_status_attribute =
get_desired_payment_status_for_dynamic_routing_metrics(payment_attempt.status);
let first_merchant_success_based_connector = &success_based_connectors
.entity_scores_with_labels
.first()
@ -932,6 +954,7 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
.attach_printable(
"unable to update success based routing window in dynamic routing service",
)?;
Ok(())
} else {
Ok(())

View File

@ -1,12 +1,18 @@
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use api_models::open_router::{OpenRouterDecideGatewayRequest, PaymentInfo, RankingAlgorithm};
use api_models::routing::{
MerchantRoutingAlgorithm, RoutingAlgorithm as Algorithm, RoutingAlgorithmKind,
RoutingDictionaryRecord,
};
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use common_enums::RoutableConnectors;
use common_utils::ext_traits::ValueExt;
use diesel_models::{
enums as storage_enums,
routing_algorithm::{RoutingAlgorithm, RoutingProfileMetadata},
};
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt;
use crate::{
core::{errors, routing},
@ -98,3 +104,39 @@ impl From<&routing::TransactionData<'_>> for storage_enums::TransactionType {
}
}
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
pub trait OpenRouterDecideGatewayRequestExt {
fn construct_sr_request(
attempt: PaymentAttempt,
eligible_gateway_list: Vec<RoutableConnectors>,
ranking_algorithm: Option<RankingAlgorithm>,
) -> Self
where
Self: Sized;
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
impl OpenRouterDecideGatewayRequestExt for OpenRouterDecideGatewayRequest {
fn construct_sr_request(
attempt: PaymentAttempt,
eligible_gateway_list: Vec<RoutableConnectors>,
ranking_algorithm: Option<RankingAlgorithm>,
) -> Self {
Self {
payment_info: PaymentInfo {
payment_id: attempt.payment_id,
amount: attempt.net_amount.get_order_amount(),
currency: attempt.currency.unwrap_or(storage_enums::Currency::USD),
payment_type: "ORDER_PAYMENT".to_string(),
// payment_method_type: attempt.payment_method_type.clone().unwrap(),
payment_method_type: "UPI".into(), // TODO: once open-router makes this field string, we can send from attempt
payment_method: attempt.payment_method.unwrap_or_default(),
},
merchant_id: attempt.profile_id,
eligible_gateway_list: Some(eligible_gateway_list),
ranking_algorithm,
elimination_enabled: None,
}
}
}