refactor(dynamic_routing): add info logs to log the grpc request and response (#6962)

This commit is contained in:
Chethan Rao
2025-01-16 15:57:56 +05:30
committed by GitHub
parent 343465165b
commit 72904842ed
5 changed files with 90 additions and 43 deletions

View File

@ -128,3 +128,13 @@ impl<T> AddHeaders for tonic::Request<T> {
}); });
} }
} }
#[cfg(feature = "dynamic_routing")]
pub(crate) fn create_grpc_request<T: Debug>(message: T, headers: GrpcHeaders) -> tonic::Request<T> {
let mut request = tonic::Request::new(message);
request.add_headers_to_grpc_request(headers);
logger::info!(dynamic_routing_request=?request);
request
}

View File

@ -9,6 +9,7 @@ pub use elimination_rate::{
LabelWithBucketName, UpdateEliminationBucketRequest, UpdateEliminationBucketResponse, LabelWithBucketName, UpdateEliminationBucketRequest, UpdateEliminationBucketResponse,
}; };
use error_stack::ResultExt; use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};
#[allow( #[allow(
missing_docs, missing_docs,
unused_qualifications, unused_qualifications,
@ -21,7 +22,7 @@ pub mod elimination_rate {
} }
use super::{Client, DynamicRoutingError, DynamicRoutingResult}; use super::{Client, DynamicRoutingError, DynamicRoutingResult};
use crate::grpc_client::{AddHeaders, GrpcHeaders}; use crate::grpc_client::{self, GrpcHeaders};
/// The trait Elimination Based Routing would have the functions required to support performance, calculation and invalidation bucket /// The trait Elimination Based Routing would have the functions required to support performance, calculation and invalidation bucket
#[async_trait::async_trait] #[async_trait::async_trait]
@ -54,6 +55,7 @@ pub trait EliminationBasedRouting: dyn_clone::DynClone + Send + Sync {
#[async_trait::async_trait] #[async_trait::async_trait]
impl EliminationBasedRouting for EliminationAnalyserClient<Client> { impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
#[instrument(skip_all)]
async fn perform_elimination_routing( async fn perform_elimination_routing(
&self, &self,
id: String, id: String,
@ -69,14 +71,15 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?; let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
let mut request = tonic::Request::new(EliminationRequest { let request = grpc_client::create_grpc_request(
id, EliminationRequest {
params, id,
labels, params,
config, labels,
}); config,
},
request.add_headers_to_grpc_request(headers); headers,
);
let response = self let response = self
.clone() .clone()
@ -87,9 +90,12 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
))? ))?
.into_inner(); .into_inner();
logger::info!(dynamic_routing_response=?response);
Ok(response) Ok(response)
} }
#[instrument(skip_all)]
async fn update_elimination_bucket_config( async fn update_elimination_bucket_config(
&self, &self,
id: String, id: String,
@ -110,14 +116,15 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut request = tonic::Request::new(UpdateEliminationBucketRequest { let request = grpc_client::create_grpc_request(
id, UpdateEliminationBucketRequest {
params, id,
labels_with_bucket_name, params,
config, labels_with_bucket_name,
}); config,
},
request.add_headers_to_grpc_request(headers); headers,
);
let response = self let response = self
.clone() .clone()
@ -127,16 +134,19 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
"Failed to update the elimination bucket".to_string(), "Failed to update the elimination bucket".to_string(),
))? ))?
.into_inner(); .into_inner();
logger::info!(dynamic_routing_response=?response);
Ok(response) Ok(response)
} }
#[instrument(skip_all)]
async fn invalidate_elimination_bucket( async fn invalidate_elimination_bucket(
&self, &self,
id: String, id: String,
headers: GrpcHeaders, headers: GrpcHeaders,
) -> DynamicRoutingResult<InvalidateBucketResponse> { ) -> DynamicRoutingResult<InvalidateBucketResponse> {
let mut request = tonic::Request::new(InvalidateBucketRequest { id }); let request = grpc_client::create_grpc_request(InvalidateBucketRequest { id }, headers);
request.add_headers_to_grpc_request(headers);
let response = self let response = self
.clone() .clone()
@ -146,6 +156,9 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
"Failed to invalidate the elimination bucket".to_string(), "Failed to invalidate the elimination bucket".to_string(),
))? ))?
.into_inner(); .into_inner();
logger::info!(dynamic_routing_response=?response);
Ok(response) Ok(response)
} }
} }

View File

