refactor(routing): Routing events core refactor (#8323)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: prajjwalkumar17 <prajjwal.kumar@juspay.in>
This commit is contained in:
Sarthak Soni
2025-06-17 19:46:05 +05:30
committed by GitHub
parent 7f6f4c47fe
commit 4d36be87ec
6 changed files with 1535 additions and 740 deletions

View File

@ -388,6 +388,8 @@ pub enum RoutingError {
DecisionEngineValidationError(String),
#[error("Invalid transaction type")]
InvalidTransactionType,
#[error("Routing events error: {message}, status code: {status_code}")]
RoutingEventsError { message: String, status_code: u16 },
}
#[derive(Debug, Clone, thiserror::Error)]

View File

@ -17,8 +17,6 @@ use api_models::{
};
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use common_utils::ext_traits::AsyncExt;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use common_utils::{ext_traits::BytesExt, request};
use diesel_models::enums as storage_enums;
use error_stack::ResultExt;
use euclid::{
@ -30,13 +28,12 @@ use euclid::{
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use external_services::grpc_client::dynamic_routing::{
contract_routing_client::ContractBasedDynamicRouting,
elimination_based_client::{EliminationBasedRouting, EliminationResponse},
success_rate_client::SuccessBasedDynamicRouting,
DynamicRoutingError,
elimination_based_client::EliminationBasedRouting,
success_rate_client::SuccessBasedDynamicRouting, DynamicRoutingError,
};
use hyperswitch_domain_models::address::Address;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use hyperswitch_interfaces::events::routing_api_logs::{ApiMethod, RoutingEngine, RoutingEvent};
use hyperswitch_interfaces::events::routing_api_logs::{ApiMethod, RoutingEngine};
use kgraph_utils::{
mca as mca_graph,
transformers::{IntoContext, IntoDirValue},
@ -59,8 +56,6 @@ use crate::core::payouts;
#[cfg(feature = "v1")]
use crate::core::routing::transformers::OpenRouterDecideGatewayRequestExt;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use crate::headers;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use crate::routes::app::SessionStateInfo;
use crate::{
core::{
@ -482,10 +477,36 @@ pub async fn perform_static_routing_v1(
}
};
let payment_id = match transaction_data {
routing::TransactionData::Payment(payment_data) => payment_data
.payment_attempt
.payment_id
.clone()
.get_string_repr()
.to_string(),
#[cfg(feature = "payouts")]
routing::TransactionData::Payout(payout_data) => {
payout_data.payout_attempt.payout_id.clone()
}
};
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
state.request_id,
payment_id,
business_profile.get_id().to_owned(),
business_profile.merchant_id.to_owned(),
"DecisionEngine: Euclid Static Routing".to_string(),
None,
true,
false,
);
let de_euclid_connectors = perform_decision_euclid_routing(
state,
backend_input.clone(),
business_profile.get_id().get_string_repr().to_string(),
routing_events_wrapper
)
.await
.map_err(|e|
@ -497,8 +518,12 @@ pub async fn perform_static_routing_v1(
.iter()
.map(|c| c.connector.to_string())
.collect::<Vec<String>>();
let de_connectors = de_euclid_connectors
.iter()
.map(|c| c.gateway_name.to_string())
.collect::<Vec<String>>();
utils::compare_and_log_result(
de_euclid_connectors,
de_connectors,
connectors,
"evaluate_routing".to_string(),
);
@ -1646,19 +1671,36 @@ pub async fn perform_open_routing_for_debit_routing(
Some(or_types::RankingAlgorithm::NtwBasedRouting),
);
let response: RoutingResult<DecidedGateway> =
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
state.request_id,
payment_attempt.payment_id.get_string_repr().to_string(),
payment_attempt.profile_id.to_owned(),
payment_attempt.merchant_id.to_owned(),
"DecisionEngine: Debit Routing".to_string(),
Some(open_router_req_body.clone()),
true,
true,
);
let response: RoutingResult<utils::RoutingEventsResponse<DecidedGateway>> =
utils::EuclidApiClient::send_decision_engine_request(
state,
services::Method::Post,
"decide-gateway",
Some(open_router_req_body),
None,
Some(routing_events_wrapper),
)
.await;
let output = match response {
Ok(decided_gateway) => {
let debit_routing_output = decided_gateway
Ok(events_response) => {
let debit_routing_output = events_response
.response
.ok_or(errors::RoutingError::OpenRouterError(
"Response from decision engine API is empty".to_string(),
))?
.debit_routing_output
.get_required_value("debit_routing_output")
.change_context(errors::RoutingError::OpenRouterError(
@ -1797,57 +1839,45 @@ pub async fn perform_decide_gateway_call_with_open_router(
is_elimination_enabled,
);
let serialized_request = serde_json::to_value(&open_router_req_body)
.change_context(errors::RoutingError::OpenRouterCallFailed)
.attach_printable("Failed to serialize open_router request body")?;
let url = format!("{}/{}", &state.conf.open_router.url, "decide-gateway");
let mut request = request::Request::new(services::Method::Post, &url);
request.add_header(headers::CONTENT_TYPE, "application/json".into());
request.add_header(
headers::X_TENANT_ID,
state.tenant.tenant_id.get_string_repr().to_owned().into(),
);
request.set_body(request::RequestContent::Json(Box::new(
open_router_req_body,
)));
let mut routing_event = RoutingEvent::new(
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"open_router_decide_gateway_call",
serialized_request,
url.clone(),
ApiMethod::Rest(services::Method::Post),
payment_attempt.payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
payment_attempt.merchant_id.to_owned(),
state.request_id,
RoutingEngine::DecisionEngine,
payment_attempt.payment_id.get_string_repr().to_string(),
payment_attempt.profile_id.to_owned(),
payment_attempt.merchant_id.to_owned(),
"DecisionEngine: SuccessRate decide_gateway".to_string(),
Some(open_router_req_body.clone()),
true,
false,
);
let response = services::call_connector_api(state, request, "open_router_decide_gateway_call")
.await
.inspect_err(|err| {
routing_event
.set_error(serde_json::json!({"error": err.current_context().to_string()}));
state.event_handler().log_event(&routing_event);
})
.change_context(errors::RoutingError::OpenRouterCallFailed)?;
let response: RoutingResult<utils::RoutingEventsResponse<DecidedGateway>> =
utils::SRApiClient::send_decision_engine_request(
state,
services::Method::Post,
"decide-gateway",
Some(open_router_req_body),
None,
Some(routing_events_wrapper),
)
.await;
let sr_sorted_connectors = match response {
Ok(resp) => {
let decided_gateway: DecidedGateway = resp
.response
.parse_struct("DecidedGateway")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
let decided_gateway: DecidedGateway =
resp.response.ok_or(errors::RoutingError::OpenRouterError(
"Empty response received from open_router".into(),
))?;
routing_event.set_status_code(resp.status_code);
let mut routing_event = resp.event.ok_or(errors::RoutingError::RoutingEventsError {
message: "Decision-Engine: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})?;
routing_event.set_response_body(&decided_gateway);
routing_event.set_routing_approach(
api_routing::RoutingApproach::from_decision_engine_approach(
utils::RoutingApproach::from_decision_engine_approach(
&decided_gateway.routing_approach,
)
.to_string(),
@ -1876,18 +1906,7 @@ pub async fn perform_decide_gateway_call_with_open_router(
Ok(routable_connectors)
}
Err(err) => {
let err_resp: or_types::ErrorResponse = err
.response
.parse_struct("ErrorResponse")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
logger::error!("open_router_error_response: {:?}", err_resp);
routing_event.set_status_code(err.status_code);
routing_event.set_error(serde_json::json!({"error": err_resp.error_message}));
routing_event.set_error_response_body(&err_resp);
state.event_handler().log_event(&routing_event);
logger::error!("open_router_error_response: {:?}", err);
Err(errors::RoutingError::OpenRouterError(
"Failed to perform decide_gateway call in open_router".into(),
@ -1915,81 +1934,55 @@ pub async fn update_gateway_score_with_open_router(
payment_id: payment_id.clone(),
};
let serialized_request = serde_json::to_value(&open_router_req_body)
.change_context(errors::RoutingError::OpenRouterCallFailed)
.attach_printable("Failed to serialize open_router request body")?;
let url = format!("{}/{}", &state.conf.open_router.url, "update-gateway-score");
let mut request = request::Request::new(services::Method::Post, &url);
request.add_header(headers::CONTENT_TYPE, "application/json".into());
request.add_header(
headers::X_TENANT_ID,
state.tenant.tenant_id.get_string_repr().to_owned().into(),
);
request.set_body(request::RequestContent::Json(Box::new(
open_router_req_body,
)));
let mut routing_event = RoutingEvent::new(
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"open_router_update_gateway_score_call",
serialized_request,
url.clone(),
ApiMethod::Rest(services::Method::Post),
state.request_id,
payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
merchant_id.to_owned(),
state.request_id,
RoutingEngine::DecisionEngine,
"DecisionEngine: SuccessRate update_gateway_score".to_string(),
Some(open_router_req_body.clone()),
true,
false,
);
let response =
services::call_connector_api(state, request, "open_router_update_gateway_score_call")
.await
.inspect_err(|err| {
routing_event
.set_error(serde_json::json!({"error": err.current_context().to_string()}));
state.event_handler().log_event(&routing_event);
})
.change_context(errors::RoutingError::OpenRouterCallFailed)?;
routing_event.set_payment_connector(payment_connector.clone()); // check this in review
let response: RoutingResult<utils::RoutingEventsResponse<or_types::UpdateScoreResponse>> =
utils::SRApiClient::send_decision_engine_request(
state,
services::Method::Post,
"update-gateway-score",
Some(open_router_req_body),
None,
Some(routing_events_wrapper),
)
.await;
match response {
Ok(resp) => {
let update_score_resp = resp
.response
.parse_struct::<or_types::UpdateScoreResponse>("UpdateScoreResponse")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
let update_score_resp = resp.response.ok_or(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
let mut routing_event = resp.event.ok_or(errors::RoutingError::RoutingEventsError {
message: "Decision-Engine: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})?;
logger::debug!(
"open_router update_gateway_score response for gateway with id {}: {:?}",
payment_connector,
update_score_resp
update_score_resp.message
);
routing_event.set_status_code(resp.status_code);
routing_event.set_response_body(&update_score_resp);
routing_event.set_payment_connector(payment_connector.clone()); // check this in review
state.event_handler().log_event(&routing_event);
Ok(())
}
Err(err) => {
let err_resp: or_types::ErrorResponse = err
.response
.parse_struct("ErrorResponse")
.change_context(errors::RoutingError::OpenRouterError(
"Failed to parse the response from open_router".into(),
))?;
logger::error!("open_router_update_gateway_score_error: {:?}", err_resp);
routing_event.set_status_code(err.status_code);
routing_event.set_error(serde_json::json!({"error": err_resp.error_message}));
routing_event.set_error_response_body(&err_resp);
state.event_handler().log_event(&routing_event);
logger::error!("open_router_update_gateway_score_error: {:?}", err);
Err(errors::RoutingError::OpenRouterError(
"Failed to update gateway score in open_router".into(),
@ -2052,88 +2045,96 @@ pub async fn perform_success_based_routing(
.ok_or(errors::RoutingError::SuccessBasedRoutingParamsNotFoundError)?,
);
let event_request = api_routing::CalSuccessRateEventRequest {
let event_request = utils::CalSuccessRateEventRequest {
id: profile_id.get_string_repr().to_string(),
params: success_based_routing_config_params.clone(),
labels: routable_connectors
.iter()
.map(|conn_choice| conn_choice.to_string())
.collect::<Vec<_>>(),
config: success_based_routing_configs.config.as_ref().map(|conf| {
api_routing::CalSuccessRateConfigEventRequest {
min_aggregates_size: conf.min_aggregates_size,
default_success_rate: conf.default_success_rate,
specificity_level: conf.specificity_level,
exploration_percent: conf.exploration_percent,
}
}),
config: success_based_routing_configs
.config
.as_ref()
.map(utils::CalSuccessRateConfigEventRequest::from),
};
let serialized_request = serde_json::to_value(&event_request)
.change_context(errors::RoutingError::SuccessBasedRoutingConfigError)
.attach_printable("unable to serialize success_based_routing_config_params")?;
let mut routing_event = RoutingEvent::new(
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"Intelligent-router FetchSuccessRate",
serialized_request,
"SuccessRateCalculator.FetchSuccessRate".to_string(),
ApiMethod::Grpc,
state.request_id,
payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
merchant_id.to_owned(),
state.request_id,
RoutingEngine::IntelligentRouter,
"IntelligentRouter: CalculateSuccessRate".to_string(),
Some(event_request.clone()),
true,
false,
);
let success_based_connectors = client
.calculate_success_rate(
profile_id.get_string_repr().into(),
success_based_routing_configs,
success_based_routing_config_params,
routable_connectors,
state.get_grpc_headers(),
)
.await
.inspect_err(|e| {
routing_event
.set_error(serde_json::json!({"error": e.current_context().to_string()}));
state.event_handler().log_event(&routing_event);
})
.change_context(errors::RoutingError::SuccessRateCalculationError)
.attach_printable(
"unable to calculate/fetch success rate from dynamic routing service",
)?;
let event_response = api_routing::CalSuccessRateEventResponse {
labels_with_score: success_based_connectors
.labels_with_score
.iter()
.map(
|label_with_score| api_routing::LabelWithScoreEventResponse {
label: label_with_score.label.clone(),
score: label_with_score.score,
},
let closure = || async {
let success_based_connectors_result = client
.calculate_success_rate(
profile_id.get_string_repr().into(),
success_based_routing_configs,
success_based_routing_config_params,
routable_connectors,
state.get_grpc_headers(),
)
.collect(),
routing_approach: match success_based_connectors.routing_approach {
0 => api_routing::RoutingApproach::Exploration,
1 => api_routing::RoutingApproach::Exploitation,
_ => {
return Err(errors::RoutingError::GenericNotFoundError {
field: "routing_approach".to_string(),
})
.change_context(errors::RoutingError::GenericNotFoundError {
field: "unknown routing approach from dynamic routing service".to_string(),
})
.attach_printable("unknown routing approach from dynamic routing service")
.await
.change_context(errors::RoutingError::SuccessRateCalculationError)
.attach_printable(
"unable to calculate/fetch success rate from dynamic routing service",
);
match success_based_connectors_result {
Ok(success_response) => {
let updated_resp = utils::CalSuccessRateEventResponse::try_from(
&success_response,
)
.change_context(errors::RoutingError::RoutingEventsError { message: "unable to convert SuccessBasedConnectors to CalSuccessRateEventResponse".to_string(), status_code: 500 })
.attach_printable(
"unable to convert SuccessBasedConnectors to CalSuccessRateEventResponse",
)?;
Ok(Some(updated_resp))
}
},
Err(e) => {
logger::error!(
"unable to calculate/fetch success rate from dynamic routing service: {:?}",
e.current_context()
);
Err(error_stack::report!(
errors::RoutingError::SuccessRateCalculationError
))
}
}
};
routing_event.set_response_body(&event_response);
routing_event.set_routing_approach(event_response.routing_approach.to_string());
let events_response = routing_events_wrapper
.construct_event_builder(
"SuccessRateCalculator.FetchSuccessRate".to_string(),
RoutingEngine::IntelligentRouter,
ApiMethod::Grpc,
)?
.trigger_event(state, closure)
.await?;
let success_based_connectors: utils::CalSuccessRateEventResponse = events_response
.response
.ok_or(errors::RoutingError::SuccessRateCalculationError)?;
// Need to log error case
let mut routing_event =
events_response
.event
.ok_or(errors::RoutingError::RoutingEventsError {
message:
"SR-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})?;
routing_event.set_routing_approach(success_based_connectors.routing_approach.to_string());
let mut connectors = Vec::with_capacity(success_based_connectors.labels_with_score.len());
for label_with_score in success_based_connectors.labels_with_score {
@ -2227,7 +2228,7 @@ pub async fn perform_elimination_routing(
.ok_or(errors::RoutingError::EliminationBasedRoutingParamsNotFoundError)?,
);
let event_request = api_routing::EliminationRoutingEventRequest {
let event_request = utils::EliminationRoutingEventRequest {
id: profile_id.get_string_repr().to_string(),
params: elimination_routing_config_params.clone(),
labels: routable_connectors
@ -2237,80 +2238,76 @@ pub async fn perform_elimination_routing(
config: elimination_routing_config
.elimination_analyser_config
.as_ref()
.map(|conf| api_routing::EliminationRoutingEventBucketConfig {
bucket_leak_interval_in_secs: conf.bucket_leak_interval_in_secs,
bucket_size: conf.bucket_size,
}),
.map(utils::EliminationRoutingEventBucketConfig::from),
};
let serialized_request = serde_json::to_value(&event_request)
.change_context(errors::RoutingError::SuccessBasedRoutingConfigError)
.attach_printable("unable to serialize EliminationRoutingEventRequest")?;
let mut routing_event = RoutingEvent::new(
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"Intelligent-router GetEliminationStatus",
serialized_request,
"EliminationAnalyser.GetEliminationStatus".to_string(),
ApiMethod::Grpc,
state.request_id,
payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
merchant_id.to_owned(),
state.request_id,
RoutingEngine::IntelligentRouter,
"IntelligentRouter: PerformEliminationRouting".to_string(),
Some(event_request.clone()),
true,
false,
);
let elimination_based_connectors: EliminationResponse = client
.perform_elimination_routing(
profile_id.get_string_repr().to_string(),
elimination_routing_config_params,
routable_connectors.clone(),
elimination_routing_config.elimination_analyser_config,
state.get_grpc_headers(),
)
.await
.inspect_err(|e| {
routing_event
.set_error(serde_json::json!({"error": e.current_context().to_string()}));
state.event_handler().log_event(&routing_event);
})
.change_context(errors::RoutingError::EliminationRoutingCalculationError)
.attach_printable(
"unable to analyze/fetch elimination routing from dynamic routing service",
)?;
let event_response = api_routing::EliminationEventResponse {
labels_with_status: elimination_based_connectors
.labels_with_status
.iter()
.map(
|label_with_status| api_routing::LabelWithStatusEliminationEventResponse {
label: label_with_status.label.clone(),
elimination_information: label_with_status
.elimination_information
.as_ref()
.map(|info| api_routing::EliminationInformationEventResponse {
entity: info.entity.as_ref().map(|entity_info| {
api_routing::BucketInformationEventResponse {
is_eliminated: entity_info.is_eliminated,
bucket_name: entity_info.bucket_name.clone(),
}
}),
global: info.global.as_ref().map(|global_info| {
api_routing::BucketInformationEventResponse {
is_eliminated: global_info.is_eliminated,
bucket_name: global_info.bucket_name.clone(),
}
}),
}),
},
let closure = || async {
let elimination_based_connectors_result = client
.perform_elimination_routing(
profile_id.get_string_repr().to_string(),
elimination_routing_config_params,
routable_connectors.clone(),
elimination_routing_config.elimination_analyser_config,
state.get_grpc_headers(),
)
.collect(),
.await
.change_context(errors::RoutingError::EliminationRoutingCalculationError)
.attach_printable(
"unable to analyze/fetch elimination routing from dynamic routing service",
);
match elimination_based_connectors_result {
Ok(elimination_response) => Ok(Some(utils::EliminationEventResponse::from(
&elimination_response,
))),
Err(e) => {
logger::error!(
"unable to analyze/fetch elimination routing from dynamic routing service: {:?}",
e.current_context()
);
Err(error_stack::report!(
errors::RoutingError::EliminationRoutingCalculationError
))
}
}
};
routing_event.set_response_body(&event_response);
routing_event.set_routing_approach(api_routing::RoutingApproach::Elimination.to_string());
let events_response = routing_events_wrapper
.construct_event_builder(
"EliminationAnalyser.GetEliminationStatus".to_string(),
RoutingEngine::IntelligentRouter,
ApiMethod::Grpc,
)?
.trigger_event(state, closure)
.await?;
let elimination_based_connectors: utils::EliminationEventResponse = events_response
.response
.ok_or(errors::RoutingError::EliminationRoutingCalculationError)?;
let mut routing_event = events_response
.event
.ok_or(errors::RoutingError::RoutingEventsError {
message:
"Elimination-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})?;
routing_event.set_routing_approach(utils::RoutingApproach::Elimination.to_string());
let mut connectors =
Vec::with_capacity(elimination_based_connectors.labels_with_status.len());
@ -2440,7 +2437,7 @@ pub async fn perform_contract_based_routing(
})
.collect::<Vec<_>>();
let event_request = api_routing::CalContractScoreEventRequest {
let event_request = utils::CalContractScoreEventRequest {
id: profile_id.get_string_repr().to_string(),
params: "".to_string(),
labels: contract_based_connectors
@ -2450,66 +2447,37 @@ pub async fn perform_contract_based_routing(
config: Some(contract_based_routing_configs.clone()),
};
let serialized_request = serde_json::to_value(&event_request)
.change_context(errors::RoutingError::SuccessBasedRoutingConfigError)
.attach_printable("unable to serialize EliminationRoutingEventRequest")?;
let mut routing_event = RoutingEvent::new(
let routing_events_wrapper = utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"Intelligent-router CalContractScore",
serialized_request,
"ContractScoreCalculator.FetchContractScore".to_string(),
ApiMethod::Grpc,
state.request_id,
payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
merchant_id.to_owned(),
state.request_id,
RoutingEngine::IntelligentRouter,
"IntelligentRouter: PerformContractRouting".to_string(),
Some(event_request.clone()),
true,
false,
);
let contract_based_connectors_result = client
.calculate_contract_score(
profile_id.get_string_repr().into(),
contract_based_routing_configs.clone(),
"".to_string(),
contract_based_connectors,
state.get_grpc_headers(),
)
.await
.inspect_err(|e| {
routing_event
.set_error(serde_json::json!({"error": e.current_context().to_string()}));
routing_event
.set_routing_approach(api_routing::RoutingApproach::ContractBased.to_string());
state.event_handler().log_event(&routing_event);
})
.attach_printable(
"unable to calculate/fetch contract score from dynamic routing service",
);
let closure = || async {
let contract_based_connectors_result = client
.calculate_contract_score(
profile_id.get_string_repr().into(),
contract_based_routing_configs.clone(),
"".to_string(),
contract_based_connectors,
state.get_grpc_headers(),
)
.await
.attach_printable(
"unable to calculate/fetch contract score from dynamic routing service",
);
let contract_based_connectors = match contract_based_connectors_result {
Ok(resp) => {
let event_response = api_routing::CalContractScoreEventResponse {
labels_with_score: resp
.labels_with_score
.iter()
.map(|label_with_score| api_routing::ScoreDataEventResponse {
score: label_with_score.score,
label: label_with_score.label.clone(),
current_count: label_with_score.current_count,
})
.collect(),
};
routing_event.set_response_body(&event_response);
routing_event
.set_routing_approach(api_routing::RoutingApproach::ContractBased.to_string());
resp
}
Err(err) => match err.current_context() {
DynamicRoutingError::ContractNotFound => {
client
let contract_based_connectors = match contract_based_connectors_result {
Ok(resp) => Some(utils::CalContractScoreEventResponse::from(&resp)),
Err(err) => match err.current_context() {
DynamicRoutingError::ContractNotFound => {
client
.update_contracts(
profile_id.get_string_repr().into(),
label_info,
@ -2523,20 +2491,47 @@ pub async fn perform_contract_based_routing(
.attach_printable(
"unable to update contract based routing window in dynamic routing service",
)?;
return Err((errors::RoutingError::ContractScoreCalculationError {
err: err.to_string(),
})
.into());
}
_ => {
return Err((errors::RoutingError::ContractScoreCalculationError {
err: err.to_string(),
})
.into())
}
},
return Err((errors::RoutingError::ContractScoreCalculationError {
err: err.to_string(),
})
.into());
}
_ => {
return Err((errors::RoutingError::ContractScoreCalculationError {
err: err.to_string(),
})
.into())
}
},
};
Ok(contract_based_connectors)
};
let events_response = routing_events_wrapper
.construct_event_builder(
"ContractScoreCalculator.FetchContractScore".to_string(),
RoutingEngine::IntelligentRouter,
ApiMethod::Grpc,
)?
.trigger_event(state, closure)
.await?;
let contract_based_connectors: utils::CalContractScoreEventResponse = events_response
.response
.ok_or(errors::RoutingError::ContractScoreCalculationError {
err: "CalContractScoreEventResponse not found".to_string(),
})?;
let mut routing_event = events_response
.event
.ok_or(errors::RoutingError::RoutingEventsError {
message:
"ContractRouting-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})?;
let mut connectors = Vec::with_capacity(contract_based_connectors.labels_with_score.len());
for label_with_score in contract_based_connectors.labels_with_score {

File diff suppressed because it is too large Load Diff

View File

@ -1093,78 +1093,110 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
.attach_printable("Unable to push dynamic routing stats to db")?;
};
let label_with_status = routing_types::UpdateLabelWithStatusEventRequest {
let label_with_status = routing_utils::UpdateLabelWithStatusEventRequest {
label: routable_connector.clone().to_string(),
status: payment_status_attribute == common_enums::AttemptStatus::Charged,
};
let event_request = routing_types::UpdateSuccessRateWindowEventRequest {
let event_request = routing_utils::UpdateSuccessRateWindowEventRequest {
id: payment_attempt.profile_id.get_string_repr().to_string(),
params: success_based_routing_config_params.clone(),
labels_with_status: vec![label_with_status.clone()],
global_labels_with_status: vec![label_with_status],
config: success_based_routing_configs.config.as_ref().map(|conf| {
routing_types::UpdateSuccessRateWindowConfig {
max_aggregates_size: conf.max_aggregates_size,
current_block_threshold: conf.current_block_threshold.clone(),
}
}),
config: success_based_routing_configs
.config
.as_ref()
.map(routing_utils::UpdateSuccessRateWindowConfig::from),
};
let serialized_request = serde_json::to_value(&event_request)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unable to serialize success_based_routing_config_params")?;
let mut routing_event = routing_events::RoutingEvent::new(
let routing_events_wrapper = routing_utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"Intelligent-router UpdateSuccessRateWindow",
serialized_request,
"SuccessRateCalculator.UpdateSuccessRateWindow".to_string(),
routing_events::ApiMethod::Grpc,
state.request_id,
payment_attempt.payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
payment_attempt.merchant_id.to_owned(),
state.request_id,
routing_events::RoutingEngine::IntelligentRouter,
"IntelligentRouter: UpdateSuccessRateWindow".to_string(),
Some(event_request.clone()),
true,
false,
);
let update_response = client
.update_success_rate(
profile_id.get_string_repr().into(),
success_based_routing_configs,
success_based_routing_config_params,
vec![routing_types::RoutableConnectorChoiceWithStatus::new(
routable_connector.clone(),
payment_status_attribute == common_enums::AttemptStatus::Charged,
)],
state.get_grpc_headers(),
let closure = || async {
let update_response_result = client
.update_success_rate(
profile_id.get_string_repr().into(),
success_based_routing_configs,
success_based_routing_config_params,
vec![routing_types::RoutableConnectorChoiceWithStatus::new(
routable_connector.clone(),
payment_status_attribute == common_enums::AttemptStatus::Charged,
)],
state.get_grpc_headers(),
)
.await
.change_context(errors::RoutingError::SuccessRateCalculationError)
.attach_printable(
"unable to update success based routing window in dynamic routing service",
);
match update_response_result {
Ok(update_response) => {
let updated_resp =
routing_utils::UpdateSuccessRateWindowEventResponse::try_from(
&update_response,
)
.change_context(errors::RoutingError::RoutingEventsError { message: "Unable to convert to UpdateSuccessRateWindowEventResponse from UpdateSuccessRateWindowResponse".to_string(), status_code: 500 })?;
Ok(Some(updated_resp))
}
Err(err) => {
logger::error!(
"unable to update connector score in dynamic routing service: {:?}",
err.current_context()
);
Err(err)
}
}
};
let events_response = routing_events_wrapper
.construct_event_builder(
"SuccessRateCalculator.UpdateSuccessRateWindow".to_string(),
routing_events::RoutingEngine::IntelligentRouter,
routing_events::ApiMethod::Grpc,
)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"SR-Intelligent-Router: Failed to update success rate in Intelligent-Router",
)?
.trigger_event(state, closure)
.await
.inspect_err(|e| {
routing_event
.set_error(serde_json::json!({"error": e.current_context().to_string()}));
state.event_handler().log_event(&routing_event);
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"SR-Intelligent-Router: Failed to update success rate in Intelligent-Router",
)?;
let _response: routing_utils::UpdateSuccessRateWindowEventResponse = events_response
.response
.ok_or(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"UpdateSuccessRateWindowEventResponse not found in RoutingEventResponse",
)?;
let mut routing_event = events_response
.event
.ok_or(errors::RoutingError::RoutingEventsError {
message:
"SR-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"unable to update success based routing window in dynamic routing service",
"SR-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse",
)?;
let event_response = routing_types::UpdateSuccessRateWindowEventResponse {
status: match update_response.status {
0 => routing_types::UpdationStatusEventResponse::WindowUpdationSucceeded,
1 => routing_types::UpdationStatusEventResponse::WindowUpdationFailed,
_ => {
return Err(errors::ApiErrorResponse::InternalServerError)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unknown status code from dynamic routing service")
}
},
};
routing_event.set_response_body(&event_response);
routing_event.set_status_code(200);
routing_event.set_payment_connector(routable_connector);
routing_event.set_payment_connector(routable_connector); // we can do this inside the event wrap by implementing an interface on the req type
state.event_handler().log_event(&routing_event);
Ok(())
@ -1248,45 +1280,35 @@ pub async fn update_window_for_elimination_routing(
gsm_error_category.to_string(),
)];
let event_request = routing_types::UpdateEliminationBucketEventRequest {
let event_request = routing_utils::UpdateEliminationBucketEventRequest {
id: profile_id.get_string_repr().to_string(),
params: elimination_routing_config_params.clone(),
labels_with_bucket_name: labels_with_bucket_name
.iter()
.map(
|conn_choice| routing_types::LabelWithBucketNameEventRequest {
label: conn_choice.routable_connector_choice.to_string(),
bucket_name: conn_choice.bucket_name.clone(),
},
)
.map(|conn_choice| {
routing_utils::LabelWithBucketNameEventRequest::from(conn_choice)
})
.collect(),
config: elimination_routing_config
.elimination_analyser_config
.map(|conf| routing_types::EliminationRoutingEventBucketConfig {
bucket_leak_interval_in_secs: conf.bucket_leak_interval_in_secs,
bucket_size: conf.bucket_size,
}),
.as_ref()
.map(routing_utils::EliminationRoutingEventBucketConfig::from),
};
let serialized_request = serde_json::to_value(&event_request)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unable to serialize success_based_routing_config_params")?;
let mut routing_event = routing_events::RoutingEvent::new(
let routing_events_wrapper = routing_utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"Intelligent-router UpdateEliminationBucket",
serialized_request,
"EliminationAnalyser.UpdateEliminationBucket".to_string(),
routing_events::ApiMethod::Grpc,
state.request_id,
payment_attempt.payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
payment_attempt.merchant_id.to_owned(),
state.request_id,
routing_events::RoutingEngine::IntelligentRouter,
"IntelligentRouter: UpdateEliminationBucket".to_string(),
Some(event_request.clone()),
true,
false,
);
let update_response = client
let closure = || async {
let update_response_result = client
.update_elimination_bucket_config(
profile_id.get_string_repr().to_string(),
elimination_routing_config_params,
@ -1295,29 +1317,58 @@ pub async fn update_window_for_elimination_routing(
state.get_grpc_headers(),
)
.await
.inspect_err(|e| {
routing_event
.set_error(serde_json::json!({"error": e.current_context().to_string()}));
state.event_handler().log_event(&routing_event);
})
.change_context(errors::ApiErrorResponse::InternalServerError)
.change_context(errors::RoutingError::EliminationRoutingCalculationError)
.attach_printable(
"unable to update elimination based routing buckets in dynamic routing service",
);
match update_response_result {
Ok(resp) => {
let updated_resp =
routing_utils::UpdateEliminationBucketEventResponse::try_from(&resp)
.change_context(errors::RoutingError::RoutingEventsError { message: "Unable to convert to UpdateEliminationBucketEventResponse from UpdateEliminationBucketResponse".to_string(), status_code: 500 })?;
Ok(Some(updated_resp))
}
Err(err) => {
logger::error!(
"unable to update elimination score in dynamic routing service: {:?}",
err.current_context()
);
Err(err)
}
}
};
let events_response = routing_events_wrapper.construct_event_builder( "EliminationAnalyser.UpdateEliminationBucket".to_string(),
routing_events::RoutingEngine::IntelligentRouter,
routing_events::ApiMethod::Grpc)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Elimination-Intelligent-Router: Failed to update elimination bucket in Intelligent-Router")?
.trigger_event(state, closure)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Elimination-Intelligent-Router: Failed to update elimination bucket in Intelligent-Router")?;
let _response: routing_utils::UpdateEliminationBucketEventResponse = events_response
.response
.ok_or(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"UpdateEliminationBucketEventResponse not found in RoutingEventResponse",
)?;
let event_response = routing_types::UpdateEliminationBucketEventResponse {
status: match update_response.status {
0 => routing_types::EliminationUpdationStatusEventResponse::BucketUpdationSucceeded,
1 => routing_types::EliminationUpdationStatusEventResponse::BucketUpdationFailed,
_ => {
return Err(errors::ApiErrorResponse::InternalServerError)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unknown status code from dynamic routing service")
}
},
};
let mut routing_event = events_response
.event
.ok_or(errors::RoutingError::RoutingEventsError {
message:
"Elimination-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Elimination-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse")?;
routing_event.set_response_body(&event_response);
routing_event.set_status_code(200);
routing_event.set_payment_connector(routing_types::RoutableConnectorChoice {
choice_kind: api_models::routing::RoutableChoiceKind::FullStruct,
@ -1425,36 +1476,30 @@ pub async fn push_metrics_with_update_window_for_contract_based_routing(
get_desired_payment_status_for_dynamic_routing_metrics(payment_attempt.status);
if payment_status_attribute == common_enums::AttemptStatus::Charged {
let event_request = routing_types::UpdateContractRequestEventRequest {
let event_request = routing_utils::UpdateContractRequestEventRequest {
id: profile_id.get_string_repr().to_string(),
params: "".to_string(),
labels_information: vec![routing_types::ContractLabelInformationEventRequest {
label: request_label_info.label.clone(),
target_count: request_label_info.target_count,
target_time: request_label_info.target_time,
current_count: 1,
}],
labels_information: vec![
routing_utils::ContractLabelInformationEventRequest::from(
&request_label_info,
),
],
};
let serialized_request = serde_json::to_value(&event_request)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unable to serialize success_based_routing_config_params")?;
let mut routing_event = routing_events::RoutingEvent::new(
let routing_events_wrapper = routing_utils::RoutingEventsWrapper::new(
state.tenant.tenant_id.clone(),
"".to_string(),
"Intelligent-router UpdateContract",
serialized_request,
"ContractScoreCalculator.UpdateContract".to_string(),
routing_events::ApiMethod::Grpc,
state.request_id,
payment_attempt.payment_id.get_string_repr().to_string(),
profile_id.to_owned(),
payment_attempt.merchant_id.to_owned(),
state.request_id,
routing_events::RoutingEngine::IntelligentRouter,
"IntelligentRouter: UpdateContractScore".to_string(),
Some(event_request.clone()),
true,
false,
);
let update_response = client
let closure = || async {
let update_response_result = client
.update_contracts(
profile_id.get_string_repr().into(),
vec![request_label_info],
@ -1464,30 +1509,60 @@ pub async fn push_metrics_with_update_window_for_contract_based_routing(
state.get_grpc_headers(),
)
.await
.inspect_err(|e| {
routing_event.set_error(
serde_json::json!({"error": e.current_context().to_string()}),
);
state.event_handler().log_event(&routing_event);
})
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"unable to update contract based routing window in dynamic routing service",
);
match update_response_result {
Ok(resp) => {
let updated_resp =
routing_utils::UpdateContractEventResponse::try_from(&resp)
.change_context(errors::RoutingError::RoutingEventsError { message: "Unable to convert to UpdateContractEventResponse from UpdateContractResponse".to_string(), status_code: 500 })?;
Ok(Some(updated_resp))
}
Err(err) => {
logger::error!(
"unable to update elimination score in dynamic routing service: {:?}",
err.current_context()
);
// have to refactor errors
Err(error_stack::report!(
errors::RoutingError::ContractScoreUpdationError
))
}
}
};
let events_response = routing_events_wrapper.construct_event_builder( "ContractScoreCalculator.UpdateContract".to_string(),
routing_events::RoutingEngine::IntelligentRouter,
routing_events::ApiMethod::Grpc)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("ContractRouting-Intelligent-Router: Failed to contruct RoutingEventsBuilder")?
.trigger_event(state, closure)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("ContractRouting-Intelligent-Router: Failed to update contract scores in Intelligent-Router")?;
let _response: routing_utils::UpdateContractEventResponse = events_response
.response
.ok_or(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"UpdateContractEventResponse not found in RoutingEventResponse",
)?;
let event_response = routing_types::UpdateContractEventResponse {
status: match update_response.status {
0 => routing_types::ContractUpdationStatusEventResponse::ContractUpdationSucceeded,
1 => routing_types::ContractUpdationStatusEventResponse::ContractUpdationFailed,
_ => {
return Err(errors::ApiErrorResponse::InternalServerError)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unknown status code from dynamic routing service")
}
},
};
let mut routing_event = events_response
.event
.ok_or(errors::RoutingError::RoutingEventsError {
message:
"ContractRouting-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse"
.to_string(),
status_code: 500,
})
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("ContractRouting-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse")?;
routing_event.set_response_body(&event_response);
routing_event.set_payment_connector(routing_types::RoutableConnectorChoice {
choice_kind: api_models::routing::RoutableChoiceKind::FullStruct,
connector: common_enums::RoutableConnectors::from_str(
@ -2271,6 +2346,7 @@ pub async fn enable_decision_engine_dynamic_routing_setup(
DECISION_ENGINE_RULE_CREATE_ENDPOINT,
Some(default_engine_config_request),
None,
None,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -2348,6 +2424,7 @@ pub async fn update_decision_engine_dynamic_routing_setup(
DECISION_ENGINE_RULE_UPDATE_ENDPOINT,
Some(decision_engine_request),
None,
None,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -2397,6 +2474,7 @@ pub async fn disable_decision_engine_dynamic_routing_setup(
DECISION_ENGINE_RULE_DELETE_ENDPOINT,
Some(decision_engine_request),
None,
None,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -2447,6 +2525,7 @@ pub async fn create_decision_engine_merchant(
DECISION_ENGINE_MERCHANT_CREATE_ENDPOINT,
Some(merchant_account_req),
None,
None,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -2472,6 +2551,7 @@ pub async fn delete_decision_engine_merchant(
&path,
None,
None,
None,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)