mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
feat(routing): Add support to update config for elimination routing (#7938)
Co-authored-by: Spriti Aneja <spriti.aneja@juspay.in> Co-authored-by: Gnanasundari24 <118818938+Gnanasundari24@users.noreply.github.com>
This commit is contained in:
@ -2,9 +2,10 @@ use common_utils::events::{ApiEventMetric, ApiEventsType};
|
||||
|
||||
use crate::routing::{
|
||||
ContractBasedRoutingPayloadWrapper, ContractBasedRoutingSetupPayloadWrapper,
|
||||
DynamicRoutingUpdateConfigQuery, LinkedRoutingConfigRetrieveResponse, MerchantRoutingAlgorithm,
|
||||
ProfileDefaultRoutingConfig, RoutingAlgorithmId, RoutingConfigRequest, RoutingDictionaryRecord,
|
||||
RoutingKind, RoutingLinkWrapper, RoutingPayloadWrapper, RoutingRetrieveLinkQuery,
|
||||
DynamicRoutingUpdateConfigQuery, EliminationRoutingPayloadWrapper,
|
||||
LinkedRoutingConfigRetrieveResponse, MerchantRoutingAlgorithm, ProfileDefaultRoutingConfig,
|
||||
RoutingAlgorithmId, RoutingConfigRequest, RoutingDictionaryRecord, RoutingKind,
|
||||
RoutingLinkWrapper, RoutingPayloadWrapper, RoutingRetrieveLinkQuery,
|
||||
RoutingRetrieveLinkQueryWrapper, RoutingRetrieveQuery, RoutingVolumeSplitWrapper,
|
||||
SuccessBasedRoutingConfig, SuccessBasedRoutingPayloadWrapper, ToggleDynamicRoutingQuery,
|
||||
ToggleDynamicRoutingWrapper,
|
||||
@ -98,6 +99,12 @@ impl ApiEventMetric for SuccessBasedRoutingPayloadWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiEventMetric for EliminationRoutingPayloadWrapper {
|
||||
fn get_api_event_type(&self) -> Option<ApiEventsType> {
|
||||
Some(ApiEventsType::Routing)
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiEventMetric for ContractBasedRoutingPayloadWrapper {
|
||||
fn get_api_event_type(&self) -> Option<ApiEventsType> {
|
||||
Some(ApiEventsType::Routing)
|
||||
|
||||
@ -839,18 +839,42 @@ pub struct EliminationAnalyserConfig {
|
||||
pub bucket_leak_interval_in_secs: Option<u64>,
|
||||
}
|
||||
|
||||
impl EliminationAnalyserConfig {
|
||||
pub fn update(&mut self, new: Self) {
|
||||
if let Some(bucket_size) = new.bucket_size {
|
||||
self.bucket_size = Some(bucket_size)
|
||||
}
|
||||
if let Some(bucket_leak_interval_in_secs) = new.bucket_leak_interval_in_secs {
|
||||
self.bucket_leak_interval_in_secs = Some(bucket_leak_interval_in_secs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EliminationRoutingConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
params: Some(vec![DynamicRoutingConfigParams::PaymentMethod]),
|
||||
elimination_analyser_config: Some(EliminationAnalyserConfig {
|
||||
bucket_size: Some(5),
|
||||
bucket_leak_interval_in_secs: Some(2),
|
||||
bucket_leak_interval_in_secs: Some(60),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EliminationRoutingConfig {
|
||||
pub fn update(&mut self, new: Self) {
|
||||
if let Some(params) = new.params {
|
||||
self.params = Some(params)
|
||||
}
|
||||
if let Some(new_config) = new.elimination_analyser_config {
|
||||
self.elimination_analyser_config
|
||||
.as_mut()
|
||||
.map(|config| config.update(new_config));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, ToSchema)]
|
||||
pub struct SuccessBasedRoutingConfig {
|
||||
pub params: Option<Vec<DynamicRoutingConfigParams>>,
|
||||
@ -917,6 +941,13 @@ pub struct SuccessBasedRoutingPayloadWrapper {
|
||||
pub profile_id: common_utils::id_type::ProfileId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct EliminationRoutingPayloadWrapper {
|
||||
pub updated_config: EliminationRoutingConfig,
|
||||
pub algorithm_id: common_utils::id_type::RoutingId,
|
||||
pub profile_id: common_utils::id_type::ProfileId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct ContractBasedRoutingPayloadWrapper {
|
||||
pub updated_config: ContractBasedRoutingConfig,
|
||||
|
||||
@ -16,6 +16,7 @@ use error_stack::ResultExt;
|
||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||
use external_services::grpc_client::dynamic_routing::{
|
||||
contract_routing_client::ContractBasedDynamicRouting,
|
||||
elimination_based_client::EliminationBasedRouting,
|
||||
success_rate_client::SuccessBasedDynamicRouting,
|
||||
};
|
||||
use hyperswitch_domain_models::{mandates, payment_address};
|
||||
@ -1266,7 +1267,10 @@ pub async fn toggle_specific_dynamic_routing(
|
||||
) -> RouterResponse<routing_types::RoutingDictionaryRecord> {
|
||||
metrics::ROUTING_CREATE_REQUEST_RECEIVED.add(
|
||||
1,
|
||||
router_env::metric_attributes!(("profile_id", profile_id.clone())),
|
||||
router_env::metric_attributes!(
|
||||
("profile_id", profile_id.clone()),
|
||||
("algorithm_type", dynamic_routing_type.to_string())
|
||||
),
|
||||
);
|
||||
let db = state.store.as_ref();
|
||||
let key_manager_state = &(&state).into();
|
||||
@ -1396,7 +1400,13 @@ pub async fn success_based_routing_update_configs(
|
||||
) -> RouterResponse<routing_types::RoutingDictionaryRecord> {
|
||||
metrics::ROUTING_UPDATE_CONFIG_FOR_PROFILE.add(
|
||||
1,
|
||||
router_env::metric_attributes!(("profile_id", profile_id.clone())),
|
||||
router_env::metric_attributes!(
|
||||
("profile_id", profile_id.clone()),
|
||||
(
|
||||
"algorithm_type",
|
||||
routing::DynamicRoutingType::SuccessRateBasedRouting.to_string()
|
||||
)
|
||||
),
|
||||
);
|
||||
let db = state.store.as_ref();
|
||||
|
||||
@ -1468,9 +1478,109 @@ pub async fn success_based_routing_update_configs(
|
||||
state.get_grpc_headers(),
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
|
||||
message: "Failed to invalidate the routing keys".to_string(),
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to invalidate the routing keys")
|
||||
})
|
||||
.await
|
||||
.transpose()?;
|
||||
|
||||
Ok(service_api::ApplicationResponse::Json(new_record))
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||
pub async fn elimination_routing_update_configs(
|
||||
state: SessionState,
|
||||
request: routing_types::EliminationRoutingConfig,
|
||||
algorithm_id: common_utils::id_type::RoutingId,
|
||||
profile_id: common_utils::id_type::ProfileId,
|
||||
) -> RouterResponse<routing_types::RoutingDictionaryRecord> {
|
||||
metrics::ROUTING_UPDATE_CONFIG_FOR_PROFILE.add(
|
||||
1,
|
||||
router_env::metric_attributes!(
|
||||
("profile_id", profile_id.clone()),
|
||||
(
|
||||
"algorithm_type",
|
||||
routing::DynamicRoutingType::EliminationRouting.to_string()
|
||||
)
|
||||
),
|
||||
);
|
||||
|
||||
let db = state.store.as_ref();
|
||||
|
||||
let dynamic_routing_algo_to_update = db
|
||||
.find_routing_algorithm_by_profile_id_algorithm_id(&profile_id, &algorithm_id)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::ResourceIdNotFound)?;
|
||||
|
||||
let mut config_to_update: routing::EliminationRoutingConfig = dynamic_routing_algo_to_update
|
||||
.algorithm_data
|
||||
.parse_value::<routing::EliminationRoutingConfig>("EliminationRoutingConfig")
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable(
|
||||
"unable to deserialize algorithm data from routing table into EliminationRoutingConfig",
|
||||
)?;
|
||||
|
||||
config_to_update.update(request);
|
||||
|
||||
let updated_algorithm_id = common_utils::generate_routing_id_of_default_length();
|
||||
let timestamp = common_utils::date_time::now();
|
||||
let algo = RoutingAlgorithm {
|
||||
algorithm_id: updated_algorithm_id,
|
||||
profile_id: dynamic_routing_algo_to_update.profile_id,
|
||||
merchant_id: dynamic_routing_algo_to_update.merchant_id,
|
||||
name: dynamic_routing_algo_to_update.name,
|
||||
description: dynamic_routing_algo_to_update.description,
|
||||
kind: dynamic_routing_algo_to_update.kind,
|
||||
algorithm_data: serde_json::json!(config_to_update),
|
||||
created_at: timestamp,
|
||||
modified_at: timestamp,
|
||||
algorithm_for: dynamic_routing_algo_to_update.algorithm_for,
|
||||
};
|
||||
|
||||
let record = db
|
||||
.insert_routing_algorithm(algo)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Unable to insert record in routing algorithm table")?;
|
||||
|
||||
// redact cache for elimination routing configs
|
||||
let cache_key = format!(
|
||||
"{}_{}",
|
||||
profile_id.get_string_repr(),
|
||||
algorithm_id.get_string_repr()
|
||||
);
|
||||
let cache_entries_to_redact = vec![cache::CacheKind::EliminationBasedDynamicRoutingCache(
|
||||
cache_key.into(),
|
||||
)];
|
||||
|
||||
cache::redact_from_redis_and_publish(
|
||||
state.store.get_cache_store().as_ref(),
|
||||
cache_entries_to_redact,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| logger::error!("unable to publish into the redact channel for evicting the elimination routing config cache {e:?}")).ok();
|
||||
|
||||
let new_record = record.foreign_into();
|
||||
|
||||
metrics::ROUTING_UPDATE_CONFIG_FOR_PROFILE_SUCCESS_RESPONSE.add(
|
||||
1,
|
||||
router_env::metric_attributes!(("profile_id", profile_id.clone())),
|
||||
);
|
||||
|
||||
state
|
||||
.grpc_client
|
||||
.dynamic_routing
|
||||
.elimination_based_client
|
||||
.as_ref()
|
||||
.async_map(|er_client| async {
|
||||
er_client
|
||||
.invalidate_elimination_bucket(
|
||||
profile_id.get_string_repr().into(),
|
||||
state.get_grpc_headers(),
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to invalidate the elimination routing keys")
|
||||
})
|
||||
.await
|
||||
.transpose()?;
|
||||
@ -1683,7 +1793,13 @@ pub async fn contract_based_routing_update_configs(
|
||||
) -> RouterResponse<routing_types::RoutingDictionaryRecord> {
|
||||
metrics::ROUTING_UPDATE_CONFIG_FOR_PROFILE.add(
|
||||
1,
|
||||
router_env::metric_attributes!(("profile_id", profile_id.get_string_repr().to_owned())),
|
||||
router_env::metric_attributes!(
|
||||
("profile_id", profile_id.get_string_repr().to_owned()),
|
||||
(
|
||||
"algorithm_type",
|
||||
routing::DynamicRoutingType::ContractBasedRouting.to_string()
|
||||
)
|
||||
),
|
||||
);
|
||||
let db = state.store.as_ref();
|
||||
let key_manager_state = &(&state).into();
|
||||
@ -1794,9 +1910,8 @@ pub async fn contract_based_routing_update_configs(
|
||||
state.get_grpc_headers(),
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
|
||||
message: "Failed to invalidate the contract based routing keys".to_string(),
|
||||
})
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to invalidate the contract based routing keys")
|
||||
})
|
||||
.await
|
||||
.transpose()?;
|
||||
|
||||
@ -2043,10 +2043,18 @@ impl Profile {
|
||||
.route(web::post().to(routing::set_dynamic_routing_volume_split)),
|
||||
)
|
||||
.service(
|
||||
web::scope("/elimination").service(
|
||||
web::scope("/elimination")
|
||||
.service(
|
||||
web::resource("/toggle")
|
||||
.route(web::post().to(routing::toggle_elimination_routing)),
|
||||
),
|
||||
)
|
||||
.service(web::resource("config/{algorithm_id}").route(
|
||||
web::patch().to(|state, req, path, payload| {
|
||||
routing::elimination_routing_update_configs(
|
||||
state, req, path, payload,
|
||||
)
|
||||
}),
|
||||
)),
|
||||
)
|
||||
.service(
|
||||
web::scope("/contracts")
|
||||
|
||||
@ -1299,6 +1299,50 @@ pub async fn success_based_routing_update_configs(
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "olap", feature = "v1", feature = "dynamic_routing"))]
|
||||
#[instrument(skip_all)]
|
||||
pub async fn elimination_routing_update_configs(
|
||||
state: web::Data<AppState>,
|
||||
req: HttpRequest,
|
||||
path: web::Path<routing_types::DynamicRoutingUpdateConfigQuery>,
|
||||
json_payload: web::Json<routing_types::EliminationRoutingConfig>,
|
||||
) -> impl Responder {
|
||||
let flow = Flow::UpdateDynamicRoutingConfigs;
|
||||
let routing_payload_wrapper = routing_types::EliminationRoutingPayloadWrapper {
|
||||
updated_config: json_payload.into_inner(),
|
||||
algorithm_id: path.clone().algorithm_id,
|
||||
profile_id: path.clone().profile_id,
|
||||
};
|
||||
Box::pin(oss_api::server_wrap(
|
||||
flow,
|
||||
state,
|
||||
&req,
|
||||
routing_payload_wrapper.clone(),
|
||||
|state, _, wrapper: routing_types::EliminationRoutingPayloadWrapper, _| async {
|
||||
Box::pin(routing::elimination_routing_update_configs(
|
||||
state,
|
||||
wrapper.updated_config,
|
||||
wrapper.algorithm_id,
|
||||
wrapper.profile_id,
|
||||
))
|
||||
.await
|
||||
},
|
||||
auth::auth_type(
|
||||
&auth::HeaderAuth(auth::ApiKeyAuth {
|
||||
is_connected_allowed: false,
|
||||
is_platform_allowed: false,
|
||||
}),
|
||||
&auth::JWTAuthProfileFromRoute {
|
||||
profile_id: routing_payload_wrapper.profile_id,
|
||||
required_permission: Permission::ProfileRoutingWrite,
|
||||
},
|
||||
req.headers(),
|
||||
),
|
||||
api_locking::LockAction::NotApplicable,
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "olap", feature = "v1", feature = "dynamic_routing"))]
|
||||
#[instrument(skip_all)]
|
||||
pub async fn contract_based_routing_setup_config(
|
||||
|
||||
Reference in New Issue
Block a user