@ -4,6 +4,7 @@ use api_models::routing::{
}; };
use common_utils::{ext_traits::OptionExt, transformers::ForeignTryFrom}; use common_utils::{ext_traits::OptionExt, transformers::ForeignTryFrom};
use error_stack::ResultExt; use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};
pub use success_rate::{ pub use success_rate::{
success_rate_calculator_client::SuccessRateCalculatorClient, CalSuccessRateConfig, success_rate_calculator_client::SuccessRateCalculatorClient, CalSuccessRateConfig,
CalSuccessRateRequest, CalSuccessRateResponse, CalSuccessRateRequest, CalSuccessRateResponse,
@ -15,13 +16,14 @@ pub use success_rate::{
missing_docs, missing_docs,
unused_qualifications, unused_qualifications,
clippy::unwrap_used, clippy::unwrap_used,
clippy::as_conversions clippy::as_conversions,
clippy::use_self
)] )]
pub mod success_rate { pub mod success_rate {
tonic::include_proto!("success_rate"); tonic::include_proto!("success_rate");
} }
use super::{Client, DynamicRoutingError, DynamicRoutingResult}; use super::{Client, DynamicRoutingError, DynamicRoutingResult};
use crate::grpc_client::{AddHeaders, GrpcHeaders}; use crate::grpc_client::{self, GrpcHeaders};
/// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window /// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync { pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
@ -53,6 +55,7 @@ pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
#[async_trait::async_trait] #[async_trait::async_trait]
impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> { impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
#[instrument(skip_all)]
async fn calculate_success_rate( async fn calculate_success_rate(
&self, &self,
id: String, id: String,
@ -71,14 +74,15 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
.map(ForeignTryFrom::foreign_try_from) .map(ForeignTryFrom::foreign_try_from)
.transpose()?; .transpose()?;
let mut request = tonic::Request::new(CalSuccessRateRequest { let request = grpc_client::create_grpc_request(
id, CalSuccessRateRequest {
params, id,
labels, params,
config, labels,
}); config,
},
request.add_headers_to_grpc_request(headers); headers,
);
let response = self let response = self
.clone() .clone()
@ -89,9 +93,12 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
))? ))?
.into_inner(); .into_inner();
logger::info!(dynamic_routing_response=?response);
Ok(response) Ok(response)
} }
#[instrument(skip_all)]
async fn update_success_rate( async fn update_success_rate(
&self, &self,
id: String, id: String,
@ -113,14 +120,15 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
}) })
.collect(); .collect();
let mut request = tonic::Request::new(UpdateSuccessRateWindowRequest { let request = grpc_client::create_grpc_request(
id, UpdateSuccessRateWindowRequest {
params, id,
labels_with_status, params,
config, labels_with_status,
}); config,
},
request.add_headers_to_grpc_request(headers); headers,
);
let response = self let response = self
.clone() .clone()
@ -131,16 +139,18 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
))? ))?
.into_inner(); .into_inner();
logger::info!(dynamic_routing_response=?response);
Ok(response) Ok(response)
} }
#[instrument(skip_all)]
async fn invalidate_success_rate_routing_keys( async fn invalidate_success_rate_routing_keys(
&self, &self,
id: String, id: String,
headers: GrpcHeaders, headers: GrpcHeaders,
) -> DynamicRoutingResult<InvalidateWindowsResponse> { ) -> DynamicRoutingResult<InvalidateWindowsResponse> {
let mut request = tonic::Request::new(InvalidateWindowsRequest { id }); let request = grpc_client::create_grpc_request(InvalidateWindowsRequest { id }, headers);
request.add_headers_to_grpc_request(headers);
let response = self let response = self
.clone() .clone()
@ -150,6 +160,9 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
"Failed to invalidate the success rate routing keys".to_string(), "Failed to invalidate the success rate routing keys".to_string(),
))? ))?
.into_inner(); .into_inner();
logger::info!(dynamic_routing_response=?response);
Ok(response) Ok(response)
} }
} }

View File

@ -37,6 +37,8 @@ use rand::{
distributions::{self, Distribution}, distributions::{self, Distribution},
SeedableRng, SeedableRng,
}; };
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use router_env::{instrument, tracing};
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use storage_impl::redis::cache::{CacheKey, CGRAPH_CACHE, ROUTING_CACHE}; use storage_impl::redis::cache::{CacheKey, CGRAPH_CACHE, ROUTING_CACHE};
@ -1281,6 +1283,7 @@ pub fn make_dsl_input_for_surcharge(
/// success based dynamic routing /// success based dynamic routing
#[cfg(all(feature = "v1", feature = "dynamic_routing"))] #[cfg(all(feature = "v1", feature = "dynamic_routing"))]
#[instrument(skip_all)]
pub async fn perform_success_based_routing( pub async fn perform_success_based_routing(
state: &SessionState, state: &SessionState,
routable_connectors: Vec<api_routing::RoutableConnectorChoice>, routable_connectors: Vec<api_routing::RoutableConnectorChoice>,

View File

@ -55,7 +55,11 @@ message CurrentBlockThreshold {
} }
message UpdateSuccessRateWindowResponse { message UpdateSuccessRateWindowResponse {
string message = 1; enum UpdationStatus {
WINDOW_UPDATION_SUCCEEDED = 0;
WINDOW_UPDATION_FAILED = 1;
}
UpdationStatus status = 1;
} }
// API-3 types // API-3 types
@ -64,5 +68,9 @@ message InvalidateWindowsRequest {
} }
message InvalidateWindowsResponse { message InvalidateWindowsResponse {
string message = 1; enum InvalidationStatus {
WINDOW_INVALIDATION_SUCCEEDED = 0;
WINDOW_INVALIDATION_FAILED = 1;
}
InvalidationStatus status = 1;
} }