mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-02 12:06:56 +08:00
feat: Added grpc based health check (#6441)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -1,11 +1,28 @@
|
||||
/// Dyanimc Routing Client interface implementation
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
pub mod dynamic_routing;
|
||||
/// gRPC based Heath Check Client interface implementation
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
pub mod health_check_client;
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy};
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
use health_check_client::HealthCheckClient;
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
use http_body_util::combinators::UnsyncBoxBody;
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
use hyper::body::Bytes;
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
use hyper_util::client::legacy::connect::HttpConnector;
|
||||
use serde;
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
use tonic::Status;
|
||||
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
/// Hyper based Client type for maintaining connection pool for all gRPC services
|
||||
pub type Client = hyper_util::client::legacy::Client<HttpConnector, UnsyncBoxBody<Bytes, Status>>;
|
||||
|
||||
/// Struct contains all the gRPC Clients
|
||||
#[derive(Debug, Clone)]
|
||||
@ -13,6 +30,9 @@ pub struct GrpcClients {
|
||||
/// The routing client
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
pub dynamic_routing: RoutingStrategy,
|
||||
/// Health Check client for all gRPC services
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
pub health_client: HealthCheckClient,
|
||||
}
|
||||
/// Type that contains the configs required to construct a gRPC client with its respective services.
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize, Default)]
|
||||
@ -29,17 +49,30 @@ impl GrpcClientSettings {
|
||||
/// This function will be called at service startup.
|
||||
#[allow(clippy::expect_used)]
|
||||
pub async fn get_grpc_client_interface(&self) -> Arc<GrpcClients> {
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
let client =
|
||||
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
|
||||
.http2_only(true)
|
||||
.build_http();
|
||||
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
let dynamic_routing_connection = self
|
||||
.dynamic_routing_client
|
||||
.clone()
|
||||
.get_dynamic_routing_connection()
|
||||
.get_dynamic_routing_connection(client.clone())
|
||||
.await
|
||||
.expect("Failed to establish a connection with the Dynamic Routing Server");
|
||||
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
let health_client = HealthCheckClient::build_connections(self, client)
|
||||
.await
|
||||
.expect("Failed to build gRPC connections");
|
||||
|
||||
Arc::new(GrpcClients {
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
dynamic_routing: dynamic_routing_connection,
|
||||
#[cfg(feature = "dynamic_routing")]
|
||||
health_client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,9 +6,6 @@ use api_models::routing::{
|
||||
};
|
||||
use common_utils::{errors::CustomResult, ext_traits::OptionExt, transformers::ForeignTryFrom};
|
||||
use error_stack::ResultExt;
|
||||
use http_body_util::combinators::UnsyncBoxBody;
|
||||
use hyper::body::Bytes;
|
||||
use hyper_util::client::legacy::connect::HttpConnector;
|
||||
use router_env::logger;
|
||||
use serde;
|
||||
use success_rate::{
|
||||
@ -18,7 +15,8 @@ use success_rate::{
|
||||
InvalidateWindowsResponse, LabelWithStatus, UpdateSuccessRateWindowConfig,
|
||||
UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse,
|
||||
};
|
||||
use tonic::Status;
|
||||
|
||||
use super::Client;
|
||||
#[allow(
|
||||
missing_docs,
|
||||
unused_qualifications,
|
||||
@ -45,8 +43,6 @@ pub enum DynamicRoutingError {
|
||||
SuccessRateBasedRoutingFailure(String),
|
||||
}
|
||||
|
||||
type Client = hyper_util::client::legacy::Client<HttpConnector, UnsyncBoxBody<Bytes, Status>>;
|
||||
|
||||
/// Type that consists of all the services provided by the client
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RoutingStrategy {
|
||||
@ -64,6 +60,8 @@ pub enum DynamicRoutingClientConfig {
|
||||
host: String,
|
||||
/// The port of the client
|
||||
port: u16,
|
||||
/// Service name
|
||||
service: String,
|
||||
},
|
||||
#[default]
|
||||
/// If the dynamic routing client config has been disabled
|
||||
@ -74,13 +72,10 @@ impl DynamicRoutingClientConfig {
|
||||
/// establish connection with the server
|
||||
pub async fn get_dynamic_routing_connection(
|
||||
self,
|
||||
client: Client,
|
||||
) -> Result<RoutingStrategy, Box<dyn std::error::Error>> {
|
||||
let client =
|
||||
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
|
||||
.http2_only(true)
|
||||
.build_http();
|
||||
let success_rate_client = match self {
|
||||
Self::Enabled { host, port } => {
|
||||
Self::Enabled { host, port, .. } => {
|
||||
let uri = format!("http://{}:{}", host, port).parse::<tonic::transport::Uri>()?;
|
||||
logger::info!("Connection established with dynamic routing gRPC Server");
|
||||
Some(SuccessRateCalculatorClient::with_origin(client, uri))
|
||||
|
||||
149
crates/external_services/src/grpc_client/health_check_client.rs
Normal file
149
crates/external_services/src/grpc_client/health_check_client.rs
Normal file
@ -0,0 +1,149 @@
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
|
||||
use api_models::health_check::{HealthCheckMap, HealthCheckServices};
|
||||
use common_utils::{errors::CustomResult, ext_traits::AsyncExt};
|
||||
use error_stack::ResultExt;
|
||||
pub use health_check::{
|
||||
health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
|
||||
HealthCheckResponse,
|
||||
};
|
||||
use router_env::logger;
|
||||
|
||||
#[allow(
|
||||
missing_docs,
|
||||
unused_qualifications,
|
||||
clippy::unwrap_used,
|
||||
clippy::as_conversions,
|
||||
clippy::use_self
|
||||
)]
|
||||
pub mod health_check {
|
||||
tonic::include_proto!("grpc.health.v1");
|
||||
}
|
||||
|
||||
use super::{Client, DynamicRoutingClientConfig, GrpcClientSettings};
|
||||
|
||||
/// Result type for Dynamic Routing
|
||||
pub type HealthCheckResult<T> = CustomResult<T, HealthCheckError>;
|
||||
/// Dynamic Routing Errors
|
||||
#[derive(Debug, Clone, thiserror::Error)]
|
||||
pub enum HealthCheckError {
|
||||
/// The required input is missing
|
||||
#[error("Missing fields: {0} for building the Health check connection")]
|
||||
MissingFields(String),
|
||||
/// Error from gRPC Server
|
||||
#[error("Error from gRPC Server : {0}")]
|
||||
ConnectionError(String),
|
||||
/// status is invalid
|
||||
#[error("Invalid Status from server")]
|
||||
InvalidStatus,
|
||||
}
|
||||
|
||||
/// Health Check Client type
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HealthCheckClient {
|
||||
/// Health clients for all gRPC based services
|
||||
pub clients: HashMap<HealthCheckServices, HealthClient<Client>>,
|
||||
}
|
||||
|
||||
impl HealthCheckClient {
|
||||
/// Build connections to all gRPC services
|
||||
pub async fn build_connections(
|
||||
config: &GrpcClientSettings,
|
||||
client: Client,
|
||||
) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let dynamic_routing_config = &config.dynamic_routing_client;
|
||||
let connection = match dynamic_routing_config {
|
||||
DynamicRoutingClientConfig::Enabled {
|
||||
host,
|
||||
port,
|
||||
service,
|
||||
} => Some((host.clone(), *port, service.clone())),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let mut client_map = HashMap::new();
|
||||
|
||||
if let Some(conn) = connection {
|
||||
let uri = format!("http://{}:{}", conn.0, conn.1).parse::<tonic::transport::Uri>()?;
|
||||
let health_client = HealthClient::with_origin(client, uri);
|
||||
|
||||
client_map.insert(HealthCheckServices::DynamicRoutingService, health_client);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
clients: client_map,
|
||||
})
|
||||
}
|
||||
/// Perform health check for all services involved
|
||||
pub async fn perform_health_check(
|
||||
&self,
|
||||
config: &GrpcClientSettings,
|
||||
) -> HealthCheckResult<HealthCheckMap> {
|
||||
let dynamic_routing_config = &config.dynamic_routing_client;
|
||||
let connection = match dynamic_routing_config {
|
||||
DynamicRoutingClientConfig::Enabled {
|
||||
host,
|
||||
port,
|
||||
service,
|
||||
} => Some((host.clone(), *port, service.clone())),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let health_client = self
|
||||
.clients
|
||||
.get(&HealthCheckServices::DynamicRoutingService);
|
||||
|
||||
// SAFETY : This is a safe cast as there exists a valid
|
||||
// integer value for this variant
|
||||
#[allow(clippy::as_conversions)]
|
||||
let expected_status = ServingStatus::Serving as i32;
|
||||
|
||||
let mut service_map = HealthCheckMap::new();
|
||||
|
||||
let health_check_succeed = connection
|
||||
.as_ref()
|
||||
.async_map(|conn| self.get_response_from_grpc_service(conn.2.clone(), health_client))
|
||||
.await
|
||||
.transpose()
|
||||
.change_context(HealthCheckError::ConnectionError(
|
||||
"error calling dynamic routing service".to_string(),
|
||||
))
|
||||
.map_err(|err| logger::error!(error=?err))
|
||||
.ok()
|
||||
.flatten()
|
||||
.is_some_and(|resp| resp.status == expected_status);
|
||||
|
||||
connection.and_then(|_conn| {
|
||||
service_map.insert(
|
||||
HealthCheckServices::DynamicRoutingService,
|
||||
health_check_succeed,
|
||||
)
|
||||
});
|
||||
|
||||
Ok(service_map)
|
||||
}
|
||||
|
||||
async fn get_response_from_grpc_service(
|
||||
&self,
|
||||
service: String,
|
||||
client: Option<&HealthClient<Client>>,
|
||||
) -> HealthCheckResult<HealthCheckResponse> {
|
||||
let request = tonic::Request::new(HealthCheckRequest { service });
|
||||
|
||||
let mut client = client
|
||||
.ok_or(HealthCheckError::MissingFields(
|
||||
"[health_client]".to_string(),
|
||||
))?
|
||||
.clone();
|
||||
|
||||
let response = client
|
||||
.check(request)
|
||||
.await
|
||||
.change_context(HealthCheckError::ConnectionError(
|
||||
"Failed to call dynamic routing service".to_string(),
|
||||
))?
|
||||
.into_inner();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user