mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-02 21:07:58 +08:00
fix: lazy connection pools for dynamic routing service (#6437)
Co-authored-by: Nishant Joshi <nishant.joshi@juspay.in> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@ -3126,8 +3126,10 @@ dependencies = [
|
|||||||
"dyn-clone",
|
"dyn-clone",
|
||||||
"error-stack",
|
"error-stack",
|
||||||
"hex",
|
"hex",
|
||||||
|
"http-body-util",
|
||||||
"hyper 0.14.30",
|
"hyper 0.14.30",
|
||||||
"hyper-proxy",
|
"hyper-proxy",
|
||||||
|
"hyper-util",
|
||||||
"hyperswitch_interfaces",
|
"hyperswitch_interfaces",
|
||||||
"masking",
|
"masking",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
@ -3959,9 +3961,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hyper-util"
|
name = "hyper-util"
|
||||||
version = "0.1.7"
|
version = "0.1.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
|
checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes 1.7.1",
|
"bytes 1.7.1",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
@ -3972,7 +3974,6 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"socket2",
|
"socket2",
|
||||||
"tokio 1.40.0",
|
"tokio 1.40.0",
|
||||||
"tower",
|
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|||||||
@ -13,7 +13,7 @@ email = ["dep:aws-config"]
|
|||||||
aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3"]
|
aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3"]
|
||||||
hashicorp-vault = ["dep:vaultrs"]
|
hashicorp-vault = ["dep:vaultrs"]
|
||||||
v1 = ["hyperswitch_interfaces/v1", "common_utils/v1"]
|
v1 = ["hyperswitch_interfaces/v1", "common_utils/v1"]
|
||||||
dynamic_routing = ["dep:prost", "dep:tonic", "dep:tonic-reflection", "dep:tonic-types", "dep:api_models", "tokio/macros", "tokio/rt-multi-thread", "dep:tonic-build", "dep:router_env"]
|
dynamic_routing = ["dep:prost", "dep:tonic", "dep:tonic-reflection", "dep:tonic-types", "dep:api_models", "tokio/macros", "tokio/rt-multi-thread", "dep:tonic-build", "dep:router_env", "dep:hyper-util", "dep:http-body-util"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1.79"
|
async-trait = "0.1.79"
|
||||||
@ -38,6 +38,8 @@ tokio = "1.37.0"
|
|||||||
tonic = { version = "0.12.2", optional = true }
|
tonic = { version = "0.12.2", optional = true }
|
||||||
tonic-reflection = { version = "0.12.2", optional = true }
|
tonic-reflection = { version = "0.12.2", optional = true }
|
||||||
tonic-types = { version = "0.12.2", optional = true }
|
tonic-types = { version = "0.12.2", optional = true }
|
||||||
|
hyper-util = { version = "0.1.9", optional = true }
|
||||||
|
http-body-util = { version = "0.1.2", optional = true }
|
||||||
|
|
||||||
|
|
||||||
# First party crates
|
# First party crates
|
||||||
|
|||||||
@ -6,6 +6,9 @@ use api_models::routing::{
|
|||||||
};
|
};
|
||||||
use common_utils::{errors::CustomResult, ext_traits::OptionExt, transformers::ForeignTryFrom};
|
use common_utils::{errors::CustomResult, ext_traits::OptionExt, transformers::ForeignTryFrom};
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
|
use http_body_util::combinators::UnsyncBoxBody;
|
||||||
|
use hyper::body::Bytes;
|
||||||
|
use hyper_util::client::legacy::connect::HttpConnector;
|
||||||
use serde;
|
use serde;
|
||||||
use success_rate::{
|
use success_rate::{
|
||||||
success_rate_calculator_client::SuccessRateCalculatorClient, CalSuccessRateConfig,
|
success_rate_calculator_client::SuccessRateCalculatorClient, CalSuccessRateConfig,
|
||||||
@ -13,7 +16,7 @@ use success_rate::{
|
|||||||
CurrentBlockThreshold as DynamicCurrentThreshold, LabelWithStatus,
|
CurrentBlockThreshold as DynamicCurrentThreshold, LabelWithStatus,
|
||||||
UpdateSuccessRateWindowConfig, UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse,
|
UpdateSuccessRateWindowConfig, UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse,
|
||||||
};
|
};
|
||||||
use tonic::transport::Channel;
|
use tonic::Status;
|
||||||
#[allow(
|
#[allow(
|
||||||
missing_docs,
|
missing_docs,
|
||||||
unused_qualifications,
|
unused_qualifications,
|
||||||
@ -40,11 +43,13 @@ pub enum DynamicRoutingError {
|
|||||||
SuccessRateBasedRoutingFailure(String),
|
SuccessRateBasedRoutingFailure(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Client = hyper_util::client::legacy::Client<HttpConnector, UnsyncBoxBody<Bytes, Status>>;
|
||||||
|
|
||||||
/// Type that consists of all the services provided by the client
|
/// Type that consists of all the services provided by the client
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RoutingStrategy {
|
pub struct RoutingStrategy {
|
||||||
/// success rate service for Dynamic Routing
|
/// success rate service for Dynamic Routing
|
||||||
pub success_rate_client: Option<SuccessRateCalculatorClient<Channel>>,
|
pub success_rate_client: Option<SuccessRateCalculatorClient<Client>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Contains the Dynamic Routing Client Config
|
/// Contains the Dynamic Routing Client Config
|
||||||
@ -68,11 +73,14 @@ impl DynamicRoutingClientConfig {
|
|||||||
pub async fn get_dynamic_routing_connection(
|
pub async fn get_dynamic_routing_connection(
|
||||||
self,
|
self,
|
||||||
) -> Result<RoutingStrategy, Box<dyn std::error::Error>> {
|
) -> Result<RoutingStrategy, Box<dyn std::error::Error>> {
|
||||||
|
let client =
|
||||||
|
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
|
||||||
|
.http2_only(true)
|
||||||
|
.build_http();
|
||||||
let success_rate_client = match self {
|
let success_rate_client = match self {
|
||||||
Self::Enabled { host, port } => {
|
Self::Enabled { host, port } => {
|
||||||
let uri = format!("http://{}:{}", host, port);
|
let uri = format!("http://{}:{}", host, port).parse::<tonic::transport::Uri>()?;
|
||||||
let channel = tonic::transport::Endpoint::new(uri)?.connect().await?;
|
Some(SuccessRateCalculatorClient::with_origin(client, uri))
|
||||||
Some(SuccessRateCalculatorClient::new(channel))
|
|
||||||
}
|
}
|
||||||
Self::Disabled => None,
|
Self::Disabled => None,
|
||||||
};
|
};
|
||||||
@ -102,7 +110,7 @@ pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Channel> {
|
impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
||||||
async fn calculate_success_rate(
|
async fn calculate_success_rate(
|
||||||
&self,
|
&self,
|
||||||
id: String,
|
id: String,
|
||||||
|
|||||||
Reference in New Issue
Block a user