From 809c92bdcb07a89f37dfdceecc7b72e75e8d1343 Mon Sep 17 00:00:00 2001 From: Prajjwal Kumar Date: Thu, 26 Sep 2024 12:47:00 +0530 Subject: [PATCH] feat(routing): success based routing metrics (#5951) Co-authored-by: Aprabhat19 Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Amisha Prabhat <55580080+Aprabhat19@users.noreply.github.com> --- crates/api_models/src/routing.rs | 10 + crates/common_enums/src/enums.rs | 22 ++ crates/router/Cargo.toml | 2 +- crates/router/src/core/metrics.rs | 1 + crates/router/src/core/payments.rs | 21 ++ crates/router/src/core/payments/operations.rs | 6 + .../payments/operations/payment_response.rs | 112 ++++++ crates/router/src/core/routing.rs | 26 +- crates/router/src/core/routing/helpers.rs | 356 +++++++++++++++++- crates/router/src/routes/routing.rs | 7 - crates/router/src/types/api.rs | 29 +- crates/storage_impl/Cargo.toml | 1 + crates/storage_impl/src/redis/cache.rs | 11 + crates/storage_impl/src/redis/pub_sub.rs | 18 +- 14 files changed, 601 insertions(+), 21 deletions(-) diff --git a/crates/api_models/src/routing.rs b/crates/api_models/src/routing.rs index 2b551eaebb..fc65ab037c 100644 --- a/crates/api_models/src/routing.rs +++ b/crates/api_models/src/routing.rs @@ -259,6 +259,16 @@ pub struct RoutableConnectorChoiceWithStatus { pub routable_connector_choice: RoutableConnectorChoice, pub status: bool, } + +impl RoutableConnectorChoiceWithStatus { + pub fn new(routable_connector_choice: RoutableConnectorChoice, status: bool) -> Self { + Self { + routable_connector_choice, + status, + } + } +} + #[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, strum::Display, ToSchema)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] diff --git a/crates/common_enums/src/enums.rs b/crates/common_enums/src/enums.rs index a9ece7e9c0..126df9269d 100644 --- a/crates/common_enums/src/enums.rs +++ b/crates/common_enums/src/enums.rs @@ -3219,6 +3219,28 @@ pub enum DeleteStatus { Redacted, } +#[derive( + Clone, Copy, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize, strum::Display, Hash, +)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum SuccessBasedRoutingConclusiveState { + // pc: payment connector + // sc: success based routing outcome/first connector + // status: payment status + // + // status = success && pc == sc + TruePositive, + // status = failed && pc == sc + FalsePositive, + // status = failed && pc != sc + TrueNegative, + // status = success && pc != sc + FalseNegative, + // status = processing + NonDeterministic, +} + /// Whether 3ds authentication is requested or not #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize, Default, ToSchema)] pub enum External3dsAuthenticationRequest { diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 83d159f6d2..7159f34e4b 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -38,7 +38,7 @@ v1 = ["common_default", "api_models/v1", "diesel_models/v1", "hyperswitch_domain customer_v2 = ["api_models/customer_v2", "diesel_models/customer_v2", "hyperswitch_domain_models/customer_v2", "storage_impl/customer_v2"] payment_v2 = ["api_models/payment_v2", "diesel_models/payment_v2", "hyperswitch_domain_models/payment_v2", "storage_impl/payment_v2"] payment_methods_v2 = ["api_models/payment_methods_v2", "diesel_models/payment_methods_v2", "hyperswitch_domain_models/payment_methods_v2", "storage_impl/payment_methods_v2", "common_utils/payment_methods_v2"] -dynamic_routing = ["external_services/dynamic_routing"] +dynamic_routing = ["external_services/dynamic_routing", "storage_impl/dynamic_routing"] # Partial Auth # The feature reduces the overhead of the router authenticating the merchant for every request, and trusts on `x-merchant-id` header to be present in the request. diff --git a/crates/router/src/core/metrics.rs b/crates/router/src/core/metrics.rs index 7cdfc6e6ea..8956c38e7e 100644 --- a/crates/router/src/core/metrics.rs +++ b/crates/router/src/core/metrics.rs @@ -83,6 +83,7 @@ counter_metric!( ROUTING_RETRIEVE_CONFIG_FOR_PROFILE_SUCCESS_RESPONSE, GLOBAL_METER ); +counter_metric!(DYNAMIC_SUCCESS_BASED_ROUTING, GLOBAL_METER); #[cfg(feature = "partial-auth")] counter_metric!(PARTIAL_AUTH_FAILURE, GLOBAL_METER); diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 1cf1264521..4bfca852fb 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -70,6 +70,8 @@ use super::{ }; #[cfg(feature = "frm")] use crate::core::fraud_check as frm_core; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use crate::types::api::convert_connector_data_to_routable_connectors; use crate::{ configs::settings::{ApplePayPreDecryptFlow, PaymentMethodTypeTokenFilter}, connector::utils::missing_field_err, @@ -294,6 +296,11 @@ where }; payment_data = match connector_details { ConnectorCallType::PreDetermined(connector) => { + #[cfg(all(feature = "dynamic_routing", feature = "v1"))] + let routable_connectors = + convert_connector_data_to_routable_connectors(&[connector.clone()]) + .map_err(|e| logger::error!(routable_connector_error=?e)) + .unwrap_or_default(); let schedule_time = if should_add_task_to_process_tracker { payment_sync::get_sync_process_schedule_time( &*state.store, @@ -361,6 +368,10 @@ where &key_store, merchant_account.storage_scheme, &locale, + #[cfg(all(feature = "dynamic_routing", feature = "v1"))] + routable_connectors, + #[cfg(all(feature = "dynamic_routing", feature = "v1"))] + &business_profile, ) .await?; @@ -383,6 +394,12 @@ where } ConnectorCallType::Retryable(connectors) => { + #[cfg(all(feature = "dynamic_routing", feature = "v1"))] + let routable_connectors = + convert_connector_data_to_routable_connectors(&connectors) + .map_err(|e| logger::error!(routable_connector_error=?e)) + .unwrap_or_default(); + let mut connectors = connectors.into_iter(); let connector_data = get_connector_data(&mut connectors)?; @@ -486,6 +503,10 @@ where &key_store, merchant_account.storage_scheme, &locale, + #[cfg(all(feature = "dynamic_routing", feature = "v1"))] + routable_connectors, + #[cfg(all(feature = "dynamic_routing", feature = "v1"))] + &business_profile, ) .await?; diff --git a/crates/router/src/core/payments/operations.rs b/crates/router/src/core/payments/operations.rs index 5199583eb5..f91ef52fde 100644 --- a/crates/router/src/core/payments/operations.rs +++ b/crates/router/src/core/payments/operations.rs @@ -27,6 +27,8 @@ pub mod payments_incremental_authorization; pub mod tax_calculation; use api_models::enums::FrmSuggestion; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use api_models::routing::RoutableConnectorChoice; use async_trait::async_trait; use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; @@ -263,6 +265,10 @@ pub trait PostUpdateTracker: Send { key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult where F: 'b + Send + Sync; diff --git a/crates/router/src/core/payments/operations/payment_response.rs b/crates/router/src/core/payments/operations/payment_response.rs index 538f6d70c4..fc54ba5e64 100644 --- a/crates/router/src/core/payments/operations/payment_response.rs +++ b/crates/router/src/core/payments/operations/payment_response.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use api_models::routing::RoutableConnectorChoice; use async_trait::async_trait; use common_enums::AuthorizationStatus; use common_utils::{ @@ -15,6 +17,8 @@ use storage_impl::DataModelExt; use tracing_futures::Instrument; use super::{Operation, OperationSessionSetters, PostUpdateTracker}; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use crate::core::routing::helpers::push_metrics_for_success_based_routing; use crate::{ connector::utils::PaymentResponseRouterData, consts, @@ -72,6 +76,10 @@ impl PostUpdateTracker, types::PaymentsAuthor key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b, @@ -88,6 +96,10 @@ impl PostUpdateTracker, types::PaymentsAuthor key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -345,6 +357,11 @@ impl PostUpdateTracker, types::PaymentsIncrementalAu key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, _locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] _routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + _business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -496,6 +513,10 @@ impl PostUpdateTracker, types::PaymentsSyncData> for key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -508,6 +529,10 @@ impl PostUpdateTracker, types::PaymentsSyncData> for key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await } @@ -552,6 +577,10 @@ impl PostUpdateTracker, types::PaymentsSessionData> key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -564,6 +593,10 @@ impl PostUpdateTracker, types::PaymentsSessionData> key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -589,6 +622,11 @@ impl PostUpdateTracker, types::SdkPaymentsSessionUpd _key_store: &domain::MerchantKeyStore, _storage_scheme: enums::MerchantStorageScheme, _locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] _routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + _business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -659,6 +697,10 @@ impl PostUpdateTracker, types::PaymentsCaptureData> key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -671,6 +713,10 @@ impl PostUpdateTracker, types::PaymentsCaptureData> key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -690,6 +736,10 @@ impl PostUpdateTracker, types::PaymentsCancelData> f key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -702,6 +752,10 @@ impl PostUpdateTracker, types::PaymentsCancelData> f key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -723,6 +777,10 @@ impl PostUpdateTracker, types::PaymentsApproveData> key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -735,6 +793,10 @@ impl PostUpdateTracker, types::PaymentsApproveData> key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -754,6 +816,10 @@ impl PostUpdateTracker, types::PaymentsRejectData> f key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -766,6 +832,10 @@ impl PostUpdateTracker, types::PaymentsRejectData> f key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -791,6 +861,10 @@ impl PostUpdateTracker, types::SetupMandateRequestDa key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -808,6 +882,10 @@ impl PostUpdateTracker, types::SetupMandateRequestDa key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await?; @@ -894,6 +972,10 @@ impl PostUpdateTracker, types::CompleteAuthorizeData key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connector: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> where F: 'b + Send, @@ -906,6 +988,10 @@ impl PostUpdateTracker, types::CompleteAuthorizeData key_store, storage_scheme, locale, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + routable_connector, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + business_profile, )) .await } @@ -952,6 +1038,7 @@ async fn payment_response_update_tracker( #[cfg(feature = "v1")] #[instrument(skip_all)] +#[allow(clippy::too_many_arguments)] async fn payment_response_update_tracker( state: &SessionState, _payment_id: &api::PaymentIdType, @@ -960,8 +1047,13 @@ async fn payment_response_update_tracker( key_store: &domain::MerchantKeyStore, storage_scheme: enums::MerchantStorageScheme, locale: &Option, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] routable_connectors: Vec< + RoutableConnectorChoice, + >, + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] business_profile: &domain::Profile, ) -> RouterResult> { // Update additional payment data with the payment method response that we received from connector + let additional_payment_method_data = match payment_data.payment_method_data.clone() { Some(payment_method_data) => match payment_method_data { hyperswitch_domain_models::payment_method_data::PaymentMethodData::Card(_) @@ -1616,6 +1708,26 @@ async fn payment_response_update_tracker( utils::flatten_join_error(payment_attempt_fut) )?; + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + { + let state = state.clone(); + let business_profile = business_profile.clone(); + let payment_attempt = payment_attempt.clone(); + tokio::spawn( + async move { + push_metrics_for_success_based_routing( + &state, + &payment_attempt, + routable_connectors, + &business_profile, + ) + .await + .map_err(|e| logger::error!(dynamic_routing_metrics_error=?e)) + .ok(); + } + .in_current_span(), + ); + } payment_data.payment_intent = payment_intent; payment_data.payment_attempt = payment_attempt; router_data.payment_method_status.and_then(|status| { diff --git a/crates/router/src/core/routing.rs b/crates/router/src/core/routing.rs index a1ba53bcd7..4347b1418e 100644 --- a/crates/router/src/core/routing.rs +++ b/crates/router/src/core/routing.rs @@ -8,8 +8,12 @@ use api_models::{ use diesel_models::routing_algorithm::RoutingAlgorithm; use error_stack::ResultExt; use hyperswitch_domain_models::{mandates, payment_address}; +#[cfg(feature = "v1")] +use router_env::logger; use router_env::metrics::add_attributes; use rustc_hash::FxHashSet; +#[cfg(feature = "v1")] +use storage_impl::redis::cache; #[cfg(feature = "payouts")] use super::payouts; @@ -34,6 +38,7 @@ use crate::{ }, utils::{self, OptionExt}, }; + pub enum TransactionData<'a> { Payment(PaymentsDslInput<'a>), #[cfg(feature = "payouts")] @@ -1298,6 +1303,7 @@ pub async fn success_based_routing_update_configs( &add_attributes([("profile_id", profile_id.get_string_repr().to_owned())]), ); let db = state.store.as_ref(); + let dynamic_routing_algo_to_update = db .find_routing_algorithm_by_profile_id_algorithm_id(&profile_id, &algorithm_id) .await @@ -1311,10 +1317,10 @@ pub async fn success_based_routing_update_configs( config_to_update.update(request); - let algorithm_id = common_utils::generate_routing_id_of_default_length(); + let updated_algorithm_id = common_utils::generate_routing_id_of_default_length(); let timestamp = common_utils::date_time::now(); let algo = RoutingAlgorithm { - algorithm_id, + algorithm_id: updated_algorithm_id, profile_id: dynamic_routing_algo_to_update.profile_id, merchant_id: dynamic_routing_algo_to_update.merchant_id, name: dynamic_routing_algo_to_update.name, @@ -1331,6 +1337,22 @@ pub async fn success_based_routing_update_configs( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Unable to insert record in routing algorithm table")?; + // redact cache for success based routing configs + let cache_key = format!( + "{}_{}", + profile_id.get_string_repr(), + algorithm_id.get_string_repr() + ); + let cache_entries_to_redact = vec![cache::CacheKind::SuccessBasedDynamicRoutingCache( + cache_key.into(), + )]; + let _ = cache::publish_into_redact_channel( + state.store.get_cache_store().as_ref(), + cache_entries_to_redact, + ) + .await + .map_err(|e| logger::error!("unable to redact the success based routing config cache {e:?}")); + let new_record = record.foreign_into(); metrics::ROUTING_UPDATE_CONFIG_FOR_PROFILE_SUCCESS_RESPONSE.add( diff --git a/crates/router/src/core/routing/helpers.rs b/crates/router/src/core/routing/helpers.rs index f60a6589d5..ef62119786 100644 --- a/crates/router/src/core/routing/helpers.rs +++ b/crates/router/src/core/routing/helpers.rs @@ -2,10 +2,21 @@ //! //! Functions that are used to perform the retrieval of merchant's //! routing dict, configs, defaults +#[cfg(all(feature = "dynamic_routing", feature = "v1"))] +use std::str::FromStr; +#[cfg(any(feature = "dynamic_routing", feature = "v1"))] +use std::sync::Arc; + use api_models::routing as routing_types; -use common_utils::{ext_traits::Encode, types::keymanager::KeyManagerState}; +#[cfg(all(feature = "dynamic_routing", feature = "v1"))] +use common_utils::ext_traits::ValueExt; +use common_utils::{ext_traits::Encode, id_type, types::keymanager::KeyManagerState}; use diesel_models::configs; use error_stack::ResultExt; +#[cfg(feature = "dynamic_routing")] +use external_services::grpc_client::dynamic_routing::SuccessBasedDynamicRouting; +#[cfg(all(feature = "dynamic_routing", feature = "v1"))] +use router_env::{instrument, metrics::add_attributes, tracing}; use rustc_hash::FxHashSet; use storage_impl::redis::cache; @@ -18,6 +29,8 @@ use crate::{ types::{domain, storage}, utils::StringExt, }; +#[cfg(all(feature = "dynamic_routing", feature = "v1"))] +use crate::{core::metrics as core_metrics, routes::metrics}; /// Provides us with all the configured configs of the Merchant in the ascending time configured /// manner and chooses the first of them @@ -273,10 +286,7 @@ pub struct RoutingAlgorithmHelpers<'h> { #[derive(Clone, Debug)] pub struct ConnectNameAndMCAIdForProfile<'a>( - pub FxHashSet<( - &'a String, - common_utils::id_type::MerchantConnectorAccountId, - )>, + pub FxHashSet<(&'a String, id_type::MerchantConnectorAccountId)>, ); #[derive(Clone, Debug)] pub struct ConnectNameForProfile<'a>(pub FxHashSet<&'a String>); @@ -288,7 +298,7 @@ pub struct MerchantConnectorAccounts(pub Vec); #[cfg(feature = "v2")] impl MerchantConnectorAccounts { pub async fn get_all_mcas( - merchant_id: &common_utils::id_type::MerchantId, + merchant_id: &id_type::MerchantId, key_store: &domain::MerchantKeyStore, state: &SessionState, ) -> RouterResult { @@ -327,7 +337,7 @@ impl MerchantConnectorAccounts { pub fn filter_by_profile<'a, T>( &'a self, - profile_id: &'a common_utils::id_type::ProfileId, + profile_id: &'a id_type::ProfileId, func: impl Fn(&'a MerchantConnectorAccount) -> T, ) -> FxHashSet where @@ -422,8 +432,8 @@ impl<'h> RoutingAlgorithmHelpers<'h> { pub async fn validate_connectors_in_routing_config( state: &SessionState, key_store: &domain::MerchantKeyStore, - merchant_id: &common_utils::id_type::MerchantId, - profile_id: &common_utils::id_type::ProfileId, + merchant_id: &id_type::MerchantId, + profile_id: &id_type::ProfileId, routing_algorithm: &routing_types::RoutingAlgorithm, ) -> RouterResult<()> { let all_mcas = &*state @@ -543,3 +553,331 @@ pub fn get_default_config_key( storage::enums::TransactionType::Payout => format!("routing_default_po_{merchant_id}"), } } + +/// Retrieves cached success_based routing configs specific to tenant and profile +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +pub async fn get_cached_success_based_routing_config_for_profile<'a>( + state: &SessionState, + key: &str, +) -> Option> { + cache::SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE + .get_val::>(cache::CacheKey { + key: key.to_string(), + prefix: state.tenant.redis_key_prefix.clone(), + }) + .await +} + +/// Refreshes the cached success_based routing configs specific to tenant and profile +#[cfg(feature = "v1")] +pub async fn refresh_success_based_routing_cache( + state: &SessionState, + key: &str, + success_based_routing_config: routing_types::SuccessBasedRoutingConfig, +) -> Arc { + let config = Arc::new(success_based_routing_config); + cache::SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE + .push( + cache::CacheKey { + key: key.to_string(), + prefix: state.tenant.redis_key_prefix.clone(), + }, + config.clone(), + ) + .await; + config +} + +/// Checked fetch of success based routing configs +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +#[instrument(skip_all)] +pub async fn fetch_success_based_routing_configs( + state: &SessionState, + business_profile: &domain::Profile, +) -> RouterResult { + let dynamic_routing_algorithm = business_profile.dynamic_routing_algorithm.clone().ok_or( + errors::ApiErrorResponse::GenericNotFoundError { + message: "unable to find dynamic_routing_algorithm in business profile".to_string(), + }, + )?; + + let dynamic_routing_algorithm_ref = dynamic_routing_algorithm + .parse_value::("DynamicRoutingAlgorithmRef") + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to parse dynamic_routing_algorithm_ref")?; + + let success_based_routing_id = dynamic_routing_algorithm_ref + .success_based_algorithm + .ok_or(errors::ApiErrorResponse::GenericNotFoundError { + message: "success_based_algorithm not found in dynamic_routing_algorithm_ref" + .to_string(), + })? + .algorithm_id + .ok_or(errors::ApiErrorResponse::GenericNotFoundError { + message: "unable to find algorithm id in success based algorithm config".to_string(), + })?; + + let key = format!( + "{}_{}", + business_profile.get_id().get_string_repr(), + success_based_routing_id.get_string_repr() + ); + + if let Some(config) = + get_cached_success_based_routing_config_for_profile(state, key.as_str()).await + { + Ok(config.as_ref().clone()) + } else { + let success_rate_algorithm = state + .store + .find_routing_algorithm_by_profile_id_algorithm_id( + business_profile.get_id(), + &success_based_routing_id, + ) + .await + .change_context(errors::ApiErrorResponse::ResourceIdNotFound) + .attach_printable("unable to retrieve success_rate_algorithm for profile from db")?; + + let success_rate_config = success_rate_algorithm + .algorithm_data + .parse_value::("SuccessBasedRoutingConfig") + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to parse success_based_routing_config struct")?; + + refresh_success_based_routing_cache(state, key.as_str(), success_rate_config.clone()).await; + + Ok(success_rate_config) + } +} + +/// metrics for success based dynamic routing +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +#[instrument(skip_all)] +pub async fn push_metrics_for_success_based_routing( + state: &SessionState, + payment_attempt: &storage::PaymentAttempt, + routable_connectors: Vec, + business_profile: &domain::Profile, +) -> RouterResult<()> { + let client = state + .grpc_client + .dynamic_routing + .success_rate_client + .as_ref() + .ok_or(errors::ApiErrorResponse::GenericNotFoundError { + message: "success_rate gRPC client not found".to_string(), + })?; + + let payment_connector = &payment_attempt.connector.clone().ok_or( + errors::ApiErrorResponse::GenericNotFoundError { + message: "unable to derive payment connector from payment attempt".to_string(), + }, + )?; + + let success_based_routing_configs = + fetch_success_based_routing_configs(state, business_profile) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to retrieve success_rate configs")?; + + let tenant_business_profile_id = format!( + "{}:{}", + state.tenant.redis_key_prefix, + business_profile.get_id().get_string_repr() + ); + + let success_based_connectors = client + .calculate_success_rate( + tenant_business_profile_id.clone(), + success_based_routing_configs.clone(), + routable_connectors.clone(), + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to calculate/fetch success rate from dynamic routing service")?; + + let payment_status_attribute = + get_desired_payment_status_for_success_routing_metrics(&payment_attempt.status); + + let first_success_based_connector_label = &success_based_connectors + .labels_with_score + .first() + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "unable to fetch the first connector from list of connectors obtained from dynamic routing service", + )? + .label + .to_string(); + + let (first_success_based_connector, merchant_connector_id) = first_success_based_connector_label + .split_once(':') + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "unable to split connector_name and mca_id from the first connector obtained from dynamic routing service", + )?; + + let outcome = get_success_based_metrics_outcome_for_payment( + &payment_status_attribute, + payment_connector.to_string(), + first_success_based_connector.to_string(), + ); + + core_metrics::DYNAMIC_SUCCESS_BASED_ROUTING.add( + &metrics::CONTEXT, + 1, + &add_attributes([ + ("tenant", state.tenant.name.clone()), + ( + "merchant_id", + payment_attempt.merchant_id.get_string_repr().to_string(), + ), + ( + "profile_id", + payment_attempt.profile_id.get_string_repr().to_string(), + ), + ("merchant_connector_id", merchant_connector_id.to_string()), + ( + "payment_id", + payment_attempt.payment_id.get_string_repr().to_string(), + ), + ( + "success_based_routing_connector", + first_success_based_connector.to_string(), + ), + ("payment_connector", payment_connector.to_string()), + ( + "currency", + payment_attempt + .currency + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable("payment currency not found in payment_attempt")? + .to_string(), + ), + ( + "payment_method", + payment_attempt + .payment_method + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable("payment method not found in payment_attempt")? + .to_string(), + ), + ( + "payment_method_type", + payment_attempt + .payment_method_type + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable("payment method type not found in payment_attempt")? + .to_string(), + ), + ( + "capture_method", + payment_attempt + .capture_method + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable("capture method not found in payment_attempt")? + .to_string(), + ), + ( + "authentication_type", + payment_attempt + .authentication_type + .ok_or(errors::ApiErrorResponse::InternalServerError) + .attach_printable("authentication type not found in payment_attempt")? + .to_string(), + ), + ("payment_status", payment_attempt.status.to_string()), + ("conclusive_classification", outcome.to_string()), + ]), + ); + + client + .update_success_rate( + tenant_business_profile_id, + success_based_routing_configs, + vec![routing_types::RoutableConnectorChoiceWithStatus::new( + routing_types::RoutableConnectorChoice { + choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, + connector: common_enums::RoutableConnectors::from_str( + payment_connector.as_str(), + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to infer routable_connector from connector")?, + merchant_connector_id: payment_attempt.merchant_connector_id.clone(), + }, + payment_status_attribute == common_enums::AttemptStatus::Charged, + )], + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "unable to update success based routing window in dynamic routing service", + )?; + Ok(()) +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +fn get_desired_payment_status_for_success_routing_metrics( + attempt_status: &common_enums::AttemptStatus, +) -> common_enums::AttemptStatus { + match attempt_status { + common_enums::AttemptStatus::Charged + | common_enums::AttemptStatus::Authorized + | common_enums::AttemptStatus::PartialCharged + | common_enums::AttemptStatus::PartialChargedAndChargeable => { + common_enums::AttemptStatus::Charged + } + common_enums::AttemptStatus::Failure + | common_enums::AttemptStatus::AuthorizationFailed + | common_enums::AttemptStatus::AuthenticationFailed + | common_enums::AttemptStatus::CaptureFailed + | common_enums::AttemptStatus::RouterDeclined => common_enums::AttemptStatus::Failure, + common_enums::AttemptStatus::Started + | common_enums::AttemptStatus::AuthenticationPending + | common_enums::AttemptStatus::AuthenticationSuccessful + | common_enums::AttemptStatus::Authorizing + | common_enums::AttemptStatus::CodInitiated + | common_enums::AttemptStatus::Voided + | common_enums::AttemptStatus::VoidInitiated + | common_enums::AttemptStatus::CaptureInitiated + | common_enums::AttemptStatus::VoidFailed + | common_enums::AttemptStatus::AutoRefunded + | common_enums::AttemptStatus::Unresolved + | common_enums::AttemptStatus::Pending + | common_enums::AttemptStatus::PaymentMethodAwaited + | common_enums::AttemptStatus::ConfirmationAwaited + | common_enums::AttemptStatus::DeviceDataCollectionPending => { + common_enums::AttemptStatus::Pending + } + } +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +fn get_success_based_metrics_outcome_for_payment( + payment_status_attribute: &common_enums::AttemptStatus, + payment_connector: String, + first_success_based_connector: String, +) -> common_enums::SuccessBasedRoutingConclusiveState { + match payment_status_attribute { + common_enums::AttemptStatus::Charged + if *first_success_based_connector == *payment_connector => + { + common_enums::SuccessBasedRoutingConclusiveState::TruePositive + } + common_enums::AttemptStatus::Failure + if *first_success_based_connector == *payment_connector => + { + common_enums::SuccessBasedRoutingConclusiveState::FalsePositive + } + common_enums::AttemptStatus::Failure + if *first_success_based_connector != *payment_connector => + { + common_enums::SuccessBasedRoutingConclusiveState::TrueNegative + } + common_enums::AttemptStatus::Charged + if *first_success_based_connector != *payment_connector => + { + common_enums::SuccessBasedRoutingConclusiveState::FalseNegative + } + _ => common_enums::SuccessBasedRoutingConclusiveState::NonDeterministic, + } +} diff --git a/crates/router/src/routes/routing.rs b/crates/router/src/routes/routing.rs index 70da80c675..8fe543f851 100644 --- a/crates/router/src/routes/routing.rs +++ b/crates/router/src/routes/routing.rs @@ -1009,7 +1009,6 @@ pub async fn toggle_success_based_routing( wrapper.profile_id, ) }, - #[cfg(not(feature = "release"))] auth::auth_type( &auth::HeaderAuth(auth::ApiKeyAuth), &auth::JWTAuthProfileFromRoute { @@ -1019,12 +1018,6 @@ pub async fn toggle_success_based_routing( }, req.headers(), ), - #[cfg(feature = "release")] - &auth::JWTAuthProfileFromRoute { - profile_id: wrapper.profile_id, - required_permission: Permission::RoutingWrite, - minimum_entity_level: EntityType::Merchant, - }, api_locking::LockAction::NotApplicable, )) .await diff --git a/crates/router/src/types/api.rs b/crates/router/src/types/api.rs index dd5e47b100..80ad6da8d2 100644 --- a/crates/router/src/types/api.rs +++ b/crates/router/src/types/api.rs @@ -38,6 +38,8 @@ pub mod refunds_v2; use std::{fmt::Debug, str::FromStr}; +use api_models::routing::{self as api_routing, RoutableConnectorChoice}; +use common_enums::RoutableConnectors; use error_stack::{report, ResultExt}; pub use hyperswitch_domain_models::router_flow_types::{ access_token_auth::AccessTokenAuth, mandate_revoke::MandateRevoke, @@ -58,6 +60,7 @@ pub use self::{ payment_link::*, payment_methods::*, payments::*, poll::*, refunds::*, refunds_v2::*, webhooks::*, }; +use super::transformers::ForeignTryFrom; use crate::{ configs::settings::Connectors, connector, @@ -68,7 +71,6 @@ use crate::{ services::{connector_integration_interface::ConnectorEnum, ConnectorRedirectResponse}, types::{self, api::enums as api_enums}, }; - #[derive(Clone)] pub enum ConnectorCallType { PreDetermined(ConnectorData), @@ -228,6 +230,31 @@ impl SessionConnectorData { } } +pub fn convert_connector_data_to_routable_connectors( + connectors: &[ConnectorData], +) -> CustomResult, common_utils::errors::ValidationError> { + connectors + .iter() + .map(|connector_data| RoutableConnectorChoice::foreign_try_from(connector_data.clone())) + .collect() +} + +impl ForeignTryFrom for RoutableConnectorChoice { + type Error = error_stack::Report; + fn foreign_try_from(from: ConnectorData) -> Result { + match RoutableConnectors::foreign_try_from(from.connector_name) { + Ok(connector) => Ok(Self { + choice_kind: api_routing::RoutableChoiceKind::FullStruct, + connector, + merchant_connector_id: from.merchant_connector_id, + }), + Err(e) => Err(common_utils::errors::ValidationError::InvalidValue { + message: format!("This is not a routable connector: {:?}", e), + })?, + } + } +} + /// Session Surcharge type pub enum SessionSurchargeDetails { /// Surcharge is calculated by hyperswitch diff --git a/crates/storage_impl/Cargo.toml b/crates/storage_impl/Cargo.toml index 22badd47cd..b3dc6d57c6 100644 --- a/crates/storage_impl/Cargo.toml +++ b/crates/storage_impl/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true [features] default = ["olap", "oltp"] +dynamic_routing = [] oltp = [] olap = ["hyperswitch_domain_models/olap"] payouts = ["hyperswitch_domain_models/payouts"] diff --git a/crates/storage_impl/src/redis/cache.rs b/crates/storage_impl/src/redis/cache.rs index ba4a8200fa..6993105795 100644 --- a/crates/storage_impl/src/redis/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -72,6 +72,16 @@ pub static PM_FILTERS_CGRAPH_CACHE: Lazy = Lazy::new(|| { ) }); +/// Dynamic Algorithm Cache +pub static SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE: Lazy = Lazy::new(|| { + Cache::new( + "SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE", + CACHE_TTL, + CACHE_TTI, + Some(MAX_CAPACITY), + ) +}); + /// Trait which defines the behaviour of types that's gonna be stored in Cache pub trait Cacheable: Any + Send + Sync + DynClone { fn as_any(&self) -> &dyn Any; @@ -91,6 +101,7 @@ pub enum CacheKind<'a> { DecisionManager(Cow<'a, str>), Surcharge(Cow<'a, str>), CGraph(Cow<'a, str>), + SuccessBasedDynamicRoutingCache(Cow<'a, str>), PmFiltersCGraph(Cow<'a, str>), All(Cow<'a, str>), } diff --git a/crates/storage_impl/src/redis/pub_sub.rs b/crates/storage_impl/src/redis/pub_sub.rs index 7d7557a7f5..6a2012f9b2 100644 --- a/crates/storage_impl/src/redis/pub_sub.rs +++ b/crates/storage_impl/src/redis/pub_sub.rs @@ -6,7 +6,8 @@ use router_env::{logger, tracing::Instrument}; use crate::redis::cache::{ CacheKey, CacheKind, CacheRedact, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, - DECISION_MANAGER_CACHE, PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, SURCHARGE_CACHE, + DECISION_MANAGER_CACHE, PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, + SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE, SURCHARGE_CACHE, }; #[async_trait::async_trait] @@ -137,6 +138,15 @@ impl PubSubInterface for std::sync::Arc { .await; key } + CacheKind::SuccessBasedDynamicRoutingCache(key) => { + SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: message.tenant.clone(), + }) + .await; + key + } CacheKind::Routing(key) => { ROUTING_CACHE .remove(CacheKey { @@ -189,6 +199,12 @@ impl PubSubInterface for std::sync::Arc { prefix: message.tenant.clone(), }) .await; + SUCCESS_BASED_DYNAMIC_ALGORITHM_CACHE + .remove(CacheKey { + key: key.to_string(), + prefix: message.tenant.clone(), + }) + .await; ROUTING_CACHE .remove(CacheKey { key: key.to_string(),