mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 00:49:42 +08:00
feat(routing): build the gRPC interface for communicating with the external service to perform elimination routing (#6672)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Chethan Rao <70657455+Chethan-rao@users.noreply.github.com>
This commit is contained in:
@ -745,8 +745,8 @@ pub struct EliminationRoutingConfig {
|
|||||||
|
|
||||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, ToSchema)]
|
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, ToSchema)]
|
||||||
pub struct EliminationAnalyserConfig {
|
pub struct EliminationAnalyserConfig {
|
||||||
pub bucket_size: Option<u32>,
|
pub bucket_size: Option<u64>,
|
||||||
pub bucket_ttl_in_mins: Option<f64>,
|
pub bucket_leak_interval_in_secs: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for EliminationRoutingConfig {
|
impl Default for EliminationRoutingConfig {
|
||||||
@ -755,7 +755,7 @@ impl Default for EliminationRoutingConfig {
|
|||||||
params: Some(vec![DynamicRoutingConfigParams::PaymentMethod]),
|
params: Some(vec![DynamicRoutingConfigParams::PaymentMethod]),
|
||||||
elimination_analyser_config: Some(EliminationAnalyserConfig {
|
elimination_analyser_config: Some(EliminationAnalyserConfig {
|
||||||
bucket_size: Some(5),
|
bucket_size: Some(5),
|
||||||
bucket_ttl_in_mins: Some(2.0),
|
bucket_leak_interval_in_secs: Some(2),
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -859,3 +859,18 @@ impl CurrentBlockThreshold {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
|
||||||
|
pub struct RoutableConnectorChoiceWithBucketName {
|
||||||
|
pub routable_connector_choice: RoutableConnectorChoice,
|
||||||
|
pub bucket_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RoutableConnectorChoiceWithBucketName {
|
||||||
|
pub fn new(routable_connector_choice: RoutableConnectorChoice, bucket_name: String) -> Self {
|
||||||
|
Self {
|
||||||
|
routable_connector_choice,
|
||||||
|
bucket_name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -6,7 +6,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let proto_path = router_env::workspace_path().join("proto");
|
let proto_path = router_env::workspace_path().join("proto");
|
||||||
let success_rate_proto_file = proto_path.join("success_rate.proto");
|
let success_rate_proto_file = proto_path.join("success_rate.proto");
|
||||||
|
let elimination_proto_file = proto_path.join("elimination_rate.proto");
|
||||||
let health_check_proto_file = proto_path.join("health_check.proto");
|
let health_check_proto_file = proto_path.join("health_check.proto");
|
||||||
let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR")?);
|
let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR")?);
|
||||||
|
|
||||||
@ -14,7 +14,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
tonic_build::configure()
|
tonic_build::configure()
|
||||||
.out_dir(out_dir)
|
.out_dir(out_dir)
|
||||||
.compile(
|
.compile(
|
||||||
&[success_rate_proto_file, health_check_proto_file],
|
&[
|
||||||
|
success_rate_proto_file,
|
||||||
|
elimination_proto_file,
|
||||||
|
health_check_proto_file,
|
||||||
|
],
|
||||||
&[proto_path],
|
&[proto_path],
|
||||||
)
|
)
|
||||||
.expect("Failed to compile proto files");
|
.expect("Failed to compile proto files");
|
||||||
|
|||||||
@ -1,31 +1,17 @@
|
|||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
use api_models::routing::{
|
use common_utils::errors::CustomResult;
|
||||||
CurrentBlockThreshold, RoutableConnectorChoice, RoutableConnectorChoiceWithStatus,
|
|
||||||
SuccessBasedRoutingConfig, SuccessBasedRoutingConfigBody,
|
|
||||||
};
|
|
||||||
use common_utils::{errors::CustomResult, ext_traits::OptionExt, transformers::ForeignTryFrom};
|
|
||||||
use error_stack::ResultExt;
|
|
||||||
use router_env::logger;
|
use router_env::logger;
|
||||||
use serde;
|
use serde;
|
||||||
use success_rate::{
|
/// Elimination Routing Client Interface Implementation
|
||||||
success_rate_calculator_client::SuccessRateCalculatorClient, CalSuccessRateConfig,
|
pub mod elimination_rate_client;
|
||||||
CalSuccessRateRequest, CalSuccessRateResponse,
|
/// Success Routing Client Interface Implementation
|
||||||
CurrentBlockThreshold as DynamicCurrentThreshold, InvalidateWindowsRequest,
|
pub mod success_rate_client;
|
||||||
InvalidateWindowsResponse, LabelWithStatus, UpdateSuccessRateWindowConfig,
|
|
||||||
UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse,
|
pub use elimination_rate_client::EliminationAnalyserClient;
|
||||||
};
|
pub use success_rate_client::SuccessRateCalculatorClient;
|
||||||
|
|
||||||
use super::Client;
|
use super::Client;
|
||||||
#[allow(
|
|
||||||
missing_docs,
|
|
||||||
unused_qualifications,
|
|
||||||
clippy::unwrap_used,
|
|
||||||
clippy::as_conversions
|
|
||||||
)]
|
|
||||||
pub mod success_rate {
|
|
||||||
tonic::include_proto!("success_rate");
|
|
||||||
}
|
|
||||||
/// Result type for Dynamic Routing
|
/// Result type for Dynamic Routing
|
||||||
pub type DynamicRoutingResult<T> = CustomResult<T, DynamicRoutingError>;
|
pub type DynamicRoutingResult<T> = CustomResult<T, DynamicRoutingError>;
|
||||||
|
|
||||||
@ -38,9 +24,12 @@ pub enum DynamicRoutingError {
|
|||||||
/// The required field name
|
/// The required field name
|
||||||
field: String,
|
field: String,
|
||||||
},
|
},
|
||||||
/// Error from Dynamic Routing Server
|
/// Error from Dynamic Routing Server while performing success_rate analysis
|
||||||
#[error("Error from Dynamic Routing Server : {0}")]
|
#[error("Error from Dynamic Routing Server while perfrming success_rate analysis : {0}")]
|
||||||
SuccessRateBasedRoutingFailure(String),
|
SuccessRateBasedRoutingFailure(String),
|
||||||
|
/// Error from Dynamic Routing Server while perfrming elimination
|
||||||
|
#[error("Error from Dynamic Routing Server while perfrming elimination : {0}")]
|
||||||
|
EliminationRateRoutingFailure(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Type that consists of all the services provided by the client
|
/// Type that consists of all the services provided by the client
|
||||||
@ -48,6 +37,8 @@ pub enum DynamicRoutingError {
|
|||||||
pub struct RoutingStrategy {
|
pub struct RoutingStrategy {
|
||||||
/// success rate service for Dynamic Routing
|
/// success rate service for Dynamic Routing
|
||||||
pub success_rate_client: Option<SuccessRateCalculatorClient<Client>>,
|
pub success_rate_client: Option<SuccessRateCalculatorClient<Client>>,
|
||||||
|
/// elimination service for Dynamic Routing
|
||||||
|
pub elimination_rate_client: Option<EliminationAnalyserClient<Client>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Contains the Dynamic Routing Client Config
|
/// Contains the Dynamic Routing Client Config
|
||||||
@ -74,190 +65,23 @@ impl DynamicRoutingClientConfig {
|
|||||||
self,
|
self,
|
||||||
client: Client,
|
client: Client,
|
||||||
) -> Result<RoutingStrategy, Box<dyn std::error::Error>> {
|
) -> Result<RoutingStrategy, Box<dyn std::error::Error>> {
|
||||||
let success_rate_client = match self {
|
let (success_rate_client, elimination_rate_client) = match self {
|
||||||
Self::Enabled { host, port, .. } => {
|
Self::Enabled { host, port, .. } => {
|
||||||
let uri = format!("http://{}:{}", host, port).parse::<tonic::transport::Uri>()?;
|
let uri = format!("http://{}:{}", host, port).parse::<tonic::transport::Uri>()?;
|
||||||
logger::info!("Connection established with dynamic routing gRPC Server");
|
logger::info!("Connection established with dynamic routing gRPC Server");
|
||||||
Some(SuccessRateCalculatorClient::with_origin(client, uri))
|
(
|
||||||
|
Some(SuccessRateCalculatorClient::with_origin(
|
||||||
|
client.clone(),
|
||||||
|
uri.clone(),
|
||||||
|
)),
|
||||||
|
Some(EliminationAnalyserClient::with_origin(client, uri)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
Self::Disabled => None,
|
Self::Disabled => (None, None),
|
||||||
};
|
};
|
||||||
Ok(RoutingStrategy {
|
Ok(RoutingStrategy {
|
||||||
success_rate_client,
|
success_rate_client,
|
||||||
})
|
elimination_rate_client,
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
|
||||||
/// To calculate the success rate for the list of chosen connectors
|
|
||||||
async fn calculate_success_rate(
|
|
||||||
&self,
|
|
||||||
id: String,
|
|
||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
|
||||||
params: String,
|
|
||||||
label_input: Vec<RoutableConnectorChoice>,
|
|
||||||
) -> DynamicRoutingResult<CalSuccessRateResponse>;
|
|
||||||
/// To update the success rate with the given label
|
|
||||||
async fn update_success_rate(
|
|
||||||
&self,
|
|
||||||
id: String,
|
|
||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
|
||||||
params: String,
|
|
||||||
response: Vec<RoutableConnectorChoiceWithStatus>,
|
|
||||||
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse>;
|
|
||||||
/// To invalidates the success rate routing keys
|
|
||||||
async fn invalidate_success_rate_routing_keys(
|
|
||||||
&self,
|
|
||||||
id: String,
|
|
||||||
) -> DynamicRoutingResult<InvalidateWindowsResponse>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
|
||||||
async fn calculate_success_rate(
|
|
||||||
&self,
|
|
||||||
id: String,
|
|
||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
|
||||||
params: String,
|
|
||||||
label_input: Vec<RoutableConnectorChoice>,
|
|
||||||
) -> DynamicRoutingResult<CalSuccessRateResponse> {
|
|
||||||
let labels = label_input
|
|
||||||
.into_iter()
|
|
||||||
.map(|conn_choice| conn_choice.to_string())
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let config = success_rate_based_config
|
|
||||||
.config
|
|
||||||
.map(ForeignTryFrom::foreign_try_from)
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
let request = tonic::Request::new(CalSuccessRateRequest {
|
|
||||||
id,
|
|
||||||
params,
|
|
||||||
labels,
|
|
||||||
config,
|
|
||||||
});
|
|
||||||
|
|
||||||
let response = self
|
|
||||||
.clone()
|
|
||||||
.fetch_success_rate(request)
|
|
||||||
.await
|
|
||||||
.change_context(DynamicRoutingError::SuccessRateBasedRoutingFailure(
|
|
||||||
"Failed to fetch the success rate".to_string(),
|
|
||||||
))?
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_success_rate(
|
|
||||||
&self,
|
|
||||||
id: String,
|
|
||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
|
||||||
params: String,
|
|
||||||
label_input: Vec<RoutableConnectorChoiceWithStatus>,
|
|
||||||
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse> {
|
|
||||||
let config = success_rate_based_config
|
|
||||||
.config
|
|
||||||
.map(ForeignTryFrom::foreign_try_from)
|
|
||||||
.transpose()?;
|
|
||||||
|
|
||||||
let labels_with_status = label_input
|
|
||||||
.into_iter()
|
|
||||||
.map(|conn_choice| LabelWithStatus {
|
|
||||||
label: conn_choice.routable_connector_choice.to_string(),
|
|
||||||
status: conn_choice.status,
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let request = tonic::Request::new(UpdateSuccessRateWindowRequest {
|
|
||||||
id,
|
|
||||||
params,
|
|
||||||
labels_with_status,
|
|
||||||
config,
|
|
||||||
});
|
|
||||||
|
|
||||||
let response = self
|
|
||||||
.clone()
|
|
||||||
.update_success_rate_window(request)
|
|
||||||
.await
|
|
||||||
.change_context(DynamicRoutingError::SuccessRateBasedRoutingFailure(
|
|
||||||
"Failed to update the success rate window".to_string(),
|
|
||||||
))?
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn invalidate_success_rate_routing_keys(
|
|
||||||
&self,
|
|
||||||
id: String,
|
|
||||||
) -> DynamicRoutingResult<InvalidateWindowsResponse> {
|
|
||||||
let request = tonic::Request::new(InvalidateWindowsRequest { id });
|
|
||||||
|
|
||||||
let response = self
|
|
||||||
.clone()
|
|
||||||
.invalidate_windows(request)
|
|
||||||
.await
|
|
||||||
.change_context(DynamicRoutingError::SuccessRateBasedRoutingFailure(
|
|
||||||
"Failed to invalidate the success rate routing keys".to_string(),
|
|
||||||
))?
|
|
||||||
.into_inner();
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ForeignTryFrom<CurrentBlockThreshold> for DynamicCurrentThreshold {
|
|
||||||
type Error = error_stack::Report<DynamicRoutingError>;
|
|
||||||
fn foreign_try_from(current_threshold: CurrentBlockThreshold) -> Result<Self, Self::Error> {
|
|
||||||
Ok(Self {
|
|
||||||
duration_in_mins: current_threshold.duration_in_mins,
|
|
||||||
max_total_count: current_threshold
|
|
||||||
.max_total_count
|
|
||||||
.get_required_value("max_total_count")
|
|
||||||
.change_context(DynamicRoutingError::MissingRequiredField {
|
|
||||||
field: "max_total_count".to_string(),
|
|
||||||
})?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ForeignTryFrom<SuccessBasedRoutingConfigBody> for UpdateSuccessRateWindowConfig {
|
|
||||||
type Error = error_stack::Report<DynamicRoutingError>;
|
|
||||||
fn foreign_try_from(config: SuccessBasedRoutingConfigBody) -> Result<Self, Self::Error> {
|
|
||||||
Ok(Self {
|
|
||||||
max_aggregates_size: config
|
|
||||||
.max_aggregates_size
|
|
||||||
.get_required_value("max_aggregate_size")
|
|
||||||
.change_context(DynamicRoutingError::MissingRequiredField {
|
|
||||||
field: "max_aggregates_size".to_string(),
|
|
||||||
})?,
|
|
||||||
current_block_threshold: config
|
|
||||||
.current_block_threshold
|
|
||||||
.map(ForeignTryFrom::foreign_try_from)
|
|
||||||
.transpose()?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ForeignTryFrom<SuccessBasedRoutingConfigBody> for CalSuccessRateConfig {
|
|
||||||
type Error = error_stack::Report<DynamicRoutingError>;
|
|
||||||
fn foreign_try_from(config: SuccessBasedRoutingConfigBody) -> Result<Self, Self::Error> {
|
|
||||||
Ok(Self {
|
|
||||||
min_aggregates_size: config
|
|
||||||
.min_aggregates_size
|
|
||||||
.get_required_value("min_aggregate_size")
|
|
||||||
.change_context(DynamicRoutingError::MissingRequiredField {
|
|
||||||
field: "min_aggregates_size".to_string(),
|
|
||||||
})?,
|
|
||||||
default_success_rate: config
|
|
||||||
.default_success_rate
|
|
||||||
.get_required_value("default_success_rate")
|
|
||||||
.change_context(DynamicRoutingError::MissingRequiredField {
|
|
||||||
field: "default_success_rate".to_string(),
|
|
||||||
})?,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,158 @@
|
|||||||
|
use api_models::routing::{
|
||||||
|
EliminationAnalyserConfig as EliminationConfig, RoutableConnectorChoice,
|
||||||
|
RoutableConnectorChoiceWithBucketName,
|
||||||
|
};
|
||||||
|
use common_utils::{ext_traits::OptionExt, transformers::ForeignTryFrom};
|
||||||
|
pub use elimination_rate::{
|
||||||
|
elimination_analyser_client::EliminationAnalyserClient, EliminationBucketConfig,
|
||||||
|
EliminationRequest, EliminationResponse, InvalidateBucketRequest, InvalidateBucketResponse,
|
||||||
|
LabelWithBucketName, UpdateEliminationBucketRequest, UpdateEliminationBucketResponse,
|
||||||
|
};
|
||||||
|
use error_stack::ResultExt;
|
||||||
|
#[allow(
|
||||||
|
missing_docs,
|
||||||
|
unused_qualifications,
|
||||||
|
clippy::unwrap_used,
|
||||||
|
clippy::as_conversions,
|
||||||
|
clippy::use_self
|
||||||
|
)]
|
||||||
|
pub mod elimination_rate {
|
||||||
|
tonic::include_proto!("elimination");
|
||||||
|
}
|
||||||
|
|
||||||
|
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
|
||||||
|
|
||||||
|
/// The trait Elimination Based Routing would have the functions required to support performance, calculation and invalidation bucket
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait EliminationBasedRouting: dyn_clone::DynClone + Send + Sync {
|
||||||
|
/// To perform the elimination based routing for the list of connectors
|
||||||
|
async fn perform_elimination_routing(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
params: String,
|
||||||
|
labels: Vec<RoutableConnectorChoice>,
|
||||||
|
configs: Option<EliminationConfig>,
|
||||||
|
) -> DynamicRoutingResult<EliminationResponse>;
|
||||||
|
/// To update the bucket size and ttl for list of connectors with its respective bucket name
|
||||||
|
async fn update_elimination_bucket_config(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
params: String,
|
||||||
|
report: Vec<RoutableConnectorChoiceWithBucketName>,
|
||||||
|
config: Option<EliminationConfig>,
|
||||||
|
) -> DynamicRoutingResult<UpdateEliminationBucketResponse>;
|
||||||
|
/// To invalidate the previous id's bucket
|
||||||
|
async fn invalidate_elimination_bucket(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
) -> DynamicRoutingResult<InvalidateBucketResponse>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
|
||||||
|
async fn perform_elimination_routing(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
params: String,
|
||||||
|
label_input: Vec<RoutableConnectorChoice>,
|
||||||
|
configs: Option<EliminationConfig>,
|
||||||
|
) -> DynamicRoutingResult<EliminationResponse> {
|
||||||
|
let labels = label_input
|
||||||
|
.into_iter()
|
||||||
|
.map(|conn_choice| conn_choice.to_string())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
|
||||||
|
|
||||||
|
let request = tonic::Request::new(EliminationRequest {
|
||||||
|
id,
|
||||||
|
params,
|
||||||
|
labels,
|
||||||
|
config,
|
||||||
|
});
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.clone()
|
||||||
|
.get_elimination_status(request)
|
||||||
|
.await
|
||||||
|
.change_context(DynamicRoutingError::EliminationRateRoutingFailure(
|
||||||
|
"Failed to perform the elimination analysis".to_string(),
|
||||||
|
))?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_elimination_bucket_config(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
params: String,
|
||||||
|
report: Vec<RoutableConnectorChoiceWithBucketName>,
|
||||||
|
configs: Option<EliminationConfig>,
|
||||||
|
) -> DynamicRoutingResult<UpdateEliminationBucketResponse> {
|
||||||
|
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
|
||||||
|
|
||||||
|
let labels_with_bucket_name = report
|
||||||
|
.into_iter()
|
||||||
|
.map(|conn_choice_with_bucket| LabelWithBucketName {
|
||||||
|
label: conn_choice_with_bucket
|
||||||
|
.routable_connector_choice
|
||||||
|
.to_string(),
|
||||||
|
bucket_name: conn_choice_with_bucket.bucket_name,
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let request = tonic::Request::new(UpdateEliminationBucketRequest {
|
||||||
|
id,
|
||||||
|
params,
|
||||||
|
labels_with_bucket_name,
|
||||||
|
config,
|
||||||
|
});
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.clone()
|
||||||
|
.update_elimination_bucket(request)
|
||||||
|
.await
|
||||||
|
.change_context(DynamicRoutingError::EliminationRateRoutingFailure(
|
||||||
|
"Failed to update the elimination bucket".to_string(),
|
||||||
|
))?
|
||||||
|
.into_inner();
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
async fn invalidate_elimination_bucket(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
) -> DynamicRoutingResult<InvalidateBucketResponse> {
|
||||||
|
let request = tonic::Request::new(InvalidateBucketRequest { id });
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.clone()
|
||||||
|
.invalidate_bucket(request)
|
||||||
|
.await
|
||||||
|
.change_context(DynamicRoutingError::EliminationRateRoutingFailure(
|
||||||
|
"Failed to invalidate the elimination bucket".to_string(),
|
||||||
|
))?
|
||||||
|
.into_inner();
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ForeignTryFrom<EliminationConfig> for EliminationBucketConfig {
|
||||||
|
type Error = error_stack::Report<DynamicRoutingError>;
|
||||||
|
fn foreign_try_from(config: EliminationConfig) -> Result<Self, Self::Error> {
|
||||||
|
Ok(Self {
|
||||||
|
bucket_size: config
|
||||||
|
.bucket_size
|
||||||
|
.get_required_value("bucket_size")
|
||||||
|
.change_context(DynamicRoutingError::MissingRequiredField {
|
||||||
|
field: "bucket_size".to_string(),
|
||||||
|
})?,
|
||||||
|
bucket_leak_interval_in_secs: config
|
||||||
|
.bucket_leak_interval_in_secs
|
||||||
|
.get_required_value("bucket_leak_interval_in_secs")
|
||||||
|
.change_context(DynamicRoutingError::MissingRequiredField {
|
||||||
|
field: "bucket_leak_interval_in_secs".to_string(),
|
||||||
|
})?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,195 @@
|
|||||||
|
use api_models::routing::{
|
||||||
|
CurrentBlockThreshold, RoutableConnectorChoice, RoutableConnectorChoiceWithStatus,
|
||||||
|
SuccessBasedRoutingConfig, SuccessBasedRoutingConfigBody,
|
||||||
|
};
|
||||||
|
use common_utils::{ext_traits::OptionExt, transformers::ForeignTryFrom};
|
||||||
|
use error_stack::ResultExt;
|
||||||
|
pub use success_rate::{
|
||||||
|
success_rate_calculator_client::SuccessRateCalculatorClient, CalSuccessRateConfig,
|
||||||
|
CalSuccessRateRequest, CalSuccessRateResponse,
|
||||||
|
CurrentBlockThreshold as DynamicCurrentThreshold, InvalidateWindowsRequest,
|
||||||
|
InvalidateWindowsResponse, LabelWithStatus, UpdateSuccessRateWindowConfig,
|
||||||
|
UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse,
|
||||||
|
};
|
||||||
|
#[allow(
|
||||||
|
missing_docs,
|
||||||
|
unused_qualifications,
|
||||||
|
clippy::unwrap_used,
|
||||||
|
clippy::as_conversions
|
||||||
|
)]
|
||||||
|
pub mod success_rate {
|
||||||
|
tonic::include_proto!("success_rate");
|
||||||
|
}
|
||||||
|
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
|
||||||
|
/// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
||||||
|
/// To calculate the success rate for the list of chosen connectors
|
||||||
|
async fn calculate_success_rate(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
|
params: String,
|
||||||
|
label_input: Vec<RoutableConnectorChoice>,
|
||||||
|
) -> DynamicRoutingResult<CalSuccessRateResponse>;
|
||||||
|
/// To update the success rate with the given label
|
||||||
|
async fn update_success_rate(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
|
params: String,
|
||||||
|
response: Vec<RoutableConnectorChoiceWithStatus>,
|
||||||
|
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse>;
|
||||||
|
/// To invalidates the success rate routing keys
|
||||||
|
async fn invalidate_success_rate_routing_keys(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
) -> DynamicRoutingResult<InvalidateWindowsResponse>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
||||||
|
async fn calculate_success_rate(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
|
params: String,
|
||||||
|
label_input: Vec<RoutableConnectorChoice>,
|
||||||
|
) -> DynamicRoutingResult<CalSuccessRateResponse> {
|
||||||
|
let labels = label_input
|
||||||
|
.into_iter()
|
||||||
|
.map(|conn_choice| conn_choice.to_string())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let config = success_rate_based_config
|
||||||
|
.config
|
||||||
|
.map(ForeignTryFrom::foreign_try_from)
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
let request = tonic::Request::new(CalSuccessRateRequest {
|
||||||
|
id,
|
||||||
|
params,
|
||||||
|
labels,
|
||||||
|
config,
|
||||||
|
});
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.clone()
|
||||||
|
.fetch_success_rate(request)
|
||||||
|
.await
|
||||||
|
.change_context(DynamicRoutingError::SuccessRateBasedRoutingFailure(
|
||||||
|
"Failed to fetch the success rate".to_string(),
|
||||||
|
))?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_success_rate(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
|
params: String,
|
||||||
|
label_input: Vec<RoutableConnectorChoiceWithStatus>,
|
||||||
|
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse> {
|
||||||
|
let config = success_rate_based_config
|
||||||
|
.config
|
||||||
|
.map(ForeignTryFrom::foreign_try_from)
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
let labels_with_status = label_input
|
||||||
|
.into_iter()
|
||||||
|
.map(|conn_choice| LabelWithStatus {
|
||||||
|
label: conn_choice.routable_connector_choice.to_string(),
|
||||||
|
status: conn_choice.status,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let request = tonic::Request::new(UpdateSuccessRateWindowRequest {
|
||||||
|
id,
|
||||||
|
params,
|
||||||
|
labels_with_status,
|
||||||
|
config,
|
||||||
|
});
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.clone()
|
||||||
|
.update_success_rate_window(request)
|
||||||
|
.await
|
||||||
|
.change_context(DynamicRoutingError::SuccessRateBasedRoutingFailure(
|
||||||
|
"Failed to update the success rate window".to_string(),
|
||||||
|
))?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
async fn invalidate_success_rate_routing_keys(
|
||||||
|
&self,
|
||||||
|
id: String,
|
||||||
|
) -> DynamicRoutingResult<InvalidateWindowsResponse> {
|
||||||
|
let request = tonic::Request::new(InvalidateWindowsRequest { id });
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.clone()
|
||||||
|
.invalidate_windows(request)
|
||||||
|
.await
|
||||||
|
.change_context(DynamicRoutingError::SuccessRateBasedRoutingFailure(
|
||||||
|
"Failed to invalidate the success rate routing keys".to_string(),
|
||||||
|
))?
|
||||||
|
.into_inner();
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ForeignTryFrom<CurrentBlockThreshold> for DynamicCurrentThreshold {
|
||||||
|
type Error = error_stack::Report<DynamicRoutingError>;
|
||||||
|
fn foreign_try_from(current_threshold: CurrentBlockThreshold) -> Result<Self, Self::Error> {
|
||||||
|
Ok(Self {
|
||||||
|
duration_in_mins: current_threshold.duration_in_mins,
|
||||||
|
max_total_count: current_threshold
|
||||||
|
.max_total_count
|
||||||
|
.get_required_value("max_total_count")
|
||||||
|
.change_context(DynamicRoutingError::MissingRequiredField {
|
||||||
|
field: "max_total_count".to_string(),
|
||||||
|
})?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ForeignTryFrom<SuccessBasedRoutingConfigBody> for UpdateSuccessRateWindowConfig {
|
||||||
|
type Error = error_stack::Report<DynamicRoutingError>;
|
||||||
|
fn foreign_try_from(config: SuccessBasedRoutingConfigBody) -> Result<Self, Self::Error> {
|
||||||
|
Ok(Self {
|
||||||
|
max_aggregates_size: config
|
||||||
|
.max_aggregates_size
|
||||||
|
.get_required_value("max_aggregate_size")
|
||||||
|
.change_context(DynamicRoutingError::MissingRequiredField {
|
||||||
|
field: "max_aggregates_size".to_string(),
|
||||||
|
})?,
|
||||||
|
current_block_threshold: config
|
||||||
|
.current_block_threshold
|
||||||
|
.map(ForeignTryFrom::foreign_try_from)
|
||||||
|
.transpose()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ForeignTryFrom<SuccessBasedRoutingConfigBody> for CalSuccessRateConfig {
|
||||||
|
type Error = error_stack::Report<DynamicRoutingError>;
|
||||||
|
fn foreign_try_from(config: SuccessBasedRoutingConfigBody) -> Result<Self, Self::Error> {
|
||||||
|
Ok(Self {
|
||||||
|
min_aggregates_size: config
|
||||||
|
.min_aggregates_size
|
||||||
|
.get_required_value("min_aggregate_size")
|
||||||
|
.change_context(DynamicRoutingError::MissingRequiredField {
|
||||||
|
field: "min_aggregates_size".to_string(),
|
||||||
|
})?,
|
||||||
|
default_success_rate: config
|
||||||
|
.default_success_rate
|
||||||
|
.get_required_value("default_success_rate")
|
||||||
|
.change_context(DynamicRoutingError::MissingRequiredField {
|
||||||
|
field: "default_success_rate".to_string(),
|
||||||
|
})?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -23,8 +23,8 @@ use euclid::{
|
|||||||
frontend::{ast, dir as euclid_dir},
|
frontend::{ast, dir as euclid_dir},
|
||||||
};
|
};
|
||||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||||
use external_services::grpc_client::dynamic_routing::{
|
use external_services::grpc_client::dynamic_routing::success_rate_client::{
|
||||||
success_rate::CalSuccessRateResponse, SuccessBasedDynamicRouting,
|
CalSuccessRateResponse, SuccessBasedDynamicRouting,
|
||||||
};
|
};
|
||||||
use hyperswitch_domain_models::address::Address;
|
use hyperswitch_domain_models::address::Address;
|
||||||
use kgraph_utils::{
|
use kgraph_utils::{
|
||||||
|
|||||||
@ -12,7 +12,7 @@ use common_utils::ext_traits::AsyncExt;
|
|||||||
use diesel_models::routing_algorithm::RoutingAlgorithm;
|
use diesel_models::routing_algorithm::RoutingAlgorithm;
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||||
use external_services::grpc_client::dynamic_routing::SuccessBasedDynamicRouting;
|
use external_services::grpc_client::dynamic_routing::success_rate_client::SuccessBasedDynamicRouting;
|
||||||
use hyperswitch_domain_models::{mandates, payment_address};
|
use hyperswitch_domain_models::{mandates, payment_address};
|
||||||
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
|
||||||
use router_env::logger;
|
use router_env::logger;
|
||||||
|
|||||||
@ -18,7 +18,7 @@ use diesel_models::dynamic_routing_stats::DynamicRoutingStatsNew;
|
|||||||
use diesel_models::routing_algorithm;
|
use diesel_models::routing_algorithm;
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
|
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
|
||||||
use external_services::grpc_client::dynamic_routing::SuccessBasedDynamicRouting;
|
use external_services::grpc_client::dynamic_routing::success_rate_client::SuccessBasedDynamicRouting;
|
||||||
#[cfg(feature = "v1")]
|
#[cfg(feature = "v1")]
|
||||||
use hyperswitch_domain_models::api::ApplicationResponse;
|
use hyperswitch_domain_models::api::ApplicationResponse;
|
||||||
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
|
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
|
||||||
|
|||||||
67
proto/elimination_rate.proto
Normal file
67
proto/elimination_rate.proto
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
package elimination;
|
||||||
|
|
||||||
|
service EliminationAnalyser {
|
||||||
|
rpc GetEliminationStatus (EliminationRequest) returns (EliminationResponse);
|
||||||
|
|
||||||
|
rpc UpdateEliminationBucket (UpdateEliminationBucketRequest) returns (UpdateEliminationBucketResponse);
|
||||||
|
|
||||||
|
rpc InvalidateBucket (InvalidateBucketRequest) returns (InvalidateBucketResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
// API-1 types
|
||||||
|
message EliminationRequest {
|
||||||
|
string id = 1;
|
||||||
|
string params = 2;
|
||||||
|
repeated string labels = 3;
|
||||||
|
EliminationBucketConfig config = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EliminationBucketConfig {
|
||||||
|
uint64 bucket_size = 1;
|
||||||
|
uint64 bucket_leak_interval_in_secs = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EliminationResponse {
|
||||||
|
repeated LabelWithStatus labels_with_status = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message LabelWithStatus {
|
||||||
|
string label = 1;
|
||||||
|
bool is_eliminated = 2;
|
||||||
|
string bucket_name = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
// API-2 types
|
||||||
|
message UpdateEliminationBucketRequest {
|
||||||
|
string id = 1;
|
||||||
|
string params = 2;
|
||||||
|
repeated LabelWithBucketName labels_with_bucket_name = 3;
|
||||||
|
EliminationBucketConfig config = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message LabelWithBucketName {
|
||||||
|
string label = 1;
|
||||||
|
string bucket_name = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UpdateEliminationBucketResponse {
|
||||||
|
enum UpdationStatus {
|
||||||
|
BUCKET_UPDATION_SUCCEEDED = 0;
|
||||||
|
BUCKET_UPDATION_FAILED = 1;
|
||||||
|
}
|
||||||
|
UpdationStatus status = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// API-3 types
|
||||||
|
message InvalidateBucketRequest {
|
||||||
|
string id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message InvalidateBucketResponse {
|
||||||
|
enum InvalidationStatus {
|
||||||
|
BUCKET_INVALIDATION_SUCCEEDED = 0;
|
||||||
|
BUCKET_INVALIDATION_FAILED = 1;
|
||||||
|
}
|
||||||
|
InvalidationStatus status = 1;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user