mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
feat: migration api for migrating routing rules to decision_engine (#8233)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -7,7 +7,8 @@ use crate::routing::{
|
||||
RoutingAlgorithmId, RoutingConfigRequest, RoutingDictionaryRecord, RoutingKind,
|
||||
RoutingLinkWrapper, RoutingPayloadWrapper, RoutingRetrieveLinkQuery,
|
||||
RoutingRetrieveLinkQueryWrapper, RoutingRetrieveQuery, RoutingVolumeSplit,
|
||||
RoutingVolumeSplitResponse, RoutingVolumeSplitWrapper, SuccessBasedRoutingConfig,
|
||||
RoutingVolumeSplitResponse, RoutingVolumeSplitWrapper, RuleMigrationError, RuleMigrationQuery,
|
||||
RuleMigrationResponse, RuleMigrationResult, SuccessBasedRoutingConfig,
|
||||
SuccessBasedRoutingPayloadWrapper, ToggleDynamicRoutingPath, ToggleDynamicRoutingQuery,
|
||||
ToggleDynamicRoutingWrapper,
|
||||
};
|
||||
@ -153,3 +154,27 @@ impl ApiEventMetric for RoutingVolumeSplit {
|
||||
Some(ApiEventsType::Routing)
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiEventMetric for RuleMigrationQuery {
|
||||
fn get_api_event_type(&self) -> Option<ApiEventsType> {
|
||||
Some(ApiEventsType::Routing)
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiEventMetric for RuleMigrationResponse {
|
||||
fn get_api_event_type(&self) -> Option<ApiEventsType> {
|
||||
Some(ApiEventsType::Routing)
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiEventMetric for RuleMigrationResult {
|
||||
fn get_api_event_type(&self) -> Option<ApiEventsType> {
|
||||
Some(ApiEventsType::Routing)
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiEventMetric for RuleMigrationError {
|
||||
fn get_api_event_type(&self) -> Option<ApiEventsType> {
|
||||
Some(ApiEventsType::Routing)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1538,3 +1538,50 @@ pub enum ContractUpdationStatusEventResponse {
|
||||
ContractUpdationSucceeded,
|
||||
ContractUpdationFailed,
|
||||
}
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct RuleMigrationQuery {
|
||||
pub profile_id: common_utils::id_type::ProfileId,
|
||||
pub merchant_id: common_utils::id_type::MerchantId,
|
||||
pub limit: Option<u32>,
|
||||
pub offset: Option<u32>,
|
||||
}
|
||||
|
||||
impl RuleMigrationQuery {
|
||||
pub fn validated_limit(&self) -> u32 {
|
||||
self.limit.unwrap_or(50).min(1000)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
pub struct RuleMigrationResult {
|
||||
pub success: Vec<RuleMigrationResponse>,
|
||||
pub errors: Vec<RuleMigrationError>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
pub struct RuleMigrationResponse {
|
||||
pub profile_id: common_utils::id_type::ProfileId,
|
||||
pub euclid_algorithm_id: common_utils::id_type::RoutingId,
|
||||
pub decision_engine_algorithm_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
pub struct RuleMigrationError {
|
||||
pub profile_id: common_utils::id_type::ProfileId,
|
||||
pub algorithm_id: common_utils::id_type::RoutingId,
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
impl RuleMigrationResponse {
|
||||
pub fn new(
|
||||
profile_id: common_utils::id_type::ProfileId,
|
||||
euclid_algorithm_id: common_utils::id_type::RoutingId,
|
||||
decision_engine_algorithm_id: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
profile_id,
|
||||
euclid_algorithm_id,
|
||||
decision_engine_algorithm_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ pub struct RoutingAlgorithmMetadata {
|
||||
pub algorithm_for: enums::TransactionType,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct RoutingProfileMetadata {
|
||||
pub profile_id: id_type::ProfileId,
|
||||
pub algorithm_id: id_type::RoutingId,
|
||||
@ -40,3 +41,10 @@ pub struct RoutingProfileMetadata {
|
||||
pub modified_at: time::PrimitiveDateTime,
|
||||
pub algorithm_for: enums::TransactionType,
|
||||
}
|
||||
|
||||
impl RoutingProfileMetadata {
|
||||
pub fn metadata_is_advanced_rule_for_payments(&self) -> bool {
|
||||
matches!(self.kind, enums::RoutingAlgorithmKind::Advanced)
|
||||
&& matches!(self.algorithm_for, enums::TransactionType::Payment)
|
||||
}
|
||||
}
|
||||
|
||||
@ -658,6 +658,7 @@ pub struct Program {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct RoutingRule {
|
||||
pub rule_id: Option<String>,
|
||||
pub name: String,
|
||||
pub description: Option<String>,
|
||||
pub metadata: Option<RoutingMetadata>,
|
||||
|
||||
@ -6,7 +6,9 @@ use std::collections::HashSet;
|
||||
use api_models::routing::DynamicRoutingAlgoAccessor;
|
||||
use api_models::{
|
||||
enums, mandates as mandates_api, routing,
|
||||
routing::{self as routing_types, RoutingRetrieveQuery},
|
||||
routing::{
|
||||
self as routing_types, RoutingRetrieveQuery, RuleMigrationError, RuleMigrationResponse,
|
||||
},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||
@ -22,6 +24,7 @@ use external_services::grpc_client::dynamic_routing::{
|
||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||
use helpers::update_decision_engine_dynamic_routing_setup;
|
||||
use hyperswitch_domain_models::{mandates, payment_address};
|
||||
use payment_methods::helpers::StorageErrorExt;
|
||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||
use router_env::logger;
|
||||
use rustc_hash::FxHashSet;
|
||||
@ -46,7 +49,7 @@ use crate::utils::ValueExt;
|
||||
use crate::{core::admin, utils::ValueExt};
|
||||
use crate::{
|
||||
core::{
|
||||
errors::{self, CustomResult, RouterResponse, StorageErrorExt},
|
||||
errors::{self, CustomResult, RouterResponse},
|
||||
metrics, utils as core_utils,
|
||||
},
|
||||
db::StorageInterface,
|
||||
@ -345,6 +348,7 @@ pub async fn create_routing_algorithm_under_profile(
|
||||
match program.try_into() {
|
||||
Ok(internal_program) => {
|
||||
let routing_rule = RoutingRule {
|
||||
rule_id: None,
|
||||
name: name.clone(),
|
||||
description: Some(description.clone()),
|
||||
created_by: profile_id.get_string_repr().to_string(),
|
||||
@ -2291,3 +2295,158 @@ impl RoutableConnectors {
|
||||
Ok(connector_data)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn migrate_rules_for_profile(
|
||||
state: SessionState,
|
||||
merchant_context: domain::MerchantContext,
|
||||
query_params: routing_types::RuleMigrationQuery,
|
||||
) -> RouterResult<routing_types::RuleMigrationResult> {
|
||||
use api_models::routing::StaticRoutingAlgorithm as EuclidAlgorithm;
|
||||
|
||||
use crate::services::logger;
|
||||
|
||||
let profile_id = query_params.profile_id.clone();
|
||||
let db = state.store.as_ref();
|
||||
let key_manager_state = &(&state).into();
|
||||
let merchant_key_store = merchant_context.get_merchant_key_store();
|
||||
let merchant_id = merchant_context.get_merchant_account().get_id();
|
||||
|
||||
core_utils::validate_and_get_business_profile(
|
||||
db,
|
||||
key_manager_state,
|
||||
merchant_key_store,
|
||||
Some(&profile_id),
|
||||
merchant_id,
|
||||
)
|
||||
.await?
|
||||
.get_required_value("Profile")
|
||||
.change_context(errors::ApiErrorResponse::ProfileNotFound {
|
||||
id: profile_id.get_string_repr().to_owned(),
|
||||
})?;
|
||||
|
||||
let routing_metadatas: Vec<diesel_models::routing_algorithm::RoutingProfileMetadata> = state
|
||||
.store
|
||||
.list_routing_algorithm_metadata_by_profile_id(
|
||||
&profile_id,
|
||||
i64::from(query_params.validated_limit()),
|
||||
i64::from(query_params.offset.unwrap_or_default()),
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::ResourceIdNotFound)?;
|
||||
|
||||
let mut response_list = Vec::new();
|
||||
let mut error_list = Vec::new();
|
||||
|
||||
for routing_metadata in routing_metadatas
|
||||
.into_iter()
|
||||
.filter(|algo| algo.metadata_is_advanced_rule_for_payments())
|
||||
{
|
||||
match db
|
||||
.find_routing_algorithm_by_profile_id_algorithm_id(
|
||||
&profile_id,
|
||||
&routing_metadata.algorithm_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(algorithm) => {
|
||||
let parsed_result = algorithm
|
||||
.algorithm_data
|
||||
.parse_value::<EuclidAlgorithm>("EuclidAlgorithm");
|
||||
|
||||
match parsed_result {
|
||||
Ok(EuclidAlgorithm::Advanced(program)) => match program.try_into() {
|
||||
Ok(internal_program) => {
|
||||
let routing_rule = RoutingRule {
|
||||
rule_id: Some(
|
||||
algorithm.algorithm_id.clone().get_string_repr().to_string(),
|
||||
),
|
||||
name: algorithm.name.clone(),
|
||||
description: algorithm.description.clone(),
|
||||
created_by: profile_id.get_string_repr().to_string(),
|
||||
algorithm: internal_program,
|
||||
metadata: None,
|
||||
};
|
||||
|
||||
let result = create_de_euclid_routing_algo(&state, &routing_rule).await;
|
||||
|
||||
match result {
|
||||
Ok(decision_engine_routing_id) => {
|
||||
let response = RuleMigrationResponse {
|
||||
profile_id: profile_id.clone(),
|
||||
euclid_algorithm_id: algorithm.algorithm_id.clone(),
|
||||
decision_engine_algorithm_id: decision_engine_routing_id,
|
||||
};
|
||||
response_list.push(response);
|
||||
}
|
||||
Err(err) => {
|
||||
logger::error!(
|
||||
decision_engine_rule_migration_error = ?err,
|
||||
algorithm_id = ?algorithm.algorithm_id,
|
||||
"Failed to insert into decision engine"
|
||||
);
|
||||
error_list.push(RuleMigrationError {
|
||||
profile_id: profile_id.clone(),
|
||||
algorithm_id: algorithm.algorithm_id.clone(),
|
||||
error: format!("Insertion error: {:?}", err),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
logger::error!(
|
||||
decision_engine_rule_migration_error = ?e,
|
||||
algorithm_id = ?algorithm.algorithm_id,
|
||||
"Failed to convert program"
|
||||
);
|
||||
error_list.push(RuleMigrationError {
|
||||
profile_id: profile_id.clone(),
|
||||
algorithm_id: algorithm.algorithm_id.clone(),
|
||||
error: format!("Program conversion error: {:?}", e),
|
||||
});
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
logger::error!(
|
||||
decision_engine_rule_migration_error = ?e,
|
||||
algorithm_id = ?algorithm.algorithm_id,
|
||||
"Failed to parse EuclidAlgorithm"
|
||||
);
|
||||
error_list.push(RuleMigrationError {
|
||||
profile_id: profile_id.clone(),
|
||||
algorithm_id: algorithm.algorithm_id.clone(),
|
||||
error: format!("JSON parse error: {:?}", e),
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
logger::info!(
|
||||
"decision_engine_rule_migration_error: Skipping non-advanced algorithm {:?}",
|
||||
algorithm.algorithm_id
|
||||
);
|
||||
error_list.push(RuleMigrationError {
|
||||
profile_id: profile_id.clone(),
|
||||
algorithm_id: algorithm.algorithm_id.clone(),
|
||||
error: "Not an advanced algorithm".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
logger::error!(
|
||||
decision_engine_rule_migration_error = ?e,
|
||||
algorithm_id = ?routing_metadata.algorithm_id,
|
||||
"Failed to fetch routing algorithm"
|
||||
);
|
||||
error_list.push(RuleMigrationError {
|
||||
profile_id: profile_id.clone(),
|
||||
algorithm_id: routing_metadata.algorithm_id.clone(),
|
||||
error: format!("Fetch error: {:?}", e),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(routing_types::RuleMigrationResult {
|
||||
success: response_list,
|
||||
errors: error_list,
|
||||
})
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use actix_web::{web, Scope};
|
||||
#[cfg(all(feature = "olap", feature = "v1"))]
|
||||
use api_models::routing::RoutingRetrieveQuery;
|
||||
use api_models::routing::RuleMigrationQuery;
|
||||
#[cfg(feature = "olap")]
|
||||
use common_enums::TransactionType;
|
||||
#[cfg(feature = "partial-auth")]
|
||||
@ -905,6 +906,11 @@ impl Routing {
|
||||
)
|
||||
})),
|
||||
)
|
||||
.service(web::resource("/rule/migrate").route(web::post().to(
|
||||
|state, req, query: web::Query<RuleMigrationQuery>| {
|
||||
routing::migrate_routing_rules_for_profile(state, req, query)
|
||||
},
|
||||
)))
|
||||
.service(
|
||||
web::resource("/deactivate").route(web::post().to(|state, req, payload| {
|
||||
routing::routing_unlink_config(state, req, payload, None)
|
||||
|
||||
@ -79,6 +79,7 @@ impl From<Flow> for ApiIdentifier {
|
||||
| Flow::ToggleDynamicRouting
|
||||
| Flow::UpdateDynamicRoutingConfigs
|
||||
| Flow::DecisionManagerUpsertConfig
|
||||
| Flow::DecisionEngineRuleMigration
|
||||
| Flow::VolumeSplitOnRoutingType => Self::Routing,
|
||||
|
||||
Flow::RetrieveForexFlow => Self::Forex,
|
||||
|
||||
@ -4,7 +4,12 @@
|
||||
//! of Routing configs.
|
||||
|
||||
use actix_web::{web, HttpRequest, Responder};
|
||||
use api_models::{enums, routing as routing_types, routing::RoutingRetrieveQuery};
|
||||
use api_models::{
|
||||
enums,
|
||||
routing::{self as routing_types, RoutingRetrieveQuery},
|
||||
};
|
||||
use hyperswitch_domain_models::merchant_context::MerchantKeyStore;
|
||||
use payment_methods::core::errors::ApiErrorResponse;
|
||||
use router_env::{
|
||||
tracing::{self, instrument},
|
||||
Flow,
|
||||
@ -12,6 +17,7 @@ use router_env::{
|
||||
|
||||
use crate::{
|
||||
core::{api_locking, conditional_config, routing, surcharge_decision_config},
|
||||
db::errors::StorageErrorExt,
|
||||
routes::AppState,
|
||||
services::{api as oss_api, authentication as auth, authorization::permissions::Permission},
|
||||
types::domain,
|
||||
@ -1558,3 +1564,61 @@ pub async fn get_dynamic_routing_volume_split(
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
use actix_web::HttpResponse;
|
||||
#[instrument(skip_all, fields(flow = ?Flow::DecisionEngineRuleMigration))]
|
||||
pub async fn migrate_routing_rules_for_profile(
|
||||
state: web::Data<AppState>,
|
||||
req: HttpRequest,
|
||||
query: web::Query<routing_types::RuleMigrationQuery>,
|
||||
) -> HttpResponse {
|
||||
let flow = Flow::DecisionEngineRuleMigration;
|
||||
|
||||
Box::pin(oss_api::server_wrap(
|
||||
flow,
|
||||
state,
|
||||
&req,
|
||||
query.into_inner(),
|
||||
|state, _, query_params, _| async move {
|
||||
let merchant_id = query_params.merchant_id.clone();
|
||||
let (key_store, merchant_account) = get_merchant_account(&state, &merchant_id).await?;
|
||||
let merchant_context = domain::MerchantContext::NormalMerchant(Box::new(
|
||||
domain::Context(merchant_account, key_store),
|
||||
));
|
||||
let res = Box::pin(routing::migrate_rules_for_profile(
|
||||
state,
|
||||
merchant_context,
|
||||
query_params,
|
||||
))
|
||||
.await?;
|
||||
Ok(crate::services::ApplicationResponse::Json(res))
|
||||
},
|
||||
&auth::AdminApiAuth,
|
||||
api_locking::LockAction::NotApplicable,
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_merchant_account(
|
||||
state: &super::SessionState,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
) -> common_utils::errors::CustomResult<(MerchantKeyStore, domain::MerchantAccount), ApiErrorResponse>
|
||||
{
|
||||
let key_manager_state = &state.into();
|
||||
let key_store = state
|
||||
.store
|
||||
.get_merchant_key_store_by_merchant_id(
|
||||
key_manager_state,
|
||||
merchant_id,
|
||||
&state.store.get_master_key().to_vec().into(),
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(ApiErrorResponse::MerchantAccountNotFound)?;
|
||||
|
||||
let merchant_account = state
|
||||
.store
|
||||
.find_merchant_account_by_merchant_id(key_manager_state, merchant_id, &key_store)
|
||||
.await
|
||||
.to_not_found_response(ApiErrorResponse::MerchantAccountNotFound)?;
|
||||
Ok((key_store, merchant_account))
|
||||
}
|
||||
|
||||
@ -249,6 +249,8 @@ pub enum Flow {
|
||||
RoutingRetrieveDefaultConfig,
|
||||
/// Routing retrieve dictionary
|
||||
RoutingRetrieveDictionary,
|
||||
/// Rule migration for decision-engine
|
||||
DecisionEngineRuleMigration,
|
||||
/// Routing update config
|
||||
RoutingUpdateConfig,
|
||||
/// Routing update default config
|
||||
|
||||
Reference in New Issue
Block a user