diff --git a/crates/api_models/src/open_router.rs b/crates/api_models/src/open_router.rs index 33719e8a41..af91ece3f1 100644 --- a/crates/api_models/src/open_router.rs +++ b/crates/api_models/src/open_router.rs @@ -58,14 +58,14 @@ pub struct PaymentInfo { // cardSwitchProvider: Option>, } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct DecidedGateway { pub gateway_priority_map: Option>, pub debit_routing_output: Option, pub routing_approach: String, } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct DebitRoutingOutput { pub co_badged_card_networks: Vec, pub issuer_country: common_enums::CountryAlpha2, @@ -121,7 +121,7 @@ pub struct DebitRoutingRequestData { pub card_type: common_enums::CardType, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct ErrorResponse { pub status: String, pub error_code: String, @@ -132,7 +132,7 @@ pub struct ErrorResponse { pub is_dynamic_mga_enabled: bool, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct UnifiedError { pub code: String, pub user_message: String, diff --git a/crates/api_models/src/routing.rs b/crates/api_models/src/routing.rs index 75bb67ba49..3e312769e0 100644 --- a/crates/api_models/src/routing.rs +++ b/crates/api_models/src/routing.rs @@ -1384,160 +1384,6 @@ impl std::fmt::Display for RoutingApproach { } } } - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct BucketInformationEventResponse { - pub is_eliminated: bool, - pub bucket_name: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct EliminationInformationEventResponse { - pub entity: Option, - pub global: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct LabelWithStatusEliminationEventResponse { - pub label: String, - pub elimination_information: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct EliminationEventResponse { - pub labels_with_status: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct ScoreDataEventResponse { - pub score: f64, - pub label: String, - pub current_count: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct CalContractScoreEventResponse { - pub labels_with_score: Vec, -} - -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct CalGlobalSuccessRateConfigEventRequest { - pub entity_min_aggregates_size: u32, - pub entity_default_success_rate: f64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct CalGlobalSuccessRateEventRequest { - pub entity_id: String, - pub entity_params: String, - pub entity_labels: Vec, - pub global_labels: Vec, - pub config: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateSuccessRateWindowConfig { - pub max_aggregates_size: Option, - pub current_block_threshold: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateLabelWithStatusEventRequest { - pub label: String, - pub status: bool, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateSuccessRateWindowEventRequest { - pub id: String, - pub params: String, - pub labels_with_status: Vec, - pub config: Option, - pub global_labels_with_status: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateSuccessRateWindowEventResponse { - pub status: UpdationStatusEventResponse, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum UpdationStatusEventResponse { - WindowUpdationSucceeded, - WindowUpdationFailed, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct LabelWithBucketNameEventRequest { - pub label: String, - pub bucket_name: String, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateEliminationBucketEventRequest { - pub id: String, - pub params: String, - pub labels_with_bucket_name: Vec, - pub config: Option, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateEliminationBucketEventResponse { - pub status: EliminationUpdationStatusEventResponse, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum EliminationUpdationStatusEventResponse { - BucketUpdationSucceeded, - BucketUpdationFailed, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct ContractLabelInformationEventRequest { - pub label: String, - pub target_count: u64, - pub target_time: u64, - pub current_count: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateContractRequestEventRequest { - pub id: String, - pub params: String, - pub labels_information: Vec, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub struct UpdateContractEventResponse { - pub status: ContractUpdationStatusEventResponse, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum ContractUpdationStatusEventResponse { - ContractUpdationSucceeded, - ContractUpdationFailed, -} #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct RuleMigrationQuery { pub profile_id: common_utils::id_type::ProfileId, diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 3037c1d4e6..93d4414a5b 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -388,6 +388,8 @@ pub enum RoutingError { DecisionEngineValidationError(String), #[error("Invalid transaction type")] InvalidTransactionType, + #[error("Routing events error: {message}, status code: {status_code}")] + RoutingEventsError { message: String, status_code: u16 }, } #[derive(Debug, Clone, thiserror::Error)] diff --git a/crates/router/src/core/payments/routing.rs b/crates/router/src/core/payments/routing.rs index f08e76469d..53a657c2e6 100644 --- a/crates/router/src/core/payments/routing.rs +++ b/crates/router/src/core/payments/routing.rs @@ -17,8 +17,6 @@ use api_models::{ }; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] use common_utils::ext_traits::AsyncExt; -#[cfg(all(feature = "v1", feature = "dynamic_routing"))] -use common_utils::{ext_traits::BytesExt, request}; use diesel_models::enums as storage_enums; use error_stack::ResultExt; use euclid::{ @@ -30,13 +28,12 @@ use euclid::{ #[cfg(all(feature = "v1", feature = "dynamic_routing"))] use external_services::grpc_client::dynamic_routing::{ contract_routing_client::ContractBasedDynamicRouting, - elimination_based_client::{EliminationBasedRouting, EliminationResponse}, - success_rate_client::SuccessBasedDynamicRouting, - DynamicRoutingError, + elimination_based_client::EliminationBasedRouting, + success_rate_client::SuccessBasedDynamicRouting, DynamicRoutingError, }; use hyperswitch_domain_models::address::Address; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] -use hyperswitch_interfaces::events::routing_api_logs::{ApiMethod, RoutingEngine, RoutingEvent}; +use hyperswitch_interfaces::events::routing_api_logs::{ApiMethod, RoutingEngine}; use kgraph_utils::{ mca as mca_graph, transformers::{IntoContext, IntoDirValue}, @@ -59,8 +56,6 @@ use crate::core::payouts; #[cfg(feature = "v1")] use crate::core::routing::transformers::OpenRouterDecideGatewayRequestExt; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] -use crate::headers; -#[cfg(all(feature = "v1", feature = "dynamic_routing"))] use crate::routes::app::SessionStateInfo; use crate::{ core::{ @@ -482,10 +477,36 @@ pub async fn perform_static_routing_v1( } }; + let payment_id = match transaction_data { + routing::TransactionData::Payment(payment_data) => payment_data + .payment_attempt + .payment_id + .clone() + .get_string_repr() + .to_string(), + #[cfg(feature = "payouts")] + routing::TransactionData::Payout(payout_data) => { + payout_data.payout_attempt.payout_id.clone() + } + }; + + let routing_events_wrapper = utils::RoutingEventsWrapper::new( + state.tenant.tenant_id.clone(), + state.request_id, + payment_id, + business_profile.get_id().to_owned(), + business_profile.merchant_id.to_owned(), + "DecisionEngine: Euclid Static Routing".to_string(), + None, + true, + false, + ); + let de_euclid_connectors = perform_decision_euclid_routing( state, backend_input.clone(), business_profile.get_id().get_string_repr().to_string(), + routing_events_wrapper ) .await .map_err(|e| @@ -497,8 +518,12 @@ pub async fn perform_static_routing_v1( .iter() .map(|c| c.connector.to_string()) .collect::>(); + let de_connectors = de_euclid_connectors + .iter() + .map(|c| c.gateway_name.to_string()) + .collect::>(); utils::compare_and_log_result( - de_euclid_connectors, + de_connectors, connectors, "evaluate_routing".to_string(), ); @@ -1646,19 +1671,36 @@ pub async fn perform_open_routing_for_debit_routing( Some(or_types::RankingAlgorithm::NtwBasedRouting), ); - let response: RoutingResult = + let routing_events_wrapper = utils::RoutingEventsWrapper::new( + state.tenant.tenant_id.clone(), + state.request_id, + payment_attempt.payment_id.get_string_repr().to_string(), + payment_attempt.profile_id.to_owned(), + payment_attempt.merchant_id.to_owned(), + "DecisionEngine: Debit Routing".to_string(), + Some(open_router_req_body.clone()), + true, + true, + ); + + let response: RoutingResult> = utils::EuclidApiClient::send_decision_engine_request( state, services::Method::Post, "decide-gateway", Some(open_router_req_body), None, + Some(routing_events_wrapper), ) .await; let output = match response { - Ok(decided_gateway) => { - let debit_routing_output = decided_gateway + Ok(events_response) => { + let debit_routing_output = events_response + .response + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))? .debit_routing_output .get_required_value("debit_routing_output") .change_context(errors::RoutingError::OpenRouterError( @@ -1797,57 +1839,45 @@ pub async fn perform_decide_gateway_call_with_open_router( is_elimination_enabled, ); - let serialized_request = serde_json::to_value(&open_router_req_body) - .change_context(errors::RoutingError::OpenRouterCallFailed) - .attach_printable("Failed to serialize open_router request body")?; - - let url = format!("{}/{}", &state.conf.open_router.url, "decide-gateway"); - let mut request = request::Request::new(services::Method::Post, &url); - request.add_header(headers::CONTENT_TYPE, "application/json".into()); - request.add_header( - headers::X_TENANT_ID, - state.tenant.tenant_id.get_string_repr().to_owned().into(), - ); - request.set_body(request::RequestContent::Json(Box::new( - open_router_req_body, - ))); - - let mut routing_event = RoutingEvent::new( + let routing_events_wrapper = utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "open_router_decide_gateway_call", - serialized_request, - url.clone(), - ApiMethod::Rest(services::Method::Post), - payment_attempt.payment_id.get_string_repr().to_string(), - profile_id.to_owned(), - payment_attempt.merchant_id.to_owned(), state.request_id, - RoutingEngine::DecisionEngine, + payment_attempt.payment_id.get_string_repr().to_string(), + payment_attempt.profile_id.to_owned(), + payment_attempt.merchant_id.to_owned(), + "DecisionEngine: SuccessRate decide_gateway".to_string(), + Some(open_router_req_body.clone()), + true, + false, ); - let response = services::call_connector_api(state, request, "open_router_decide_gateway_call") - .await - .inspect_err(|err| { - routing_event - .set_error(serde_json::json!({"error": err.current_context().to_string()})); - state.event_handler().log_event(&routing_event); - }) - .change_context(errors::RoutingError::OpenRouterCallFailed)?; + let response: RoutingResult> = + utils::SRApiClient::send_decision_engine_request( + state, + services::Method::Post, + "decide-gateway", + Some(open_router_req_body), + None, + Some(routing_events_wrapper), + ) + .await; let sr_sorted_connectors = match response { Ok(resp) => { - let decided_gateway: DecidedGateway = resp - .response - .parse_struct("DecidedGateway") - .change_context(errors::RoutingError::OpenRouterError( - "Failed to parse the response from open_router".into(), + let decided_gateway: DecidedGateway = + resp.response.ok_or(errors::RoutingError::OpenRouterError( + "Empty response received from open_router".into(), ))?; - routing_event.set_status_code(resp.status_code); + let mut routing_event = resp.event.ok_or(errors::RoutingError::RoutingEventsError { + message: "Decision-Engine: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + })?; + routing_event.set_response_body(&decided_gateway); routing_event.set_routing_approach( - api_routing::RoutingApproach::from_decision_engine_approach( + utils::RoutingApproach::from_decision_engine_approach( &decided_gateway.routing_approach, ) .to_string(), @@ -1876,18 +1906,7 @@ pub async fn perform_decide_gateway_call_with_open_router( Ok(routable_connectors) } Err(err) => { - let err_resp: or_types::ErrorResponse = err - .response - .parse_struct("ErrorResponse") - .change_context(errors::RoutingError::OpenRouterError( - "Failed to parse the response from open_router".into(), - ))?; - logger::error!("open_router_error_response: {:?}", err_resp); - - routing_event.set_status_code(err.status_code); - routing_event.set_error(serde_json::json!({"error": err_resp.error_message})); - routing_event.set_error_response_body(&err_resp); - state.event_handler().log_event(&routing_event); + logger::error!("open_router_error_response: {:?}", err); Err(errors::RoutingError::OpenRouterError( "Failed to perform decide_gateway call in open_router".into(), @@ -1915,81 +1934,55 @@ pub async fn update_gateway_score_with_open_router( payment_id: payment_id.clone(), }; - let serialized_request = serde_json::to_value(&open_router_req_body) - .change_context(errors::RoutingError::OpenRouterCallFailed) - .attach_printable("Failed to serialize open_router request body")?; - - let url = format!("{}/{}", &state.conf.open_router.url, "update-gateway-score"); - let mut request = request::Request::new(services::Method::Post, &url); - request.add_header(headers::CONTENT_TYPE, "application/json".into()); - request.add_header( - headers::X_TENANT_ID, - state.tenant.tenant_id.get_string_repr().to_owned().into(), - ); - request.set_body(request::RequestContent::Json(Box::new( - open_router_req_body, - ))); - - let mut routing_event = RoutingEvent::new( + let routing_events_wrapper = utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "open_router_update_gateway_score_call", - serialized_request, - url.clone(), - ApiMethod::Rest(services::Method::Post), + state.request_id, payment_id.get_string_repr().to_string(), profile_id.to_owned(), merchant_id.to_owned(), - state.request_id, - RoutingEngine::DecisionEngine, + "DecisionEngine: SuccessRate update_gateway_score".to_string(), + Some(open_router_req_body.clone()), + true, + false, ); - let response = - services::call_connector_api(state, request, "open_router_update_gateway_score_call") - .await - .inspect_err(|err| { - routing_event - .set_error(serde_json::json!({"error": err.current_context().to_string()})); - state.event_handler().log_event(&routing_event); - }) - .change_context(errors::RoutingError::OpenRouterCallFailed)?; - - routing_event.set_payment_connector(payment_connector.clone()); // check this in review + let response: RoutingResult> = + utils::SRApiClient::send_decision_engine_request( + state, + services::Method::Post, + "update-gateway-score", + Some(open_router_req_body), + None, + Some(routing_events_wrapper), + ) + .await; match response { Ok(resp) => { - let update_score_resp = resp - .response - .parse_struct::("UpdateScoreResponse") - .change_context(errors::RoutingError::OpenRouterError( - "Failed to parse the response from open_router".into(), - ))?; + let update_score_resp = resp.response.ok_or(errors::RoutingError::OpenRouterError( + "Failed to parse the response from open_router".into(), + ))?; + + let mut routing_event = resp.event.ok_or(errors::RoutingError::RoutingEventsError { + message: "Decision-Engine: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + })?; logger::debug!( "open_router update_gateway_score response for gateway with id {}: {:?}", payment_connector, - update_score_resp + update_score_resp.message ); - routing_event.set_status_code(resp.status_code); routing_event.set_response_body(&update_score_resp); + routing_event.set_payment_connector(payment_connector.clone()); // check this in review state.event_handler().log_event(&routing_event); Ok(()) } Err(err) => { - let err_resp: or_types::ErrorResponse = err - .response - .parse_struct("ErrorResponse") - .change_context(errors::RoutingError::OpenRouterError( - "Failed to parse the response from open_router".into(), - ))?; - logger::error!("open_router_update_gateway_score_error: {:?}", err_resp); - - routing_event.set_status_code(err.status_code); - routing_event.set_error(serde_json::json!({"error": err_resp.error_message})); - routing_event.set_error_response_body(&err_resp); - state.event_handler().log_event(&routing_event); + logger::error!("open_router_update_gateway_score_error: {:?}", err); Err(errors::RoutingError::OpenRouterError( "Failed to update gateway score in open_router".into(), @@ -2052,88 +2045,96 @@ pub async fn perform_success_based_routing( .ok_or(errors::RoutingError::SuccessBasedRoutingParamsNotFoundError)?, ); - let event_request = api_routing::CalSuccessRateEventRequest { + let event_request = utils::CalSuccessRateEventRequest { id: profile_id.get_string_repr().to_string(), params: success_based_routing_config_params.clone(), labels: routable_connectors .iter() .map(|conn_choice| conn_choice.to_string()) .collect::>(), - config: success_based_routing_configs.config.as_ref().map(|conf| { - api_routing::CalSuccessRateConfigEventRequest { - min_aggregates_size: conf.min_aggregates_size, - default_success_rate: conf.default_success_rate, - specificity_level: conf.specificity_level, - exploration_percent: conf.exploration_percent, - } - }), + config: success_based_routing_configs + .config + .as_ref() + .map(utils::CalSuccessRateConfigEventRequest::from), }; - let serialized_request = serde_json::to_value(&event_request) - .change_context(errors::RoutingError::SuccessBasedRoutingConfigError) - .attach_printable("unable to serialize success_based_routing_config_params")?; - - let mut routing_event = RoutingEvent::new( + let routing_events_wrapper = utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "Intelligent-router FetchSuccessRate", - serialized_request, - "SuccessRateCalculator.FetchSuccessRate".to_string(), - ApiMethod::Grpc, + state.request_id, payment_id.get_string_repr().to_string(), profile_id.to_owned(), merchant_id.to_owned(), - state.request_id, - RoutingEngine::IntelligentRouter, + "IntelligentRouter: CalculateSuccessRate".to_string(), + Some(event_request.clone()), + true, + false, ); - let success_based_connectors = client - .calculate_success_rate( - profile_id.get_string_repr().into(), - success_based_routing_configs, - success_based_routing_config_params, - routable_connectors, - state.get_grpc_headers(), - ) - .await - .inspect_err(|e| { - routing_event - .set_error(serde_json::json!({"error": e.current_context().to_string()})); - state.event_handler().log_event(&routing_event); - }) - .change_context(errors::RoutingError::SuccessRateCalculationError) - .attach_printable( - "unable to calculate/fetch success rate from dynamic routing service", - )?; - - let event_response = api_routing::CalSuccessRateEventResponse { - labels_with_score: success_based_connectors - .labels_with_score - .iter() - .map( - |label_with_score| api_routing::LabelWithScoreEventResponse { - label: label_with_score.label.clone(), - score: label_with_score.score, - }, + let closure = || async { + let success_based_connectors_result = client + .calculate_success_rate( + profile_id.get_string_repr().into(), + success_based_routing_configs, + success_based_routing_config_params, + routable_connectors, + state.get_grpc_headers(), ) - .collect(), - routing_approach: match success_based_connectors.routing_approach { - 0 => api_routing::RoutingApproach::Exploration, - 1 => api_routing::RoutingApproach::Exploitation, - _ => { - return Err(errors::RoutingError::GenericNotFoundError { - field: "routing_approach".to_string(), - }) - .change_context(errors::RoutingError::GenericNotFoundError { - field: "unknown routing approach from dynamic routing service".to_string(), - }) - .attach_printable("unknown routing approach from dynamic routing service") + .await + .change_context(errors::RoutingError::SuccessRateCalculationError) + .attach_printable( + "unable to calculate/fetch success rate from dynamic routing service", + ); + + match success_based_connectors_result { + Ok(success_response) => { + let updated_resp = utils::CalSuccessRateEventResponse::try_from( + &success_response, + ) + .change_context(errors::RoutingError::RoutingEventsError { message: "unable to convert SuccessBasedConnectors to CalSuccessRateEventResponse".to_string(), status_code: 500 }) + .attach_printable( + "unable to convert SuccessBasedConnectors to CalSuccessRateEventResponse", + )?; + + Ok(Some(updated_resp)) } - }, + Err(e) => { + logger::error!( + "unable to calculate/fetch success rate from dynamic routing service: {:?}", + e.current_context() + ); + + Err(error_stack::report!( + errors::RoutingError::SuccessRateCalculationError + )) + } + } }; - routing_event.set_response_body(&event_response); - routing_event.set_routing_approach(event_response.routing_approach.to_string()); + let events_response = routing_events_wrapper + .construct_event_builder( + "SuccessRateCalculator.FetchSuccessRate".to_string(), + RoutingEngine::IntelligentRouter, + ApiMethod::Grpc, + )? + .trigger_event(state, closure) + .await?; + + let success_based_connectors: utils::CalSuccessRateEventResponse = events_response + .response + .ok_or(errors::RoutingError::SuccessRateCalculationError)?; + + // Need to log error case + let mut routing_event = + events_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: + "SR-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + })?; + + routing_event.set_routing_approach(success_based_connectors.routing_approach.to_string()); let mut connectors = Vec::with_capacity(success_based_connectors.labels_with_score.len()); for label_with_score in success_based_connectors.labels_with_score { @@ -2227,7 +2228,7 @@ pub async fn perform_elimination_routing( .ok_or(errors::RoutingError::EliminationBasedRoutingParamsNotFoundError)?, ); - let event_request = api_routing::EliminationRoutingEventRequest { + let event_request = utils::EliminationRoutingEventRequest { id: profile_id.get_string_repr().to_string(), params: elimination_routing_config_params.clone(), labels: routable_connectors @@ -2237,80 +2238,76 @@ pub async fn perform_elimination_routing( config: elimination_routing_config .elimination_analyser_config .as_ref() - .map(|conf| api_routing::EliminationRoutingEventBucketConfig { - bucket_leak_interval_in_secs: conf.bucket_leak_interval_in_secs, - bucket_size: conf.bucket_size, - }), + .map(utils::EliminationRoutingEventBucketConfig::from), }; - let serialized_request = serde_json::to_value(&event_request) - .change_context(errors::RoutingError::SuccessBasedRoutingConfigError) - .attach_printable("unable to serialize EliminationRoutingEventRequest")?; - - let mut routing_event = RoutingEvent::new( + let routing_events_wrapper = utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "Intelligent-router GetEliminationStatus", - serialized_request, - "EliminationAnalyser.GetEliminationStatus".to_string(), - ApiMethod::Grpc, + state.request_id, payment_id.get_string_repr().to_string(), profile_id.to_owned(), merchant_id.to_owned(), - state.request_id, - RoutingEngine::IntelligentRouter, + "IntelligentRouter: PerformEliminationRouting".to_string(), + Some(event_request.clone()), + true, + false, ); - let elimination_based_connectors: EliminationResponse = client - .perform_elimination_routing( - profile_id.get_string_repr().to_string(), - elimination_routing_config_params, - routable_connectors.clone(), - elimination_routing_config.elimination_analyser_config, - state.get_grpc_headers(), - ) - .await - .inspect_err(|e| { - routing_event - .set_error(serde_json::json!({"error": e.current_context().to_string()})); - state.event_handler().log_event(&routing_event); - }) - .change_context(errors::RoutingError::EliminationRoutingCalculationError) - .attach_printable( - "unable to analyze/fetch elimination routing from dynamic routing service", - )?; - - let event_response = api_routing::EliminationEventResponse { - labels_with_status: elimination_based_connectors - .labels_with_status - .iter() - .map( - |label_with_status| api_routing::LabelWithStatusEliminationEventResponse { - label: label_with_status.label.clone(), - elimination_information: label_with_status - .elimination_information - .as_ref() - .map(|info| api_routing::EliminationInformationEventResponse { - entity: info.entity.as_ref().map(|entity_info| { - api_routing::BucketInformationEventResponse { - is_eliminated: entity_info.is_eliminated, - bucket_name: entity_info.bucket_name.clone(), - } - }), - global: info.global.as_ref().map(|global_info| { - api_routing::BucketInformationEventResponse { - is_eliminated: global_info.is_eliminated, - bucket_name: global_info.bucket_name.clone(), - } - }), - }), - }, + let closure = || async { + let elimination_based_connectors_result = client + .perform_elimination_routing( + profile_id.get_string_repr().to_string(), + elimination_routing_config_params, + routable_connectors.clone(), + elimination_routing_config.elimination_analyser_config, + state.get_grpc_headers(), ) - .collect(), + .await + .change_context(errors::RoutingError::EliminationRoutingCalculationError) + .attach_printable( + "unable to analyze/fetch elimination routing from dynamic routing service", + ); + + match elimination_based_connectors_result { + Ok(elimination_response) => Ok(Some(utils::EliminationEventResponse::from( + &elimination_response, + ))), + Err(e) => { + logger::error!( + "unable to analyze/fetch elimination routing from dynamic routing service: {:?}", + e.current_context() + ); + + Err(error_stack::report!( + errors::RoutingError::EliminationRoutingCalculationError + )) + } + } }; - routing_event.set_response_body(&event_response); - routing_event.set_routing_approach(api_routing::RoutingApproach::Elimination.to_string()); + let events_response = routing_events_wrapper + .construct_event_builder( + "EliminationAnalyser.GetEliminationStatus".to_string(), + RoutingEngine::IntelligentRouter, + ApiMethod::Grpc, + )? + .trigger_event(state, closure) + .await?; + + let elimination_based_connectors: utils::EliminationEventResponse = events_response + .response + .ok_or(errors::RoutingError::EliminationRoutingCalculationError)?; + + let mut routing_event = events_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: + "Elimination-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + })?; + + routing_event.set_routing_approach(utils::RoutingApproach::Elimination.to_string()); let mut connectors = Vec::with_capacity(elimination_based_connectors.labels_with_status.len()); @@ -2440,7 +2437,7 @@ pub async fn perform_contract_based_routing( }) .collect::>(); - let event_request = api_routing::CalContractScoreEventRequest { + let event_request = utils::CalContractScoreEventRequest { id: profile_id.get_string_repr().to_string(), params: "".to_string(), labels: contract_based_connectors @@ -2450,66 +2447,37 @@ pub async fn perform_contract_based_routing( config: Some(contract_based_routing_configs.clone()), }; - let serialized_request = serde_json::to_value(&event_request) - .change_context(errors::RoutingError::SuccessBasedRoutingConfigError) - .attach_printable("unable to serialize EliminationRoutingEventRequest")?; - - let mut routing_event = RoutingEvent::new( + let routing_events_wrapper = utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "Intelligent-router CalContractScore", - serialized_request, - "ContractScoreCalculator.FetchContractScore".to_string(), - ApiMethod::Grpc, + state.request_id, payment_id.get_string_repr().to_string(), profile_id.to_owned(), merchant_id.to_owned(), - state.request_id, - RoutingEngine::IntelligentRouter, + "IntelligentRouter: PerformContractRouting".to_string(), + Some(event_request.clone()), + true, + false, ); - let contract_based_connectors_result = client - .calculate_contract_score( - profile_id.get_string_repr().into(), - contract_based_routing_configs.clone(), - "".to_string(), - contract_based_connectors, - state.get_grpc_headers(), - ) - .await - .inspect_err(|e| { - routing_event - .set_error(serde_json::json!({"error": e.current_context().to_string()})); - routing_event - .set_routing_approach(api_routing::RoutingApproach::ContractBased.to_string()); - state.event_handler().log_event(&routing_event); - }) - .attach_printable( - "unable to calculate/fetch contract score from dynamic routing service", - ); + let closure = || async { + let contract_based_connectors_result = client + .calculate_contract_score( + profile_id.get_string_repr().into(), + contract_based_routing_configs.clone(), + "".to_string(), + contract_based_connectors, + state.get_grpc_headers(), + ) + .await + .attach_printable( + "unable to calculate/fetch contract score from dynamic routing service", + ); - let contract_based_connectors = match contract_based_connectors_result { - Ok(resp) => { - let event_response = api_routing::CalContractScoreEventResponse { - labels_with_score: resp - .labels_with_score - .iter() - .map(|label_with_score| api_routing::ScoreDataEventResponse { - score: label_with_score.score, - label: label_with_score.label.clone(), - current_count: label_with_score.current_count, - }) - .collect(), - }; - - routing_event.set_response_body(&event_response); - routing_event - .set_routing_approach(api_routing::RoutingApproach::ContractBased.to_string()); - resp - } - Err(err) => match err.current_context() { - DynamicRoutingError::ContractNotFound => { - client + let contract_based_connectors = match contract_based_connectors_result { + Ok(resp) => Some(utils::CalContractScoreEventResponse::from(&resp)), + Err(err) => match err.current_context() { + DynamicRoutingError::ContractNotFound => { + client .update_contracts( profile_id.get_string_repr().into(), label_info, @@ -2523,20 +2491,47 @@ pub async fn perform_contract_based_routing( .attach_printable( "unable to update contract based routing window in dynamic routing service", )?; - return Err((errors::RoutingError::ContractScoreCalculationError { - err: err.to_string(), - }) - .into()); - } - _ => { - return Err((errors::RoutingError::ContractScoreCalculationError { - err: err.to_string(), - }) - .into()) - } - }, + return Err((errors::RoutingError::ContractScoreCalculationError { + err: err.to_string(), + }) + .into()); + } + _ => { + return Err((errors::RoutingError::ContractScoreCalculationError { + err: err.to_string(), + }) + .into()) + } + }, + }; + + Ok(contract_based_connectors) }; + let events_response = routing_events_wrapper + .construct_event_builder( + "ContractScoreCalculator.FetchContractScore".to_string(), + RoutingEngine::IntelligentRouter, + ApiMethod::Grpc, + )? + .trigger_event(state, closure) + .await?; + + let contract_based_connectors: utils::CalContractScoreEventResponse = events_response + .response + .ok_or(errors::RoutingError::ContractScoreCalculationError { + err: "CalContractScoreEventResponse not found".to_string(), + })?; + + let mut routing_event = events_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: + "ContractRouting-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + })?; + let mut connectors = Vec::with_capacity(contract_based_connectors.labels_with_score.len()); for label_with_score in contract_based_connectors.labels_with_score { diff --git a/crates/router/src/core/payments/routing/utils.rs b/crates/router/src/core/payments/routing/utils.rs index 0478091936..d49aada707 100644 --- a/crates/router/src/core/payments/routing/utils.rs +++ b/crates/router/src/core/payments/routing/utils.rs @@ -1,20 +1,27 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, +}; use api_models::{ - routing as api_routing, + open_router as or_types, routing as api_routing, routing::{ConnectorSelection, RoutableConnectorChoice}, }; use async_trait::async_trait; -use common_utils::id_type; +use common_utils::{ext_traits::BytesExt, id_type}; use diesel_models::{enums, routing_algorithm}; use error_stack::ResultExt; use euclid::{backend::BackendInput, frontend::ast}; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use external_services::grpc_client::dynamic_routing as ir_client; +use hyperswitch_interfaces::events::routing_api_logs as routing_events; +use router_env::tracing_actix_web::RequestId; use serde::{Deserialize, Serialize}; use super::RoutingResult; use crate::{ core::errors, - routes::SessionState, + routes::{app::SessionStateInfo, SessionState}, services::{self, logger}, types::transformers::ForeignInto, }; @@ -28,10 +35,11 @@ pub trait DecisionEngineApiHandler { path: &str, request_body: Option, // Option to handle GET/DELETE requests without body timeout: Option, - ) -> RoutingResult + events_wrapper: Option>, + ) -> RoutingResult> where - Req: Serialize + Send + Sync + 'static, - Res: serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug; + Req: Serialize + Send + Sync + 'static + Clone, + Res: Serialize + serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug + Clone; async fn send_decision_engine_request_without_response_parsing( state: &SessionState, @@ -39,9 +47,10 @@ pub trait DecisionEngineApiHandler { path: &str, request_body: Option, timeout: Option, + events_wrapper: Option>, ) -> RoutingResult<()> where - Req: Serialize + Send + Sync + 'static; + Req: Serialize + Send + Sync + 'static + Clone; } // Struct to implement the DecisionEngineApiHandler trait @@ -49,16 +58,21 @@ pub struct EuclidApiClient; pub struct ConfigApiClient; -pub async fn build_and_send_decision_engine_http_request( +pub struct SRApiClient; + +pub async fn build_and_send_decision_engine_http_request( state: &SessionState, http_method: services::Method, path: &str, request_body: Option, - timeout: Option, + _timeout: Option, context_message: &str, -) -> RoutingResult + events_wrapper: Option>, +) -> RoutingResult> where - Req: Serialize + Send + Sync + 'static, + Req: Serialize + Send + Sync + 'static + Clone, + Res: Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Clone, + ErrRes: serde::de::DeserializeOwned + std::fmt::Debug + Clone + DecisionEngineErrorsInterface, { let decision_engine_base_url = &state.conf.open_router.url; let url = format!("{}/{}", decision_engine_base_url, path); @@ -75,18 +89,88 @@ where let http_request = request_builder.build(); logger::info!(?http_request, decision_engine_request_path = %path, "decision_engine: Constructed Decision Engine API request details ({})", context_message); + let should_parse_response = events_wrapper + .as_ref() + .map(|wrapper| wrapper.parse_response) + .unwrap_or(true); - state - .api_client - .send_request(state, http_request, timeout, false) - .await - .change_context(errors::RoutingError::DslExecutionError) - .attach_printable_lazy(|| { - format!( - "Decision Engine API call to path '{}' unresponsive ({})", - path, context_message - ) - }) + let closure = || async { + let response = + services::call_connector_api(state, http_request, "Decision Engine API call") + .await + .change_context(errors::RoutingError::OpenRouterCallFailed)?; + + match response { + Ok(resp) => { + logger::debug!( + "decision_engine: Received response from Decision Engine API ({:?})", + String::from_utf8_lossy(&resp.response) // For logging + ); + + let resp = should_parse_response + .then(|| { + let response_type: Res = resp + .response + .parse_struct(std::any::type_name::()) + .change_context(errors::RoutingError::OpenRouterError( + "Failed to parse the response from open_router".into(), + ))?; + + Ok::<_, error_stack::Report>(response_type) + }) + .transpose()?; + + logger::debug!("decision_engine_success_response: {:?}", resp); + + Ok(resp) + } + Err(err) => { + logger::debug!( + "decision_engine: Received response from Decision Engine API ({:?})", + String::from_utf8_lossy(&err.response) // For logging + ); + + let err_resp: ErrRes = err + .response + .parse_struct(std::any::type_name::()) + .change_context(errors::RoutingError::OpenRouterError( + "Failed to parse the response from open_router".into(), + ))?; + + logger::error!( + decision_engine_error_code = %err_resp.get_error_code(), + decision_engine_error_message = %err_resp.get_error_message(), + decision_engine_raw_response = ?err_resp.get_error_data(), + ); + + Err(error_stack::report!( + errors::RoutingError::RoutingEventsError { + message: err_resp.get_error_message(), + status_code: err.status_code, + } + )) + } + } + }; + + let events_response = if let Some(wrapper) = events_wrapper { + wrapper + .construct_event_builder( + url, + routing_events::RoutingEngine::DecisionEngine, + routing_events::ApiMethod::Rest(http_method), + )? + .trigger_event(state, closure) + .await? + } else { + let resp = closure() + .await + .change_context(errors::RoutingError::OpenRouterCallFailed)?; + + RoutingEventsResponse::new(None, resp) + }; + + Ok(events_response) } #[async_trait] @@ -97,83 +181,33 @@ impl DecisionEngineApiHandler for EuclidApiClient { path: &str, request_body: Option, // Option to handle GET/DELETE requests without body timeout: Option, - ) -> RoutingResult + events_wrapper: Option>, + ) -> RoutingResult> where - Req: Serialize + Send + Sync + 'static, - Res: serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug, + Req: Serialize + Send + Sync + 'static + Clone, + Res: Serialize + serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug + Clone, { - let response = build_and_send_decision_engine_http_request( + let event_response = build_and_send_decision_engine_http_request::<_, _, DeErrorResponse>( state, http_method, path, request_body, timeout, "parsing response", + events_wrapper, ) .await?; - let status = response.status(); - let response_bytes = response.bytes().await.unwrap_or_default(); + let parsed_response = + event_response + .response + .as_ref() + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))?; - let body_str = String::from_utf8_lossy(&response_bytes); // For logging - - if !status.is_success() { - match serde_json::from_slice::(&response_bytes) { - Ok(parsed) => { - logger::error!( - decision_engine_error_code = %parsed.code, - decision_engine_error_message = %parsed.message, - decision_engine_raw_response = ?parsed.data, - "decision_engine_euclid: validation failed" - ); - - return Err(errors::RoutingError::DecisionEngineValidationError( - parsed.message, - ) - .into()); - } - Err(_) => { - logger::error!( - decision_engine_raw_response = %body_str, - "decision_engine_euclid: failed to deserialize validation error response" - ); - - return Err(errors::RoutingError::DecisionEngineValidationError( - "decision_engine_euclid: Failed to parse validation error from decision engine".to_string(), - ) - .into()); - } - } - } - - logger::debug!( - euclid_response_body = %body_str, - response_status = ?status, - euclid_request_path = %path, - "decision_engine_euclid: Received raw response from Euclid API" - ); - - let parsed_response = serde_json::from_slice::(&response_bytes) - .map_err(|_| errors::RoutingError::GenericConversionError { - from: "ApiResponse".to_string(), - to: std::any::type_name::().to_string(), - }) - .attach_printable_lazy(|| { - format!( - "Unable to parse response of type '{}' received from Euclid API path: {}", - std::any::type_name::(), - path - ) - })?; - - logger::debug!( - parsed_response = ?parsed_response, - response_type = %std::any::type_name::(), - euclid_request_path = %path, - "decision_engine_euclid: Successfully parsed response from Euclid API" - ); - - Ok(parsed_response) + logger::debug!(parsed_response = ?parsed_response, response_type = %std::any::type_name::(), euclid_request_path = %path, "decision_engine_euclid: Successfully parsed response from Euclid API"); + Ok(event_response) } async fn send_decision_engine_request_without_response_parsing( @@ -182,19 +216,24 @@ impl DecisionEngineApiHandler for EuclidApiClient { path: &str, request_body: Option, timeout: Option, + events_wrapper: Option>, ) -> RoutingResult<()> where - Req: Serialize + Send + Sync + 'static, + Req: Serialize + Send + Sync + 'static + Clone, { - let response = build_and_send_decision_engine_http_request( - state, - http_method, - path, - request_body, - timeout, - "not parsing response", - ) - .await?; + let event_response = + build_and_send_decision_engine_http_request::( + state, + http_method, + path, + request_body, + timeout, + "not parsing response", + events_wrapper, + ) + .await?; + + let response = event_response.response; logger::debug!(euclid_response = ?response, euclid_request_path = %path, "decision_engine_routing: Received raw response from Euclid API"); Ok(()) @@ -209,38 +248,32 @@ impl DecisionEngineApiHandler for ConfigApiClient { path: &str, request_body: Option, timeout: Option, - ) -> RoutingResult + events_wrapper: Option>, + ) -> RoutingResult> where - Req: Serialize + Send + Sync + 'static, - Res: serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug, + Req: Serialize + Send + Sync + 'static + Clone, + Res: Serialize + serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug + Clone, { - let response = build_and_send_decision_engine_http_request( + let events_response = build_and_send_decision_engine_http_request::<_, _, DeErrorResponse>( state, http_method, path, request_body, timeout, "parsing response", + events_wrapper, ) .await?; - logger::debug!(decision_engine_config_response = ?response, decision_engine_request_path = %path, "decision_engine_config: Received raw response from Decision Engine config API"); - let parsed_response = response - .json::() - .await - .change_context(errors::RoutingError::GenericConversionError { - from: "ApiResponse".to_string(), - to: std::any::type_name::().to_string(), - }) - .attach_printable_lazy(|| { - format!( - "Unable to parse response of type '{}' received from Decision Engine config API path: {}", - std::any::type_name::(), - path - ) - })?; + let parsed_response = + events_response + .response + .as_ref() + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))?; logger::debug!(parsed_response = ?parsed_response, response_type = %std::any::type_name::(), decision_engine_request_path = %path, "decision_engine_config: Successfully parsed response from Decision Engine config API"); - Ok(parsed_response) + Ok(events_response) } async fn send_decision_engine_request_without_response_parsing( @@ -249,19 +282,91 @@ impl DecisionEngineApiHandler for ConfigApiClient { path: &str, request_body: Option, timeout: Option, + events_wrapper: Option>, ) -> RoutingResult<()> where - Req: Serialize + Send + Sync + 'static, + Req: Serialize + Send + Sync + 'static + Clone, { - let response = build_and_send_decision_engine_http_request( - state, - http_method, - path, - request_body, - timeout, - "not parsing response", - ) - .await?; + let event_response = + build_and_send_decision_engine_http_request::( + state, + http_method, + path, + request_body, + timeout, + "not parsing response", + events_wrapper, + ) + .await?; + + let response = event_response.response; + + logger::debug!(decision_engine_response = ?response, decision_engine_request_path = %path, "decision_engine_config: Received raw response from Decision Engine config API"); + Ok(()) + } +} + +#[async_trait] +impl DecisionEngineApiHandler for SRApiClient { + async fn send_decision_engine_request( + state: &SessionState, + http_method: services::Method, + path: &str, + request_body: Option, + timeout: Option, + events_wrapper: Option>, + ) -> RoutingResult> + where + Req: Serialize + Send + Sync + 'static + Clone, + Res: Serialize + serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug + Clone, + { + let events_response = + build_and_send_decision_engine_http_request::<_, _, or_types::ErrorResponse>( + state, + http_method, + path, + request_body, + timeout, + "parsing response", + events_wrapper, + ) + .await?; + + let parsed_response = + events_response + .response + .as_ref() + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))?; + logger::debug!(parsed_response = ?parsed_response, response_type = %std::any::type_name::(), decision_engine_request_path = %path, "decision_engine_config: Successfully parsed response from Decision Engine config API"); + Ok(events_response) + } + + async fn send_decision_engine_request_without_response_parsing( + state: &SessionState, + http_method: services::Method, + path: &str, + request_body: Option, + timeout: Option, + events_wrapper: Option>, + ) -> RoutingResult<()> + where + Req: Serialize + Send + Sync + 'static + Clone, + { + let event_response = + build_and_send_decision_engine_http_request::( + state, + http_method, + path, + request_body, + timeout, + "not parsing response", + events_wrapper, + ) + .await?; + + let response = event_response.response; logger::debug!(decision_engine_response = ?response, decision_engine_request_path = %path, "decision_engine_config: Received raw response from Decision Engine config API"); Ok(()) @@ -274,20 +379,85 @@ pub async fn perform_decision_euclid_routing( state: &SessionState, input: BackendInput, created_by: String, -) -> RoutingResult> { + events_wrapper: RoutingEventsWrapper, +) -> RoutingResult> { logger::debug!("decision_engine_euclid: evaluate api call for euclid routing evaluation"); - let routing_request = convert_backend_input_to_routing_eval(created_by, input)?; + let mut events_wrapper = events_wrapper; - let euclid_response: RoutingEvaluateResponse = EuclidApiClient::send_decision_engine_request( + let routing_request = convert_backend_input_to_routing_eval(created_by, input)?; + events_wrapper.set_request_body(routing_request.clone()); + + let event_response = EuclidApiClient::send_decision_engine_request( state, services::Method::Post, "routing/evaluate", Some(routing_request), Some(EUCLID_API_TIMEOUT), + Some(events_wrapper), ) .await?; + let euclid_response: RoutingEvaluateResponse = + event_response + .response + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))?; + + let mut routing_event = + event_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: "Routing event not found in EventsResponse".to_string(), + status_code: 500, + })?; + + let connector_info = euclid_response.evaluated_output.clone(); + let mut routable_connectors = Vec::new(); + for conn in &connector_info { + let connector = common_enums::RoutableConnectors::from_str(conn.gateway_name.as_str()) + .change_context(errors::RoutingError::GenericConversionError { + from: "String".to_string(), + to: "RoutableConnectors".to_string(), + }) + .attach_printable( + "decision_engine_euclid: unable to convert String to RoutableConnectors", + ) + .ok(); + let mca_id = conn + .gateway_id + .as_ref() + .map(|id| { + id_type::MerchantConnectorAccountId::wrap(id.to_string()) + .change_context(errors::RoutingError::GenericConversionError { + from: "String".to_string(), + to: "MerchantConnectorAccountId".to_string(), + }) + .attach_printable( + "decision_engine_euclid: unable to convert MerchantConnectorAccountId from string", + ) + }) + .transpose() + .ok() + .flatten(); + + if let Some(conn) = connector { + let connector = RoutableConnectorChoice { + choice_kind: api_routing::RoutableChoiceKind::FullStruct, + connector: conn, + merchant_connector_id: mca_id, + }; + routable_connectors.push(connector); + } + } + + routing_event.set_routing_approach(RoutingApproach::StaticRouting.to_string()); + routing_event.set_routable_connectors(routable_connectors); + state.event_handler.log_event(&routing_event); + + // Need to log euclid response event here + logger::debug!(decision_engine_euclid_response=?euclid_response,"decision_engine_euclid"); logger::debug!(decision_engine_euclid_selected_connector=?euclid_response.evaluated_output,"decision_engine_euclid"); @@ -301,15 +471,23 @@ pub async fn create_de_euclid_routing_algo( logger::debug!("decision_engine_euclid: create api call for euclid routing rule creation"); logger::debug!(decision_engine_euclid_request=?routing_request,"decision_engine_euclid"); - let euclid_response: RoutingDictionaryRecord = EuclidApiClient::send_decision_engine_request( + let events_response = EuclidApiClient::send_decision_engine_request( state, services::Method::Post, "routing/create", Some(routing_request.clone()), Some(EUCLID_API_TIMEOUT), + None, ) .await?; + let euclid_response: RoutingDictionaryRecord = + events_response + .response + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))?; + logger::debug!(decision_engine_euclid_parsed_response=?euclid_response,"decision_engine_euclid"); Ok(euclid_response.rule_id) } @@ -326,6 +504,7 @@ pub async fn link_de_euclid_routing_algorithm( "routing/activate", Some(routing_request.clone()), Some(EUCLID_API_TIMEOUT), + None, ) .await?; @@ -339,16 +518,24 @@ pub async fn list_de_euclid_routing_algorithms( ) -> RoutingResult> { logger::debug!("decision_engine_euclid: list api call for euclid routing algorithms"); let created_by = routing_list_request.created_by; - let response: Vec = EuclidApiClient::send_decision_engine_request( + let events_response = EuclidApiClient::send_decision_engine_request( state, services::Method::Post, format!("routing/list/{created_by}").as_str(), None::<()>, Some(EUCLID_API_TIMEOUT), + None, ) .await?; - Ok(response + let euclid_response: Vec = + events_response + .response + .ok_or(errors::RoutingError::OpenRouterError( + "Response from decision engine API is empty".to_string(), + ))?; + + Ok(euclid_response .into_iter() .map(routing_algorithm::RoutingProfileMetadata::from) .map(ForeignInto::foreign_into) @@ -543,6 +730,37 @@ struct DeErrorResponse { data: Option, } +impl DecisionEngineErrorsInterface for DeErrorResponse { + fn get_error_message(&self) -> String { + self.message.clone() + } + + fn get_error_code(&self) -> String { + self.code.clone() + } + + fn get_error_data(&self) -> Option { + self.data.as_ref().map(|data| data.to_string()) + } +} + +impl DecisionEngineErrorsInterface for or_types::ErrorResponse { + fn get_error_message(&self) -> String { + self.error_message.clone() + } + + fn get_error_code(&self) -> String { + self.error_code.clone() + } + + fn get_error_data(&self) -> Option { + Some(format!( + "decision_engine Error: {}", + self.error_message.clone() + )) + } +} + //TODO: temporary change will be refactored afterwards #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] pub struct RoutingEvaluateRequest { @@ -550,12 +768,12 @@ pub struct RoutingEvaluateRequest { pub parameters: HashMap>, } -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct RoutingEvaluateResponse { pub status: String, pub output: serde_json::Value, - pub evaluated_output: Vec, - pub eligible_connectors: Vec, + pub evaluated_output: Vec, + pub eligible_connectors: Vec, } #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -686,13 +904,16 @@ pub struct VolumeSplit { #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct ConnectorInfo { - pub connector: String, - pub mca_id: Option, + pub gateway_name: String, + pub gateway_id: Option, } impl ConnectorInfo { - pub fn new(connector: String, mca_id: Option) -> Self { - Self { connector, mca_id } + pub fn new(gateway_name: String, gateway_id: Option) -> Self { + Self { + gateway_name, + gateway_id, + } } } @@ -914,3 +1135,654 @@ fn stringify_choice(c: RoutableConnectorChoice) -> ConnectorInfo { .map(|mca_id| mca_id.get_string_repr().to_string()), ) } + +pub trait DecisionEngineErrorsInterface { + fn get_error_message(&self) -> String; + fn get_error_code(&self) -> String; + fn get_error_data(&self) -> Option; +} + +#[derive(Debug)] +pub struct RoutingEventsWrapper +where + Req: Serialize + Clone, +{ + pub tenant_id: id_type::TenantId, + pub request_id: Option, + pub payment_id: String, + pub profile_id: id_type::ProfileId, + pub merchant_id: id_type::MerchantId, + pub flow: String, + pub request: Option, + pub parse_response: bool, + pub log_event: bool, + pub routing_event: Option, +} + +#[derive(Debug)] +pub enum EventResponseType +where + Res: Serialize + serde::de::DeserializeOwned + Clone, +{ + Structured(Res), + String(String), +} + +#[derive(Debug)] +pub struct RoutingEventsResponse +where + Res: Serialize + serde::de::DeserializeOwned + Clone, +{ + pub event: Option, + pub response: Option, +} + +impl RoutingEventsResponse +where + Res: Serialize + serde::de::DeserializeOwned + Clone, +{ + pub fn new(event: Option, response: Option) -> Self { + Self { event, response } + } + + pub fn set_response(&mut self, response: Res) { + self.response = Some(response); + } + + pub fn set_event(&mut self, event: routing_events::RoutingEvent) { + self.event = Some(event); + } +} + +impl RoutingEventsWrapper +where + Req: Serialize + Clone, +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + tenant_id: id_type::TenantId, + request_id: Option, + payment_id: String, + profile_id: id_type::ProfileId, + merchant_id: id_type::MerchantId, + flow: String, + request: Option, + parse_response: bool, + log_event: bool, + ) -> Self { + Self { + tenant_id, + request_id, + payment_id, + profile_id, + merchant_id, + flow, + request, + parse_response, + log_event, + routing_event: None, + } + } + + pub fn construct_event_builder( + self, + url: String, + routing_engine: routing_events::RoutingEngine, + method: routing_events::ApiMethod, + ) -> RoutingResult { + let mut wrapper = self; + let request = wrapper + .request + .clone() + .ok_or(errors::RoutingError::RoutingEventsError { + message: "Request body is missing".to_string(), + status_code: 400, + })?; + + let serialized_request = serde_json::to_value(&request) + .change_context(errors::RoutingError::RoutingEventsError { + message: "Failed to serialize RoutingRequest".to_string(), + status_code: 500, + }) + .attach_printable("Failed to serialize request body")?; + + let routing_event = routing_events::RoutingEvent::new( + wrapper.tenant_id.clone(), + "".to_string(), + &wrapper.flow, + serialized_request, + url, + method, + wrapper.payment_id.clone(), + wrapper.profile_id.clone(), + wrapper.merchant_id.clone(), + wrapper.request_id, + routing_engine, + ); + + wrapper.set_routing_event(routing_event); + + Ok(wrapper) + } + + pub async fn trigger_event( + self, + state: &SessionState, + func: F, + ) -> RoutingResult> + where + F: FnOnce() -> Fut + Send, + Res: Serialize + serde::de::DeserializeOwned + Clone, + Fut: futures::Future>> + Send, + { + let mut routing_event = + self.routing_event + .ok_or(errors::RoutingError::RoutingEventsError { + message: "Routing event is missing".to_string(), + status_code: 500, + })?; + + let mut response = RoutingEventsResponse::new(None, None); + + let resp = func().await; + match resp { + Ok(ok_resp) => { + if let Some(resp) = ok_resp { + routing_event.set_response_body(&resp); + // routing_event + // .set_routable_connectors(ok_resp.get_routable_connectors().unwrap_or_default()); + // routing_event.set_payment_connector(ok_resp.get_payment_connector()); + routing_event.set_status_code(200); + + response.set_response(resp.clone()); + self.log_event + .then(|| state.event_handler().log_event(&routing_event)); + } + } + Err(err) => { + // Need to figure out a generic way to log errors + routing_event + .set_error(serde_json::json!({"error": err.current_context().to_string()})); + + match err.current_context() { + errors::RoutingError::RoutingEventsError { status_code, .. } => { + routing_event.set_status_code(*status_code); + } + _ => { + routing_event.set_status_code(500); + } + } + state.event_handler().log_event(&routing_event) + } + } + + response.set_event(routing_event); + + Ok(response) + } + + pub fn set_log_event(&mut self, log_event: bool) { + self.log_event = log_event; + } + + pub fn set_request_body(&mut self, request: Req) { + self.request = Some(request); + } + + pub fn set_routing_event(&mut self, routing_event: routing_events::RoutingEvent) { + self.routing_event = Some(routing_event); + } +} + +pub trait RoutingEventsInterface { + fn get_routable_connectors(&self) -> Option>; + fn get_payment_connector(&self) -> Option; +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalSuccessRateConfigEventRequest { + pub min_aggregates_size: Option, + pub default_success_rate: Option, + pub specificity_level: api_routing::SuccessRateSpecificityLevel, + pub exploration_percent: Option, +} + +impl From<&api_routing::SuccessBasedRoutingConfigBody> for CalSuccessRateConfigEventRequest { + fn from(value: &api_routing::SuccessBasedRoutingConfigBody) -> Self { + Self { + min_aggregates_size: value.min_aggregates_size, + default_success_rate: value.default_success_rate, + specificity_level: value.specificity_level, + exploration_percent: value.exploration_percent, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalSuccessRateEventRequest { + pub id: String, + pub params: String, + pub labels: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationRoutingEventBucketConfig { + pub bucket_size: Option, + pub bucket_leak_interval_in_secs: Option, +} + +impl From<&api_routing::EliminationAnalyserConfig> for EliminationRoutingEventBucketConfig { + fn from(value: &api_routing::EliminationAnalyserConfig) -> Self { + Self { + bucket_size: value.bucket_size, + bucket_leak_interval_in_secs: value.bucket_leak_interval_in_secs, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationRoutingEventRequest { + pub id: String, + pub params: String, + pub labels: Vec, + pub config: Option, +} + +/// API-1 types +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalContractScoreEventRequest { + pub id: String, + pub params: String, + pub labels: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct LabelWithScoreEventResponse { + pub score: f64, + pub label: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalSuccessRateEventResponse { + pub labels_with_score: Vec, + pub routing_approach: RoutingApproach, +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl TryFrom<&ir_client::success_rate_client::CalSuccessRateResponse> + for CalSuccessRateEventResponse +{ + type Error = errors::RoutingError; + + fn try_from( + value: &ir_client::success_rate_client::CalSuccessRateResponse, + ) -> Result { + Ok(Self { + labels_with_score: value + .labels_with_score + .iter() + .map(|l| LabelWithScoreEventResponse { + score: l.score, + label: l.label.clone(), + }) + .collect(), + routing_approach: match value.routing_approach { + 0 => RoutingApproach::Exploration, + 1 => RoutingApproach::Exploitation, + _ => { + return Err(errors::RoutingError::GenericNotFoundError { + field: "unknown routing approach from dynamic routing service".to_string(), + }) + } + }, + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RoutingApproach { + Exploitation, + Exploration, + Elimination, + ContractBased, + StaticRouting, + Default, +} + +impl RoutingApproach { + pub fn from_decision_engine_approach(approach: &str) -> Self { + match approach { + "SR_SELECTION_V3_ROUTING" => Self::Exploitation, + "SR_V3_HEDGING" => Self::Exploration, + _ => Self::Default, + } + } +} + +impl std::fmt::Display for RoutingApproach { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Exploitation => write!(f, "Exploitation"), + Self::Exploration => write!(f, "Exploration"), + Self::Elimination => write!(f, "Elimination"), + Self::ContractBased => write!(f, "ContractBased"), + Self::StaticRouting => write!(f, "StaticRouting"), + Self::Default => write!(f, "Default"), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct BucketInformationEventResponse { + pub is_eliminated: bool, + pub bucket_name: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationInformationEventResponse { + pub entity: Option, + pub global: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct LabelWithStatusEliminationEventResponse { + pub label: String, + pub elimination_information: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationEventResponse { + pub labels_with_status: Vec, +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl From<&ir_client::elimination_based_client::EliminationResponse> for EliminationEventResponse { + fn from(value: &ir_client::elimination_based_client::EliminationResponse) -> Self { + Self { + labels_with_status: value + .labels_with_status + .iter() + .map( + |label_with_status| LabelWithStatusEliminationEventResponse { + label: label_with_status.label.clone(), + elimination_information: label_with_status + .elimination_information + .as_ref() + .map(|info| EliminationInformationEventResponse { + entity: info.entity.as_ref().map(|entity_info| { + BucketInformationEventResponse { + is_eliminated: entity_info.is_eliminated, + bucket_name: entity_info.bucket_name.clone(), + } + }), + global: info.global.as_ref().map(|global_info| { + BucketInformationEventResponse { + is_eliminated: global_info.is_eliminated, + bucket_name: global_info.bucket_name.clone(), + } + }), + }), + }, + ) + .collect(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct ScoreDataEventResponse { + pub score: f64, + pub label: String, + pub current_count: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalContractScoreEventResponse { + pub labels_with_score: Vec, +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl From<&ir_client::contract_routing_client::CalContractScoreResponse> + for CalContractScoreEventResponse +{ + fn from(value: &ir_client::contract_routing_client::CalContractScoreResponse) -> Self { + Self { + labels_with_score: value + .labels_with_score + .iter() + .map(|label_with_score| ScoreDataEventResponse { + score: label_with_score.score, + label: label_with_score.label.clone(), + current_count: label_with_score.current_count, + }) + .collect(), + } + } +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalGlobalSuccessRateConfigEventRequest { + pub entity_min_aggregates_size: u32, + pub entity_default_success_rate: f64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalGlobalSuccessRateEventRequest { + pub entity_id: String, + pub entity_params: String, + pub entity_labels: Vec, + pub global_labels: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateSuccessRateWindowConfig { + pub max_aggregates_size: Option, + pub current_block_threshold: Option, +} + +impl From<&api_routing::SuccessBasedRoutingConfigBody> for UpdateSuccessRateWindowConfig { + fn from(value: &api_routing::SuccessBasedRoutingConfigBody) -> Self { + Self { + max_aggregates_size: value.max_aggregates_size, + current_block_threshold: value.current_block_threshold.clone(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateLabelWithStatusEventRequest { + pub label: String, + pub status: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateSuccessRateWindowEventRequest { + pub id: String, + pub params: String, + pub labels_with_status: Vec, + pub config: Option, + pub global_labels_with_status: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateSuccessRateWindowEventResponse { + pub status: UpdationStatusEventResponse, +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl TryFrom<&ir_client::success_rate_client::UpdateSuccessRateWindowResponse> + for UpdateSuccessRateWindowEventResponse +{ + type Error = errors::RoutingError; + + fn try_from( + value: &ir_client::success_rate_client::UpdateSuccessRateWindowResponse, + ) -> Result { + Ok(Self { + status: match value.status { + 0 => UpdationStatusEventResponse::WindowUpdationSucceeded, + 1 => UpdationStatusEventResponse::WindowUpdationFailed, + _ => { + return Err(errors::RoutingError::GenericNotFoundError { + field: "unknown updation status from dynamic routing service".to_string(), + }) + } + }, + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum UpdationStatusEventResponse { + WindowUpdationSucceeded, + WindowUpdationFailed, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct LabelWithBucketNameEventRequest { + pub label: String, + pub bucket_name: String, +} + +impl From<&api_routing::RoutableConnectorChoiceWithBucketName> for LabelWithBucketNameEventRequest { + fn from(value: &api_routing::RoutableConnectorChoiceWithBucketName) -> Self { + Self { + label: value.routable_connector_choice.to_string(), + bucket_name: value.bucket_name.clone(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateEliminationBucketEventRequest { + pub id: String, + pub params: String, + pub labels_with_bucket_name: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateEliminationBucketEventResponse { + pub status: EliminationUpdationStatusEventResponse, +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl TryFrom<&ir_client::elimination_based_client::UpdateEliminationBucketResponse> + for UpdateEliminationBucketEventResponse +{ + type Error = errors::RoutingError; + + fn try_from( + value: &ir_client::elimination_based_client::UpdateEliminationBucketResponse, + ) -> Result { + Ok(Self { + status: match value.status { + 0 => EliminationUpdationStatusEventResponse::BucketUpdationSucceeded, + 1 => EliminationUpdationStatusEventResponse::BucketUpdationFailed, + _ => { + return Err(errors::RoutingError::GenericNotFoundError { + field: "unknown updation status from dynamic routing service".to_string(), + }) + } + }, + }) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EliminationUpdationStatusEventResponse { + BucketUpdationSucceeded, + BucketUpdationFailed, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct ContractLabelInformationEventRequest { + pub label: String, + pub target_count: u64, + pub target_time: u64, + pub current_count: u64, +} + +impl From<&api_routing::LabelInformation> for ContractLabelInformationEventRequest { + fn from(value: &api_routing::LabelInformation) -> Self { + Self { + label: value.label.clone(), + target_count: value.target_count, + target_time: value.target_time, + current_count: 1, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateContractRequestEventRequest { + pub id: String, + pub params: String, + pub labels_information: Vec, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateContractEventResponse { + pub status: ContractUpdationStatusEventResponse, +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl TryFrom<&ir_client::contract_routing_client::UpdateContractResponse> + for UpdateContractEventResponse +{ + type Error = errors::RoutingError; + + fn try_from( + value: &ir_client::contract_routing_client::UpdateContractResponse, + ) -> Result { + Ok(Self { + status: match value.status { + 0 => ContractUpdationStatusEventResponse::ContractUpdationSucceeded, + 1 => ContractUpdationStatusEventResponse::ContractUpdationFailed, + _ => { + return Err(errors::RoutingError::GenericNotFoundError { + field: "unknown updation status from dynamic routing service".to_string(), + }) + } + }, + }) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ContractUpdationStatusEventResponse { + ContractUpdationSucceeded, + ContractUpdationFailed, +} diff --git a/crates/router/src/core/routing/helpers.rs b/crates/router/src/core/routing/helpers.rs index c342f11d55..1dd965a71c 100644 --- a/crates/router/src/core/routing/helpers.rs +++ b/crates/router/src/core/routing/helpers.rs @@ -1093,78 +1093,110 @@ pub async fn push_metrics_with_update_window_for_success_based_routing( .attach_printable("Unable to push dynamic routing stats to db")?; }; - let label_with_status = routing_types::UpdateLabelWithStatusEventRequest { + let label_with_status = routing_utils::UpdateLabelWithStatusEventRequest { label: routable_connector.clone().to_string(), status: payment_status_attribute == common_enums::AttemptStatus::Charged, }; - let event_request = routing_types::UpdateSuccessRateWindowEventRequest { + let event_request = routing_utils::UpdateSuccessRateWindowEventRequest { id: payment_attempt.profile_id.get_string_repr().to_string(), params: success_based_routing_config_params.clone(), labels_with_status: vec![label_with_status.clone()], global_labels_with_status: vec![label_with_status], - config: success_based_routing_configs.config.as_ref().map(|conf| { - routing_types::UpdateSuccessRateWindowConfig { - max_aggregates_size: conf.max_aggregates_size, - current_block_threshold: conf.current_block_threshold.clone(), - } - }), + config: success_based_routing_configs + .config + .as_ref() + .map(routing_utils::UpdateSuccessRateWindowConfig::from), }; - let serialized_request = serde_json::to_value(&event_request) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unable to serialize success_based_routing_config_params")?; - - let mut routing_event = routing_events::RoutingEvent::new( + let routing_events_wrapper = routing_utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "Intelligent-router UpdateSuccessRateWindow", - serialized_request, - "SuccessRateCalculator.UpdateSuccessRateWindow".to_string(), - routing_events::ApiMethod::Grpc, + state.request_id, payment_attempt.payment_id.get_string_repr().to_string(), profile_id.to_owned(), payment_attempt.merchant_id.to_owned(), - state.request_id, - routing_events::RoutingEngine::IntelligentRouter, + "IntelligentRouter: UpdateSuccessRateWindow".to_string(), + Some(event_request.clone()), + true, + false, ); - let update_response = client - .update_success_rate( - profile_id.get_string_repr().into(), - success_based_routing_configs, - success_based_routing_config_params, - vec![routing_types::RoutableConnectorChoiceWithStatus::new( - routable_connector.clone(), - payment_status_attribute == common_enums::AttemptStatus::Charged, - )], - state.get_grpc_headers(), + let closure = || async { + let update_response_result = client + .update_success_rate( + profile_id.get_string_repr().into(), + success_based_routing_configs, + success_based_routing_config_params, + vec![routing_types::RoutableConnectorChoiceWithStatus::new( + routable_connector.clone(), + payment_status_attribute == common_enums::AttemptStatus::Charged, + )], + state.get_grpc_headers(), + ) + .await + .change_context(errors::RoutingError::SuccessRateCalculationError) + .attach_printable( + "unable to update success based routing window in dynamic routing service", + ); + + match update_response_result { + Ok(update_response) => { + let updated_resp = + routing_utils::UpdateSuccessRateWindowEventResponse::try_from( + &update_response, + ) + .change_context(errors::RoutingError::RoutingEventsError { message: "Unable to convert to UpdateSuccessRateWindowEventResponse from UpdateSuccessRateWindowResponse".to_string(), status_code: 500 })?; + Ok(Some(updated_resp)) + } + Err(err) => { + logger::error!( + "unable to update connector score in dynamic routing service: {:?}", + err.current_context() + ); + + Err(err) + } + } + }; + + let events_response = routing_events_wrapper + .construct_event_builder( + "SuccessRateCalculator.UpdateSuccessRateWindow".to_string(), + routing_events::RoutingEngine::IntelligentRouter, + routing_events::ApiMethod::Grpc, ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "SR-Intelligent-Router: Failed to update success rate in Intelligent-Router", + )? + .trigger_event(state, closure) .await - .inspect_err(|e| { - routing_event - .set_error(serde_json::json!({"error": e.current_context().to_string()})); - state.event_handler().log_event(&routing_event); + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "SR-Intelligent-Router: Failed to update success rate in Intelligent-Router", + )?; + + let _response: routing_utils::UpdateSuccessRateWindowEventResponse = events_response + .response + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "UpdateSuccessRateWindowEventResponse not found in RoutingEventResponse", + )?; + + let mut routing_event = events_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: + "SR-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, }) .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable( - "unable to update success based routing window in dynamic routing service", + "SR-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse", )?; - let event_response = routing_types::UpdateSuccessRateWindowEventResponse { - status: match update_response.status { - 0 => routing_types::UpdationStatusEventResponse::WindowUpdationSucceeded, - 1 => routing_types::UpdationStatusEventResponse::WindowUpdationFailed, - _ => { - return Err(errors::ApiErrorResponse::InternalServerError) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unknown status code from dynamic routing service") - } - }, - }; - - routing_event.set_response_body(&event_response); routing_event.set_status_code(200); - routing_event.set_payment_connector(routable_connector); + routing_event.set_payment_connector(routable_connector); // we can do this inside the event wrap by implementing an interface on the req type state.event_handler().log_event(&routing_event); Ok(()) @@ -1248,45 +1280,35 @@ pub async fn update_window_for_elimination_routing( gsm_error_category.to_string(), )]; - let event_request = routing_types::UpdateEliminationBucketEventRequest { + let event_request = routing_utils::UpdateEliminationBucketEventRequest { id: profile_id.get_string_repr().to_string(), params: elimination_routing_config_params.clone(), labels_with_bucket_name: labels_with_bucket_name .iter() - .map( - |conn_choice| routing_types::LabelWithBucketNameEventRequest { - label: conn_choice.routable_connector_choice.to_string(), - bucket_name: conn_choice.bucket_name.clone(), - }, - ) + .map(|conn_choice| { + routing_utils::LabelWithBucketNameEventRequest::from(conn_choice) + }) .collect(), config: elimination_routing_config .elimination_analyser_config - .map(|conf| routing_types::EliminationRoutingEventBucketConfig { - bucket_leak_interval_in_secs: conf.bucket_leak_interval_in_secs, - bucket_size: conf.bucket_size, - }), + .as_ref() + .map(routing_utils::EliminationRoutingEventBucketConfig::from), }; - let serialized_request = serde_json::to_value(&event_request) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unable to serialize success_based_routing_config_params")?; - - let mut routing_event = routing_events::RoutingEvent::new( + let routing_events_wrapper = routing_utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "Intelligent-router UpdateEliminationBucket", - serialized_request, - "EliminationAnalyser.UpdateEliminationBucket".to_string(), - routing_events::ApiMethod::Grpc, + state.request_id, payment_attempt.payment_id.get_string_repr().to_string(), profile_id.to_owned(), payment_attempt.merchant_id.to_owned(), - state.request_id, - routing_events::RoutingEngine::IntelligentRouter, + "IntelligentRouter: UpdateEliminationBucket".to_string(), + Some(event_request.clone()), + true, + false, ); - let update_response = client + let closure = || async { + let update_response_result = client .update_elimination_bucket_config( profile_id.get_string_repr().to_string(), elimination_routing_config_params, @@ -1295,29 +1317,58 @@ pub async fn update_window_for_elimination_routing( state.get_grpc_headers(), ) .await - .inspect_err(|e| { - routing_event - .set_error(serde_json::json!({"error": e.current_context().to_string()})); - state.event_handler().log_event(&routing_event); - }) - .change_context(errors::ApiErrorResponse::InternalServerError) + .change_context(errors::RoutingError::EliminationRoutingCalculationError) .attach_printable( "unable to update elimination based routing buckets in dynamic routing service", + ); + + match update_response_result { + Ok(resp) => { + let updated_resp = + routing_utils::UpdateEliminationBucketEventResponse::try_from(&resp) + .change_context(errors::RoutingError::RoutingEventsError { message: "Unable to convert to UpdateEliminationBucketEventResponse from UpdateEliminationBucketResponse".to_string(), status_code: 500 })?; + + Ok(Some(updated_resp)) + } + Err(err) => { + logger::error!( + "unable to update elimination score in dynamic routing service: {:?}", + err.current_context() + ); + + Err(err) + } + } + }; + + let events_response = routing_events_wrapper.construct_event_builder( "EliminationAnalyser.UpdateEliminationBucket".to_string(), + routing_events::RoutingEngine::IntelligentRouter, + routing_events::ApiMethod::Grpc) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Elimination-Intelligent-Router: Failed to update elimination bucket in Intelligent-Router")? + .trigger_event(state, closure) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Elimination-Intelligent-Router: Failed to update elimination bucket in Intelligent-Router")?; + + let _response: routing_utils::UpdateEliminationBucketEventResponse = events_response + .response + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "UpdateEliminationBucketEventResponse not found in RoutingEventResponse", )?; - let event_response = routing_types::UpdateEliminationBucketEventResponse { - status: match update_response.status { - 0 => routing_types::EliminationUpdationStatusEventResponse::BucketUpdationSucceeded, - 1 => routing_types::EliminationUpdationStatusEventResponse::BucketUpdationFailed, - _ => { - return Err(errors::ApiErrorResponse::InternalServerError) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unknown status code from dynamic routing service") - } - }, - }; + let mut routing_event = events_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: + "Elimination-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + }) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Elimination-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse")?; - routing_event.set_response_body(&event_response); routing_event.set_status_code(200); routing_event.set_payment_connector(routing_types::RoutableConnectorChoice { choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, @@ -1425,36 +1476,30 @@ pub async fn push_metrics_with_update_window_for_contract_based_routing( get_desired_payment_status_for_dynamic_routing_metrics(payment_attempt.status); if payment_status_attribute == common_enums::AttemptStatus::Charged { - let event_request = routing_types::UpdateContractRequestEventRequest { + let event_request = routing_utils::UpdateContractRequestEventRequest { id: profile_id.get_string_repr().to_string(), params: "".to_string(), - labels_information: vec![routing_types::ContractLabelInformationEventRequest { - label: request_label_info.label.clone(), - target_count: request_label_info.target_count, - target_time: request_label_info.target_time, - current_count: 1, - }], + labels_information: vec![ + routing_utils::ContractLabelInformationEventRequest::from( + &request_label_info, + ), + ], }; - let serialized_request = serde_json::to_value(&event_request) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unable to serialize success_based_routing_config_params")?; - - let mut routing_event = routing_events::RoutingEvent::new( + let routing_events_wrapper = routing_utils::RoutingEventsWrapper::new( state.tenant.tenant_id.clone(), - "".to_string(), - "Intelligent-router UpdateContract", - serialized_request, - "ContractScoreCalculator.UpdateContract".to_string(), - routing_events::ApiMethod::Grpc, + state.request_id, payment_attempt.payment_id.get_string_repr().to_string(), profile_id.to_owned(), payment_attempt.merchant_id.to_owned(), - state.request_id, - routing_events::RoutingEngine::IntelligentRouter, + "IntelligentRouter: UpdateContractScore".to_string(), + Some(event_request.clone()), + true, + false, ); - let update_response = client + let closure = || async { + let update_response_result = client .update_contracts( profile_id.get_string_repr().into(), vec![request_label_info], @@ -1464,30 +1509,60 @@ pub async fn push_metrics_with_update_window_for_contract_based_routing( state.get_grpc_headers(), ) .await - .inspect_err(|e| { - routing_event.set_error( - serde_json::json!({"error": e.current_context().to_string()}), - ); - state.event_handler().log_event(&routing_event); - }) .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable( "unable to update contract based routing window in dynamic routing service", + ); + + match update_response_result { + Ok(resp) => { + let updated_resp = + routing_utils::UpdateContractEventResponse::try_from(&resp) + .change_context(errors::RoutingError::RoutingEventsError { message: "Unable to convert to UpdateContractEventResponse from UpdateContractResponse".to_string(), status_code: 500 })?; + Ok(Some(updated_resp)) + } + Err(err) => { + logger::error!( + "unable to update elimination score in dynamic routing service: {:?}", + err.current_context() + ); + + // have to refactor errors + Err(error_stack::report!( + errors::RoutingError::ContractScoreUpdationError + )) + } + } + }; + + let events_response = routing_events_wrapper.construct_event_builder( "ContractScoreCalculator.UpdateContract".to_string(), + routing_events::RoutingEngine::IntelligentRouter, + routing_events::ApiMethod::Grpc) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("ContractRouting-Intelligent-Router: Failed to contruct RoutingEventsBuilder")? + .trigger_event(state, closure) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("ContractRouting-Intelligent-Router: Failed to update contract scores in Intelligent-Router")?; + + let _response: routing_utils::UpdateContractEventResponse = events_response + .response + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "UpdateContractEventResponse not found in RoutingEventResponse", )?; - let event_response = routing_types::UpdateContractEventResponse { - status: match update_response.status { - 0 => routing_types::ContractUpdationStatusEventResponse::ContractUpdationSucceeded, - 1 => routing_types::ContractUpdationStatusEventResponse::ContractUpdationFailed, - _ => { - return Err(errors::ApiErrorResponse::InternalServerError) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unknown status code from dynamic routing service") - } - }, - }; + let mut routing_event = events_response + .event + .ok_or(errors::RoutingError::RoutingEventsError { + message: + "ContractRouting-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse" + .to_string(), + status_code: 500, + }) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("ContractRouting-Intelligent-Router: RoutingEvent not found in RoutingEventsResponse")?; - routing_event.set_response_body(&event_response); routing_event.set_payment_connector(routing_types::RoutableConnectorChoice { choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, connector: common_enums::RoutableConnectors::from_str( @@ -2271,6 +2346,7 @@ pub async fn enable_decision_engine_dynamic_routing_setup( DECISION_ENGINE_RULE_CREATE_ENDPOINT, Some(default_engine_config_request), None, + None, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -2348,6 +2424,7 @@ pub async fn update_decision_engine_dynamic_routing_setup( DECISION_ENGINE_RULE_UPDATE_ENDPOINT, Some(decision_engine_request), None, + None, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -2397,6 +2474,7 @@ pub async fn disable_decision_engine_dynamic_routing_setup( DECISION_ENGINE_RULE_DELETE_ENDPOINT, Some(decision_engine_request), None, + None, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -2447,6 +2525,7 @@ pub async fn create_decision_engine_merchant( DECISION_ENGINE_MERCHANT_CREATE_ENDPOINT, Some(merchant_account_req), None, + None, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -2472,6 +2551,7 @@ pub async fn delete_decision_engine_merchant( &path, None, None, + None, ) .await .change_context(errors::ApiErrorResponse::InternalServerError)