feat(routing): Enable volume split for dynamic routing (#6662)

Co-authored-by: prajjwalkumar17 <write2prajjwal@gmail.com>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: prajjwalkumar17 <prajjwal.kumar@juspay.in>
This commit is contained in:
Sarthak Soni
2024-12-05 14:38:23 +05:30
committed by GitHub
parent 62521f367b
commit 03b936a117
12 changed files with 302 additions and 75 deletions

View File

@ -208,3 +208,6 @@ pub const VAULT_DELETE_FLOW_TYPE: &str = "delete_from_vault";
/// Vault Fingerprint fetch flow type
#[cfg(all(feature = "v2", feature = "payment_methods_v2"))]
pub const VAULT_GET_FINGERPRINT_FLOW_TYPE: &str = "get_fingerprint_vault";
/// Max volume split for Dynamic routing
pub const DYNAMIC_ROUTING_MAX_VOLUME: u8 = 100;

View File

@ -6026,47 +6026,75 @@ where
// dynamic success based connector selection
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
let connectors = {
if business_profile.dynamic_routing_algorithm.is_some() {
let success_based_routing_config_params_interpolator =
routing_helpers::SuccessBasedRoutingConfigParamsInterpolator::new(
payment_data.get_payment_attempt().payment_method,
payment_data.get_payment_attempt().payment_method_type,
payment_data.get_payment_attempt().authentication_type,
payment_data.get_payment_attempt().currency,
payment_data
.get_billing_address()
.and_then(|address| address.address)
.and_then(|address| address.country),
payment_data
.get_payment_attempt()
.payment_method_data
.as_ref()
.and_then(|data| data.as_object())
.and_then(|card| card.get("card"))
.and_then(|data| data.as_object())
.and_then(|card| card.get("card_network"))
.and_then(|network| network.as_str())
.map(|network| network.to_string()),
payment_data
.get_payment_attempt()
.payment_method_data
.as_ref()
.and_then(|data| data.as_object())
.and_then(|card| card.get("card"))
.and_then(|data| data.as_object())
.and_then(|card| card.get("card_isin"))
.and_then(|card_isin| card_isin.as_str())
.map(|card_isin| card_isin.to_string()),
);
routing::perform_success_based_routing(
state,
connectors.clone(),
business_profile,
success_based_routing_config_params_interpolator,
)
.await
.map_err(|e| logger::error!(success_rate_routing_error=?e))
.unwrap_or(connectors)
if let Some(algo) = business_profile.dynamic_routing_algorithm.clone() {
let dynamic_routing_config: api_models::routing::DynamicRoutingAlgorithmRef = algo
.parse_value("DynamicRoutingAlgorithmRef")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unable to deserialize DynamicRoutingAlgorithmRef from JSON")?;
let dynamic_split = api_models::routing::RoutingVolumeSplit {
routing_type: api_models::routing::RoutingType::Dynamic,
split: dynamic_routing_config
.dynamic_routing_volume_split
.unwrap_or_default(),
};
let static_split: api_models::routing::RoutingVolumeSplit =
api_models::routing::RoutingVolumeSplit {
routing_type: api_models::routing::RoutingType::Static,
split: crate::consts::DYNAMIC_ROUTING_MAX_VOLUME
- dynamic_routing_config
.dynamic_routing_volume_split
.unwrap_or_default(),
};
let volume_split_vec = vec![dynamic_split, static_split];
let routing_choice =
routing::perform_dynamic_routing_volume_split(volume_split_vec, None)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("failed to perform volume split on routing type")?;
if routing_choice.routing_type.is_dynamic_routing() {
let success_based_routing_config_params_interpolator =
routing_helpers::SuccessBasedRoutingConfigParamsInterpolator::new(
payment_data.get_payment_attempt().payment_method,
payment_data.get_payment_attempt().payment_method_type,
payment_data.get_payment_attempt().authentication_type,
payment_data.get_payment_attempt().currency,
payment_data
.get_billing_address()
.and_then(|address| address.address)
.and_then(|address| address.country),
payment_data
.get_payment_attempt()
.payment_method_data
.as_ref()
.and_then(|data| data.as_object())
.and_then(|card| card.get("card"))
.and_then(|data| data.as_object())
.and_then(|card| card.get("card_network"))
.and_then(|network| network.as_str())
.map(|network| network.to_string()),
payment_data
.get_payment_attempt()
.payment_method_data
.as_ref()
.and_then(|data| data.as_object())
.and_then(|card| card.get("card"))
.and_then(|data| data.as_object())
.and_then(|card| card.get("card_isin"))
.and_then(|card_isin| card_isin.as_str())
.map(|card_isin| card_isin.to_string()),
);
routing::perform_success_based_routing(
state,
connectors.clone(),
business_profile,
success_based_routing_config_params_interpolator,
)
.await
.map_err(|e| logger::error!(success_rate_routing_error=?e))
.unwrap_or(connectors)
} else {
connectors
}
} else {
connectors
}

View File

@ -489,6 +489,36 @@ pub async fn refresh_routing_cache_v1(
Ok(arc_cached_algorithm)
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
pub fn perform_dynamic_routing_volume_split(
splits: Vec<api_models::routing::RoutingVolumeSplit>,
rng_seed: Option<&str>,
) -> RoutingResult<api_models::routing::RoutingVolumeSplit> {
let weights: Vec<u8> = splits.iter().map(|sp| sp.split).collect();
let weighted_index = distributions::WeightedIndex::new(weights)
.change_context(errors::RoutingError::VolumeSplitFailed)
.attach_printable("Error creating weighted distribution for volume split")?;
let idx = if let Some(seed) = rng_seed {
let mut hasher = hash_map::DefaultHasher::new();
seed.hash(&mut hasher);
let hash = hasher.finish();
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(hash);
weighted_index.sample(&mut rng)
} else {
let mut rng = rand::thread_rng();
weighted_index.sample(&mut rng)
};
let routing_choice = *splits
.get(idx)
.ok_or(errors::RoutingError::VolumeSplitFailed)
.attach_printable("Volume split index lookup failed")?;
Ok(routing_choice)
}
pub fn perform_volume_split(
mut splits: Vec<routing_types::ConnectorVolumeSplit>,
rng_seed: Option<&str>,

View File

@ -15,7 +15,9 @@ use error_stack::ResultExt;
use external_services::grpc_client::dynamic_routing::SuccessBasedDynamicRouting;
use hyperswitch_domain_models::{mandates, payment_address};
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use router_env::{logger, metrics::add_attributes};
use router_env::logger;
#[cfg(feature = "v1")]
use router_env::metrics::add_attributes;
use rustc_hash::FxHashSet;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use storage_impl::redis::cache;
@ -1271,6 +1273,69 @@ pub async fn toggle_specific_dynamic_routing(
}
}
#[cfg(feature = "v1")]
pub async fn configure_dynamic_routing_volume_split(
state: SessionState,
merchant_account: domain::MerchantAccount,
key_store: domain::MerchantKeyStore,
profile_id: common_utils::id_type::ProfileId,
routing_info: routing::RoutingVolumeSplit,
) -> RouterResponse<()> {
metrics::ROUTING_CREATE_REQUEST_RECEIVED.add(
&metrics::CONTEXT,
1,
&add_attributes([("profile_id", profile_id.get_string_repr().to_owned())]),
);
let db = state.store.as_ref();
let key_manager_state = &(&state).into();
utils::when(
routing_info.split > crate::consts::DYNAMIC_ROUTING_MAX_VOLUME,
|| {
Err(errors::ApiErrorResponse::InvalidRequestData {
message: "Dynamic routing volume split should be less than 100".to_string(),
})
},
)?;
let business_profile: domain::Profile = core_utils::validate_and_get_business_profile(
db,
key_manager_state,
&key_store,
Some(&profile_id),
merchant_account.get_id(),
)
.await?
.get_required_value("Profile")
.change_context(errors::ApiErrorResponse::ProfileNotFound {
id: profile_id.get_string_repr().to_owned(),
})?;
let mut dynamic_routing_algo_ref: routing_types::DynamicRoutingAlgorithmRef = business_profile
.dynamic_routing_algorithm
.clone()
.map(|val| val.parse_value("DynamicRoutingAlgorithmRef"))
.transpose()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"unable to deserialize dynamic routing algorithm ref from business profile",
)?
.unwrap_or_default();
dynamic_routing_algo_ref.update_volume_split(Some(routing_info.split));
helpers::update_business_profile_active_dynamic_algorithm_ref(
db,
&((&state).into()),
&key_store,
business_profile.clone(),
dynamic_routing_algo_ref.clone(),
)
.await?;
Ok(service_api::ApplicationResponse::StatusOk)
}
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
pub async fn success_based_routing_update_configs(
state: SessionState,

View File

@ -969,6 +969,8 @@ pub async fn disable_dynamic_routing_algorithm(
}),
elimination_routing_algorithm: dynamic_routing_algo_ref
.elimination_routing_algorithm,
dynamic_routing_volume_split: dynamic_routing_algo_ref
.dynamic_routing_volume_split,
},
cache_entries_to_redact,
)
@ -999,6 +1001,8 @@ pub async fn disable_dynamic_routing_algorithm(
algorithm_id,
routing_types::DynamicRoutingAlgorithmRef {
success_based_algorithm: dynamic_routing_algo_ref.success_based_algorithm,
dynamic_routing_volume_split: dynamic_routing_algo_ref
.dynamic_routing_volume_split,
elimination_routing_algorithm: Some(
routing_types::EliminationRoutingAlgorithm {
algorithm_id_with_timestamp:

View File

@ -1784,6 +1784,10 @@ impl Profile {
web::resource("/toggle")
.route(web::post().to(routing::toggle_elimination_routing)),
),
)
.service(
web::resource("/set_volume_split")
.route(web::post().to(routing::set_dynamic_routing_volume_split)),
),
);
}

View File

@ -67,7 +67,8 @@ impl From<Flow> for ApiIdentifier {
| Flow::DecisionManagerRetrieveConfig
| Flow::ToggleDynamicRouting
| Flow::UpdateDynamicRoutingConfigs
| Flow::DecisionManagerUpsertConfig => Self::Routing,
| Flow::DecisionManagerUpsertConfig
| Flow::VolumeSplitOnRoutingType => Self::Routing,
Flow::RetrieveForexFlow => Self::Forex,

View File

@ -1129,3 +1129,51 @@ pub async fn toggle_elimination_routing(
))
.await
}
#[cfg(all(feature = "olap", feature = "v1"))]
#[instrument(skip_all)]
pub async fn set_dynamic_routing_volume_split(
state: web::Data<AppState>,
req: HttpRequest,
query: web::Query<api_models::routing::DynamicRoutingVolumeSplitQuery>,
path: web::Path<routing_types::ToggleDynamicRoutingPath>,
) -> impl Responder {
let flow = Flow::VolumeSplitOnRoutingType;
let routing_info = api_models::routing::RoutingVolumeSplit {
routing_type: api_models::routing::RoutingType::Dynamic,
split: query.into_inner().split,
};
let payload = api_models::routing::RoutingVolumeSplitWrapper {
routing_info,
profile_id: path.into_inner().profile_id,
};
Box::pin(oss_api::server_wrap(
flow,
state,
&req,
payload.clone(),
|state,
auth: auth::AuthenticationData,
payload: api_models::routing::RoutingVolumeSplitWrapper,
_| {
routing::configure_dynamic_routing_volume_split(
state,
auth.merchant_account,
auth.key_store,
payload.profile_id,
payload.routing_info,
)
},
auth::auth_type(
&auth::HeaderAuth(auth::ApiKeyAuth),
&auth::JWTAuthProfileFromRoute {
profile_id: payload.profile_id,
required_permission: Permission::ProfileRoutingWrite,
},
req.headers(),
),
api_locking::LockAction::NotApplicable,
))
.await
}