diff --git a/config/config.example.toml b/config/config.example.toml index 4bdcdcf79d..ba14ed881c 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -790,3 +790,4 @@ connector_list = "cybersource" # Supported connectors for network tokenization [grpc_client.dynamic_routing_client] # Dynamic Routing Client Configuration host = "localhost" # Client Host port = 7000 # Client Port +service = "dynamo" # Service name diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 0eab330652..fa0c0484a7 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -326,3 +326,4 @@ check_token_status_url= "" # base url to check token status from token servic [grpc_client.dynamic_routing_client] # Dynamic Routing Client Configuration host = "localhost" # Client Host port = 7000 # Client Port +service = "dynamo" # Service name diff --git a/crates/api_models/Cargo.toml b/crates/api_models/Cargo.toml index e8668aab50..a5d702e26f 100644 --- a/crates/api_models/Cargo.toml +++ b/crates/api_models/Cargo.toml @@ -20,6 +20,7 @@ v1 = ["common_utils/v1"] v2 = ["common_utils/v2", "customer_v2"] customer_v2 = ["common_utils/customer_v2"] payment_methods_v2 = ["common_utils/payment_methods_v2"] +dynamic_routing = [] [dependencies] actix-web = { version = "4.5.1", optional = true } diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 1e86e2964c..4a1c009e43 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -1,3 +1,4 @@ +use std::collections::hash_map::HashMap; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct RouterHealthCheckResponse { pub database: bool, @@ -9,10 +10,22 @@ pub struct RouterHealthCheckResponse { #[cfg(feature = "olap")] pub opensearch: bool, pub outgoing_request: bool, + #[cfg(feature = "dynamic_routing")] + pub grpc_health_check: HealthCheckMap, } impl 
common_utils::events::ApiEventMetric for RouterHealthCheckResponse {} +/// gRPC based services eligible for Health check +#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum HealthCheckServices { + /// Dynamic routing service + DynamicRoutingService, +} + +pub type HealthCheckMap = HashMap; + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct SchedulerHealthCheckResponse { pub database: bool, diff --git a/crates/external_services/build.rs b/crates/external_services/build.rs index 605ef69971..0a4938e94b 100644 --- a/crates/external_services/build.rs +++ b/crates/external_services/build.rs @@ -3,11 +3,21 @@ fn main() -> Result<(), Box> { #[cfg(feature = "dynamic_routing")] { // Get the directory of the current crate - let proto_file = router_env::workspace_path() - .join("proto") - .join("success_rate.proto"); + + let proto_path = router_env::workspace_path().join("proto"); + let success_rate_proto_file = proto_path.join("success_rate.proto"); + + let health_check_proto_file = proto_path.join("health_check.proto"); + let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR")?); + // Compile the .proto file - tonic_build::compile_protos(proto_file).expect("Failed to compile success rate proto file"); + tonic_build::configure() + .out_dir(out_dir) + .compile( + &[success_rate_proto_file, health_check_proto_file], + &[proto_path], + ) + .expect("Failed to compile proto files"); } Ok(()) } diff --git a/crates/external_services/src/grpc_client.rs b/crates/external_services/src/grpc_client.rs index e7b229a807..8981a1094d 100644 --- a/crates/external_services/src/grpc_client.rs +++ b/crates/external_services/src/grpc_client.rs @@ -1,11 +1,28 @@ /// Dyanimc Routing Client interface implementation #[cfg(feature = "dynamic_routing")] pub mod dynamic_routing; +/// gRPC based Health Check Client interface implementation +#[cfg(feature = "dynamic_routing")] +pub mod 
health_check_client; use std::{fmt::Debug, sync::Arc}; #[cfg(feature = "dynamic_routing")] use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy}; +#[cfg(feature = "dynamic_routing")] +use health_check_client::HealthCheckClient; +#[cfg(feature = "dynamic_routing")] +use http_body_util::combinators::UnsyncBoxBody; +#[cfg(feature = "dynamic_routing")] +use hyper::body::Bytes; +#[cfg(feature = "dynamic_routing")] +use hyper_util::client::legacy::connect::HttpConnector; use serde; +#[cfg(feature = "dynamic_routing")] +use tonic::Status; + +#[cfg(feature = "dynamic_routing")] +/// Hyper based Client type for maintaining connection pool for all gRPC services +pub type Client = hyper_util::client::legacy::Client>; /// Struct contains all the gRPC Clients #[derive(Debug, Clone)] @@ -13,6 +30,9 @@ pub struct GrpcClients { /// The routing client #[cfg(feature = "dynamic_routing")] pub dynamic_routing: RoutingStrategy, + /// Health Check client for all gRPC services + #[cfg(feature = "dynamic_routing")] + pub health_client: HealthCheckClient, } /// Type that contains the configs required to construct a gRPC client with its respective services. #[derive(Debug, Clone, serde::Deserialize, serde::Serialize, Default)] @@ -29,17 +49,30 @@ impl GrpcClientSettings { /// This function will be called at service startup. 
#[allow(clippy::expect_used)] pub async fn get_grpc_client_interface(&self) -> Arc { + #[cfg(feature = "dynamic_routing")] + let client = + hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .http2_only(true) + .build_http(); + #[cfg(feature = "dynamic_routing")] let dynamic_routing_connection = self .dynamic_routing_client .clone() - .get_dynamic_routing_connection() + .get_dynamic_routing_connection(client.clone()) .await .expect("Failed to establish a connection with the Dynamic Routing Server"); + #[cfg(feature = "dynamic_routing")] + let health_client = HealthCheckClient::build_connections(self, client) + .await + .expect("Failed to build gRPC connections"); + Arc::new(GrpcClients { #[cfg(feature = "dynamic_routing")] dynamic_routing: dynamic_routing_connection, + #[cfg(feature = "dynamic_routing")] + health_client, }) } } diff --git a/crates/external_services/src/grpc_client/dynamic_routing.rs b/crates/external_services/src/grpc_client/dynamic_routing.rs index 3264f065b5..7ec42de0d7 100644 --- a/crates/external_services/src/grpc_client/dynamic_routing.rs +++ b/crates/external_services/src/grpc_client/dynamic_routing.rs @@ -6,9 +6,6 @@ use api_models::routing::{ }; use common_utils::{errors::CustomResult, ext_traits::OptionExt, transformers::ForeignTryFrom}; use error_stack::ResultExt; -use http_body_util::combinators::UnsyncBoxBody; -use hyper::body::Bytes; -use hyper_util::client::legacy::connect::HttpConnector; use router_env::logger; use serde; use success_rate::{ @@ -18,7 +15,8 @@ use success_rate::{ InvalidateWindowsResponse, LabelWithStatus, UpdateSuccessRateWindowConfig, UpdateSuccessRateWindowRequest, UpdateSuccessRateWindowResponse, }; -use tonic::Status; + +use super::Client; #[allow( missing_docs, unused_qualifications, @@ -45,8 +43,6 @@ pub enum DynamicRoutingError { SuccessRateBasedRoutingFailure(String), } -type Client = hyper_util::client::legacy::Client>; - /// Type that consists of all the services provided by 
the client #[derive(Debug, Clone)] pub struct RoutingStrategy { @@ -64,6 +60,8 @@ pub enum DynamicRoutingClientConfig { host: String, /// The port of the client port: u16, + /// Service name + service: String, }, #[default] /// If the dynamic routing client config has been disabled @@ -74,13 +72,10 @@ impl DynamicRoutingClientConfig { /// establish connection with the server pub async fn get_dynamic_routing_connection( self, + client: Client, ) -> Result> { - let client = - hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) - .http2_only(true) - .build_http(); let success_rate_client = match self { - Self::Enabled { host, port } => { + Self::Enabled { host, port, .. } => { let uri = format!("http://{}:{}", host, port).parse::()?; logger::info!("Connection established with dynamic routing gRPC Server"); Some(SuccessRateCalculatorClient::with_origin(client, uri)) diff --git a/crates/external_services/src/grpc_client/health_check_client.rs b/crates/external_services/src/grpc_client/health_check_client.rs new file mode 100644 index 0000000000..94d78df795 --- /dev/null +++ b/crates/external_services/src/grpc_client/health_check_client.rs @@ -0,0 +1,149 @@ +use std::{collections::HashMap, fmt::Debug}; + +use api_models::health_check::{HealthCheckMap, HealthCheckServices}; +use common_utils::{errors::CustomResult, ext_traits::AsyncExt}; +use error_stack::ResultExt; +pub use health_check::{ + health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest, + HealthCheckResponse, +}; +use router_env::logger; + +#[allow( + missing_docs, + unused_qualifications, + clippy::unwrap_used, + clippy::as_conversions, + clippy::use_self +)] +pub mod health_check { + tonic::include_proto!("grpc.health.v1"); +} + +use super::{Client, DynamicRoutingClientConfig, GrpcClientSettings}; + +/// Result type for Health Check operations +pub type HealthCheckResult = CustomResult; +/// Health Check Errors +#[derive(Debug, Clone, thiserror::Error)] 
+pub enum HealthCheckError { + /// The required input is missing + #[error("Missing fields: {0} for building the Health check connection")] + MissingFields(String), + /// Error from gRPC Server + #[error("Error from gRPC Server : {0}")] + ConnectionError(String), + /// status is invalid + #[error("Invalid Status from server")] + InvalidStatus, +} + +/// Health Check Client type +#[derive(Debug, Clone)] +pub struct HealthCheckClient { + /// Health clients for all gRPC based services + pub clients: HashMap>, +} + +impl HealthCheckClient { + /// Build connections to all gRPC services + pub async fn build_connections( + config: &GrpcClientSettings, + client: Client, + ) -> Result> { + let dynamic_routing_config = &config.dynamic_routing_client; + let connection = match dynamic_routing_config { + DynamicRoutingClientConfig::Enabled { + host, + port, + service, + } => Some((host.clone(), *port, service.clone())), + _ => None, + }; + + let mut client_map = HashMap::new(); + + if let Some(conn) = connection { + let uri = format!("http://{}:{}", conn.0, conn.1).parse::()?; + let health_client = HealthClient::with_origin(client, uri); + + client_map.insert(HealthCheckServices::DynamicRoutingService, health_client); + } + + Ok(Self { + clients: client_map, + }) + } + /// Perform health check for all services involved + pub async fn perform_health_check( + &self, + config: &GrpcClientSettings, + ) -> HealthCheckResult { + let dynamic_routing_config = &config.dynamic_routing_client; + let connection = match dynamic_routing_config { + DynamicRoutingClientConfig::Enabled { + host, + port, + service, + } => Some((host.clone(), *port, service.clone())), + _ => None, + }; + + let health_client = self + .clients + .get(&HealthCheckServices::DynamicRoutingService); + + // SAFETY : This is a safe cast as there exists a valid + // integer value for this variant + #[allow(clippy::as_conversions)] + let expected_status = ServingStatus::Serving as i32; + + let mut service_map = 
HealthCheckMap::new(); + + let health_check_succeed = connection + .as_ref() + .async_map(|conn| self.get_response_from_grpc_service(conn.2.clone(), health_client)) + .await + .transpose() + .change_context(HealthCheckError::ConnectionError( + "error calling dynamic routing service".to_string(), + )) + .map_err(|err| logger::error!(error=?err)) + .ok() + .flatten() + .is_some_and(|resp| resp.status == expected_status); + + connection.and_then(|_conn| { + service_map.insert( + HealthCheckServices::DynamicRoutingService, + health_check_succeed, + ) + }); + + Ok(service_map) + } + + async fn get_response_from_grpc_service( + &self, + service: String, + client: Option<&HealthClient>, + ) -> HealthCheckResult { + let request = tonic::Request::new(HealthCheckRequest { service }); + + let mut client = client + .ok_or(HealthCheckError::MissingFields( + "[health_client]".to_string(), + ))? + .clone(); + + let response = client + .check(request) + .await + .change_context(HealthCheckError::ConnectionError( + "Failed to call dynamic routing service".to_string(), + ))? 
+ .into_inner(); + + Ok(response) + } +} diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index b794443a04..f6e1f0efc6 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -37,7 +37,7 @@ v2 = ["customer_v2", "payment_methods_v2", "common_default", "api_models/v2", "d v1 = ["common_default", "api_models/v1", "diesel_models/v1", "hyperswitch_domain_models/v1", "storage_impl/v1", "hyperswitch_interfaces/v1", "kgraph_utils/v1", "common_utils/v1"] customer_v2 = ["api_models/customer_v2", "diesel_models/customer_v2", "hyperswitch_domain_models/customer_v2", "storage_impl/customer_v2"] payment_methods_v2 = ["api_models/payment_methods_v2", "diesel_models/payment_methods_v2", "hyperswitch_domain_models/payment_methods_v2", "storage_impl/payment_methods_v2", "common_utils/payment_methods_v2"] -dynamic_routing = ["external_services/dynamic_routing", "storage_impl/dynamic_routing"] +dynamic_routing = ["external_services/dynamic_routing", "storage_impl/dynamic_routing", "api_models/dynamic_routing"] # Partial Auth # The feature reduces the overhead of the router authenticating the merchant for every request, and trusts on `x-merchant-id` header to be present in the request. 
diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index 83faee677d..31e8cc75f5 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -1,5 +1,7 @@ #[cfg(feature = "olap")] use analytics::health_check::HealthCheck; +#[cfg(feature = "dynamic_routing")] +use api_models::health_check::HealthCheckMap; use api_models::health_check::HealthState; use error_stack::ResultExt; use router_env::logger; @@ -28,6 +30,11 @@ pub trait HealthCheckInterface { async fn health_check_opensearch( &self, ) -> CustomResult; + + #[cfg(feature = "dynamic_routing")] + async fn health_check_grpc( + &self, + ) -> CustomResult; } #[async_trait::async_trait] @@ -158,4 +165,20 @@ impl HealthCheckInterface for app::SessionState { logger::debug!("Outgoing request successful"); Ok(HealthState::Running) } + + #[cfg(feature = "dynamic_routing")] + async fn health_check_grpc( + &self, + ) -> CustomResult { + let health_client = &self.grpc_client.health_client; + let grpc_config = &self.conf.grpc_client; + + let health_check_map = health_client + .perform_health_check(grpc_config) + .await + .change_context(errors::HealthCheckGRPCServiceError::FailedToCallService)?; + + logger::debug!("Health check successful"); + Ok(health_check_map) + } } diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index c1cf28d00a..0f2e633364 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -95,6 +95,19 @@ async fn deep_health_check_func( logger::debug!("Analytics health check end"); + logger::debug!("gRPC health check begin"); + + #[cfg(feature = "dynamic_routing")] + let grpc_health_check = state.health_check_grpc().await.map_err(|error| { + let message = error.to_string(); + error.change_context(errors::ApiErrorResponse::HealthCheckError { + component: "gRPC services", + message, + }) + })?; + + logger::debug!("gRPC health check end"); + 
logger::debug!("Opensearch health check begin"); #[cfg(feature = "olap")] @@ -129,6 +142,8 @@ async fn deep_health_check_func( #[cfg(feature = "olap")] opensearch: opensearch_status.into(), outgoing_request: outgoing_check.into(), + #[cfg(feature = "dynamic_routing")] + grpc_health_check, }; Ok(api::ApplicationResponse::Json(response)) diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index 1cee96f49e..10d7f4dc8e 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -292,3 +292,9 @@ pub enum HealthCheckLockerError { #[error("Failed to establish Locker connection")] FailedToCallLocker, } + +#[derive(Debug, Clone, thiserror::Error)] +pub enum HealthCheckGRPCServiceError { + #[error("Failed to establish connection with gRPC service")] + FailedToCallService, +} diff --git a/proto/health_check.proto b/proto/health_check.proto new file mode 100644 index 0000000000..b246f59f69 --- /dev/null +++ b/proto/health_check.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +} \ No newline at end of file