mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 17:19:15 +08:00
refactor(grpc): send x-tenant-id and x-request-id in grpc headers (#6904)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -146,3 +146,6 @@ pub const CONNECTOR_TRANSACTION_ID_HASH_BYTES: usize = 25;
|
|||||||
/// Apple Pay validation url
|
/// Apple Pay validation url
|
||||||
pub const APPLEPAY_VALIDATION_URL: &str =
|
pub const APPLEPAY_VALIDATION_URL: &str =
|
||||||
"https://apple-pay-gateway-cert.apple.com/paymentservices/startSession";
|
"https://apple-pay-gateway-cert.apple.com/paymentservices/startSession";
|
||||||
|
|
||||||
|
/// Request ID
|
||||||
|
pub const X_REQUEST_ID: &str = "x-request-id";
|
||||||
|
|||||||
@ -6,6 +6,8 @@ pub mod dynamic_routing;
|
|||||||
pub mod health_check_client;
|
pub mod health_check_client;
|
||||||
use std::{fmt::Debug, sync::Arc};
|
use std::{fmt::Debug, sync::Arc};
|
||||||
|
|
||||||
|
#[cfg(feature = "dynamic_routing")]
|
||||||
|
use common_utils::consts;
|
||||||
#[cfg(feature = "dynamic_routing")]
|
#[cfg(feature = "dynamic_routing")]
|
||||||
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy};
|
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy};
|
||||||
#[cfg(feature = "dynamic_routing")]
|
#[cfg(feature = "dynamic_routing")]
|
||||||
@ -16,6 +18,8 @@ use http_body_util::combinators::UnsyncBoxBody;
|
|||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
#[cfg(feature = "dynamic_routing")]
|
#[cfg(feature = "dynamic_routing")]
|
||||||
use hyper_util::client::legacy::connect::HttpConnector;
|
use hyper_util::client::legacy::connect::HttpConnector;
|
||||||
|
#[cfg(feature = "dynamic_routing")]
|
||||||
|
use router_env::logger;
|
||||||
use serde;
|
use serde;
|
||||||
#[cfg(feature = "dynamic_routing")]
|
#[cfg(feature = "dynamic_routing")]
|
||||||
use tonic::Status;
|
use tonic::Status;
|
||||||
@ -76,3 +80,51 @@ impl GrpcClientSettings {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Contains grpc headers
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct GrpcHeaders {
|
||||||
|
/// Tenant id
|
||||||
|
pub tenant_id: String,
|
||||||
|
/// Request id
|
||||||
|
pub request_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "dynamic_routing")]
|
||||||
|
/// Trait to add necessary headers to the tonic Request
|
||||||
|
pub(crate) trait AddHeaders {
|
||||||
|
/// Add necessary header fields to the tonic Request
|
||||||
|
fn add_headers_to_grpc_request(&mut self, headers: GrpcHeaders);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "dynamic_routing")]
|
||||||
|
impl<T> AddHeaders for tonic::Request<T> {
|
||||||
|
#[track_caller]
|
||||||
|
fn add_headers_to_grpc_request(&mut self, headers: GrpcHeaders) {
|
||||||
|
headers.tenant_id
|
||||||
|
.parse()
|
||||||
|
.map(|tenant_id| {
|
||||||
|
self
|
||||||
|
.metadata_mut()
|
||||||
|
.append(consts::TENANT_HEADER, tenant_id)
|
||||||
|
})
|
||||||
|
.inspect_err(
|
||||||
|
|err| logger::warn!(header_parse_error=?err,"invalid {} received",consts::TENANT_HEADER),
|
||||||
|
)
|
||||||
|
.ok();
|
||||||
|
|
||||||
|
headers.request_id.map(|request_id| {
|
||||||
|
request_id
|
||||||
|
.parse()
|
||||||
|
.map(|request_id| {
|
||||||
|
self
|
||||||
|
.metadata_mut()
|
||||||
|
.append(consts::X_REQUEST_ID, request_id)
|
||||||
|
})
|
||||||
|
.inspect_err(
|
||||||
|
|err| logger::warn!(header_parse_error=?err,"invalid {} received",consts::X_REQUEST_ID),
|
||||||
|
)
|
||||||
|
.ok();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -21,6 +21,7 @@ pub mod elimination_rate {
|
|||||||
}
|
}
|
||||||
|
|
||||||
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
|
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
|
||||||
|
use crate::grpc_client::{AddHeaders, GrpcHeaders};
|
||||||
|
|
||||||
/// The trait Elimination Based Routing would have the functions required to support performance, calculation and invalidation bucket
|
/// The trait Elimination Based Routing would have the functions required to support performance, calculation and invalidation bucket
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
@ -32,6 +33,7 @@ pub trait EliminationBasedRouting: dyn_clone::DynClone + Send + Sync {
|
|||||||
params: String,
|
params: String,
|
||||||
labels: Vec<RoutableConnectorChoice>,
|
labels: Vec<RoutableConnectorChoice>,
|
||||||
configs: Option<EliminationConfig>,
|
configs: Option<EliminationConfig>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<EliminationResponse>;
|
) -> DynamicRoutingResult<EliminationResponse>;
|
||||||
/// To update the bucket size and ttl for list of connectors with its respective bucket name
|
/// To update the bucket size and ttl for list of connectors with its respective bucket name
|
||||||
async fn update_elimination_bucket_config(
|
async fn update_elimination_bucket_config(
|
||||||
@ -40,11 +42,13 @@ pub trait EliminationBasedRouting: dyn_clone::DynClone + Send + Sync {
|
|||||||
params: String,
|
params: String,
|
||||||
report: Vec<RoutableConnectorChoiceWithBucketName>,
|
report: Vec<RoutableConnectorChoiceWithBucketName>,
|
||||||
config: Option<EliminationConfig>,
|
config: Option<EliminationConfig>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<UpdateEliminationBucketResponse>;
|
) -> DynamicRoutingResult<UpdateEliminationBucketResponse>;
|
||||||
/// To invalidate the previous id's bucket
|
/// To invalidate the previous id's bucket
|
||||||
async fn invalidate_elimination_bucket(
|
async fn invalidate_elimination_bucket(
|
||||||
&self,
|
&self,
|
||||||
id: String,
|
id: String,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<InvalidateBucketResponse>;
|
) -> DynamicRoutingResult<InvalidateBucketResponse>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,6 +60,7 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
|
|||||||
params: String,
|
params: String,
|
||||||
label_input: Vec<RoutableConnectorChoice>,
|
label_input: Vec<RoutableConnectorChoice>,
|
||||||
configs: Option<EliminationConfig>,
|
configs: Option<EliminationConfig>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<EliminationResponse> {
|
) -> DynamicRoutingResult<EliminationResponse> {
|
||||||
let labels = label_input
|
let labels = label_input
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@ -64,13 +69,15 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
|
|||||||
|
|
||||||
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
|
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
|
||||||
|
|
||||||
let request = tonic::Request::new(EliminationRequest {
|
let mut request = tonic::Request::new(EliminationRequest {
|
||||||
id,
|
id,
|
||||||
params,
|
params,
|
||||||
labels,
|
labels,
|
||||||
config,
|
config,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
request.add_headers_to_grpc_request(headers);
|
||||||
|
|
||||||
let response = self
|
let response = self
|
||||||
.clone()
|
.clone()
|
||||||
.get_elimination_status(request)
|
.get_elimination_status(request)
|
||||||
@ -89,6 +96,7 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
|
|||||||
params: String,
|
params: String,
|
||||||
report: Vec<RoutableConnectorChoiceWithBucketName>,
|
report: Vec<RoutableConnectorChoiceWithBucketName>,
|
||||||
configs: Option<EliminationConfig>,
|
configs: Option<EliminationConfig>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<UpdateEliminationBucketResponse> {
|
) -> DynamicRoutingResult<UpdateEliminationBucketResponse> {
|
||||||
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
|
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;
|
||||||
|
|
||||||
@ -102,13 +110,15 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
|
|||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let request = tonic::Request::new(UpdateEliminationBucketRequest {
|
let mut request = tonic::Request::new(UpdateEliminationBucketRequest {
|
||||||
id,
|
id,
|
||||||
params,
|
params,
|
||||||
labels_with_bucket_name,
|
labels_with_bucket_name,
|
||||||
config,
|
config,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
request.add_headers_to_grpc_request(headers);
|
||||||
|
|
||||||
let response = self
|
let response = self
|
||||||
.clone()
|
.clone()
|
||||||
.update_elimination_bucket(request)
|
.update_elimination_bucket(request)
|
||||||
@ -122,8 +132,11 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
|
|||||||
async fn invalidate_elimination_bucket(
|
async fn invalidate_elimination_bucket(
|
||||||
&self,
|
&self,
|
||||||
id: String,
|
id: String,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<InvalidateBucketResponse> {
|
) -> DynamicRoutingResult<InvalidateBucketResponse> {
|
||||||
let request = tonic::Request::new(InvalidateBucketRequest { id });
|
let mut request = tonic::Request::new(InvalidateBucketRequest { id });
|
||||||
|
|
||||||
|
request.add_headers_to_grpc_request(headers);
|
||||||
|
|
||||||
let response = self
|
let response = self
|
||||||
.clone()
|
.clone()
|
||||||
|
|||||||
@ -21,6 +21,7 @@ pub mod success_rate {
|
|||||||
tonic::include_proto!("success_rate");
|
tonic::include_proto!("success_rate");
|
||||||
}
|
}
|
||||||
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
|
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
|
||||||
|
use crate::grpc_client::{AddHeaders, GrpcHeaders};
|
||||||
/// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window
|
/// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
||||||
@ -31,6 +32,7 @@ pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
|||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
params: String,
|
params: String,
|
||||||
label_input: Vec<RoutableConnectorChoice>,
|
label_input: Vec<RoutableConnectorChoice>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<CalSuccessRateResponse>;
|
) -> DynamicRoutingResult<CalSuccessRateResponse>;
|
||||||
/// To update the success rate with the given label
|
/// To update the success rate with the given label
|
||||||
async fn update_success_rate(
|
async fn update_success_rate(
|
||||||
@ -39,11 +41,13 @@ pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
|
|||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
params: String,
|
params: String,
|
||||||
response: Vec<RoutableConnectorChoiceWithStatus>,
|
response: Vec<RoutableConnectorChoiceWithStatus>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse>;
|
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse>;
|
||||||
/// To invalidates the success rate routing keys
|
/// To invalidates the success rate routing keys
|
||||||
async fn invalidate_success_rate_routing_keys(
|
async fn invalidate_success_rate_routing_keys(
|
||||||
&self,
|
&self,
|
||||||
id: String,
|
id: String,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<InvalidateWindowsResponse>;
|
) -> DynamicRoutingResult<InvalidateWindowsResponse>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,6 +59,7 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
|||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
params: String,
|
params: String,
|
||||||
label_input: Vec<RoutableConnectorChoice>,
|
label_input: Vec<RoutableConnectorChoice>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<CalSuccessRateResponse> {
|
) -> DynamicRoutingResult<CalSuccessRateResponse> {
|
||||||
let labels = label_input
|
let labels = label_input
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@ -66,13 +71,15 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
|||||||
.map(ForeignTryFrom::foreign_try_from)
|
.map(ForeignTryFrom::foreign_try_from)
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
|
|
||||||
let request = tonic::Request::new(CalSuccessRateRequest {
|
let mut request = tonic::Request::new(CalSuccessRateRequest {
|
||||||
id,
|
id,
|
||||||
params,
|
params,
|
||||||
labels,
|
labels,
|
||||||
config,
|
config,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
request.add_headers_to_grpc_request(headers);
|
||||||
|
|
||||||
let response = self
|
let response = self
|
||||||
.clone()
|
.clone()
|
||||||
.fetch_success_rate(request)
|
.fetch_success_rate(request)
|
||||||
@ -91,6 +98,7 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
|||||||
success_rate_based_config: SuccessBasedRoutingConfig,
|
success_rate_based_config: SuccessBasedRoutingConfig,
|
||||||
params: String,
|
params: String,
|
||||||
label_input: Vec<RoutableConnectorChoiceWithStatus>,
|
label_input: Vec<RoutableConnectorChoiceWithStatus>,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse> {
|
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse> {
|
||||||
let config = success_rate_based_config
|
let config = success_rate_based_config
|
||||||
.config
|
.config
|
||||||
@ -105,13 +113,15 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let request = tonic::Request::new(UpdateSuccessRateWindowRequest {
|
let mut request = tonic::Request::new(UpdateSuccessRateWindowRequest {
|
||||||
id,
|
id,
|
||||||
params,
|
params,
|
||||||
labels_with_status,
|
labels_with_status,
|
||||||
config,
|
config,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
request.add_headers_to_grpc_request(headers);
|
||||||
|
|
||||||
let response = self
|
let response = self
|
||||||
.clone()
|
.clone()
|
||||||
.update_success_rate_window(request)
|
.update_success_rate_window(request)
|
||||||
@ -126,8 +136,11 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
|
|||||||
async fn invalidate_success_rate_routing_keys(
|
async fn invalidate_success_rate_routing_keys(
|
||||||
&self,
|
&self,
|
||||||
id: String,
|
id: String,
|
||||||
|
headers: GrpcHeaders,
|
||||||
) -> DynamicRoutingResult<InvalidateWindowsResponse> {
|
) -> DynamicRoutingResult<InvalidateWindowsResponse> {
|
||||||
let request = tonic::Request::new(InvalidateWindowsRequest { id });
|
let mut request = tonic::Request::new(InvalidateWindowsRequest { id });
|
||||||
|
|
||||||
|
request.add_headers_to_grpc_request(headers);
|
||||||
|
|
||||||
let response = self
|
let response = self
|
||||||
.clone()
|
.clone()
|
||||||
|
|||||||
@ -1343,6 +1343,7 @@ pub async fn perform_success_based_routing(
|
|||||||
success_based_routing_configs,
|
success_based_routing_configs,
|
||||||
success_based_routing_config_params,
|
success_based_routing_config_params,
|
||||||
routable_connectors,
|
routable_connectors,
|
||||||
|
state.get_grpc_headers(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::RoutingError::SuccessRateCalculationError)
|
.change_context(errors::RoutingError::SuccessRateCalculationError)
|
||||||
|
|||||||
@ -1408,7 +1408,10 @@ pub async fn success_based_routing_update_configs(
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.async_map(|sr_client| async {
|
.async_map(|sr_client| async {
|
||||||
sr_client
|
sr_client
|
||||||
.invalidate_success_rate_routing_keys(prefix_of_dynamic_routing_keys)
|
.invalidate_success_rate_routing_keys(
|
||||||
|
prefix_of_dynamic_routing_keys,
|
||||||
|
state.get_grpc_headers(),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
|
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
|
||||||
message: "Failed to invalidate the routing keys".to_string(),
|
message: "Failed to invalidate the routing keys".to_string(),
|
||||||
|
|||||||
@ -719,6 +719,7 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
|
|||||||
success_based_routing_configs.clone(),
|
success_based_routing_configs.clone(),
|
||||||
success_based_routing_config_params.clone(),
|
success_based_routing_config_params.clone(),
|
||||||
routable_connectors.clone(),
|
routable_connectors.clone(),
|
||||||
|
state.get_grpc_headers(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
@ -855,6 +856,7 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
|
|||||||
},
|
},
|
||||||
payment_status_attribute == common_enums::AttemptStatus::Charged,
|
payment_status_attribute == common_enums::AttemptStatus::Charged,
|
||||||
)],
|
)],
|
||||||
|
state.get_grpc_headers(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
|
|||||||
@ -12,7 +12,10 @@ use common_utils::id_type;
|
|||||||
use external_services::email::{
|
use external_services::email::{
|
||||||
no_email::NoEmailClient, ses::AwsSes, smtp::SmtpServer, EmailClientConfigs, EmailService,
|
no_email::NoEmailClient, ses::AwsSes, smtp::SmtpServer, EmailClientConfigs, EmailService,
|
||||||
};
|
};
|
||||||
use external_services::{file_storage::FileStorageInterface, grpc_client::GrpcClients};
|
use external_services::{
|
||||||
|
file_storage::FileStorageInterface,
|
||||||
|
grpc_client::{GrpcClients, GrpcHeaders},
|
||||||
|
};
|
||||||
use hyperswitch_interfaces::{
|
use hyperswitch_interfaces::{
|
||||||
encryption_interface::EncryptionManagementInterface,
|
encryption_interface::EncryptionManagementInterface,
|
||||||
secrets_interface::secret_state::{RawSecret, SecuredSecret},
|
secrets_interface::secret_state::{RawSecret, SecuredSecret},
|
||||||
@ -119,6 +122,12 @@ impl SessionState {
|
|||||||
event_context: events::EventContext::new(self.event_handler.clone()),
|
event_context: events::EventContext::new(self.event_handler.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn get_grpc_headers(&self) -> GrpcHeaders {
|
||||||
|
GrpcHeaders {
|
||||||
|
tenant_id: self.tenant.tenant_id.get_string_repr().to_string(),
|
||||||
|
request_id: self.request_id.map(|req_id| (*req_id).to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait SessionStateInfo {
|
pub trait SessionStateInfo {
|
||||||
|
|||||||
Reference in New Issue
Block a user