refactor(ucs): introduce dedicated gRPC header type and enhance lineage ID handling (#9275)

This commit is contained in:
Hrithikesh
2025-09-09 14:51:55 +05:30
committed by GitHub
parent 77498ee52d
commit 876ea3f61d
11 changed files with 147 additions and 67 deletions

22
Cargo.lock generated
View File

@ -3071,6 +3071,7 @@ dependencies = [
"rust-grpc-client",
"serde",
"serde_json",
"serde_urlencoded",
"thiserror 1.0.69",
"time",
"tokio 1.45.1",
@ -3078,6 +3079,7 @@ dependencies = [
"tonic-build",
"tonic-reflection",
"tonic-types",
"typed-builder",
"url",
"vaultrs",
]
@ -9350,6 +9352,26 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "typed-builder"
version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fef81aec2ca29576f9f6ae8755108640d0a86dd3161b2e8bca6cfa554e98f77d"
dependencies = [
"typed-builder-macro",
]
[[package]]
name = "typed-builder-macro"
version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecb9ecf7799210407c14a8cfdfe0173365780968dc57973ed082211958e0b18"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "typeid"
version = "1.0.2"

View File

@ -52,6 +52,7 @@ once_cell = "1.21.3"
serde = { version = "1.0.219", features = ["derive"] }
thiserror = "1.0.69"
serde_json = "1.0.140"
serde_urlencoded = "0.7.1"
vaultrs = { version = "0.7.4", optional = true }
prost = { version = "0.13", optional = true }
prost-types = { version = "0.13", optional = true }
@ -60,6 +61,7 @@ tokio = "1.45.1"
tonic = "0.13.1"
tonic-reflection = "0.13.1"
tonic-types = "0.13.1"
typed-builder = "0.21.2"
hyper-util = { version = "0.1.12", optional = true }
http-body-util = { version = "0.1.3", optional = true }
reqwest = { version = "0.11.27", features = ["rustls-tls"] }

View File

@ -14,6 +14,7 @@ use std::{fmt::Debug, sync::Arc};
#[cfg(feature = "dynamic_routing")]
use common_utils::consts;
use common_utils::id_type;
#[cfg(feature = "dynamic_routing")]
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy};
#[cfg(feature = "dynamic_routing")]
@ -22,8 +23,10 @@ use health_check_client::HealthCheckClient;
use hyper_util::client::legacy::connect::HttpConnector;
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))]
use router_env::logger;
use serde_urlencoded;
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))]
use tonic::body::Body;
use typed_builder::TypedBuilder;
#[cfg(feature = "revenue_recovery")]
pub use self::revenue_recovery::{
@ -148,6 +151,38 @@ pub struct GrpcHeaders {
pub request_id: Option<String>,
}
/// Contains grpc headers for Ucs
#[derive(Debug, TypedBuilder)]
pub struct GrpcHeadersUcs {
/// Tenant id
tenant_id: String,
/// Lineage ids
lineage_ids: LineageIds,
/// External vault proxy metadata
external_vault_proxy_metadata: Option<String>,
}
/// Type aliase for GrpcHeaders builder in initial stage
pub type GrpcHeadersUcsBuilderInitial = GrpcHeadersUcsBuilder<((String,), (), ())>;
/// Type aliase for GrpcHeaders builder in intermediate stage
pub type GrpcHeadersUcsBuilderIntermediate =
GrpcHeadersUcsBuilder<((String,), (), (Option<String>,))>;
/// struct to represent set of Lineage ids
#[derive(Debug, serde::Serialize)]
pub struct LineageIds {
merchant_id: id_type::MerchantId,
}
impl LineageIds {
/// constructor for LineageIds
pub fn new(merchant_id: id_type::MerchantId) -> Self {
Self { merchant_id }
}
/// get url encoded string representation of LineageIds
pub fn get_url_encoded_string(self) -> Result<String, serde_urlencoded::ser::Error> {
serde_urlencoded::to_string(&self)
}
}
#[cfg(feature = "dynamic_routing")]
/// Trait to add necessary headers to the tonic Request
pub(crate) trait AddHeaders {

View File

@ -18,7 +18,7 @@ use unified_connector_service_client::payments::{
use crate::{
consts,
grpc_client::{GrpcClientSettings, GrpcHeaders},
grpc_client::{GrpcClientSettings, GrpcHeadersUcs},
utils::deserialize_hashset,
};
@ -227,17 +227,13 @@ impl UnifiedConnectorServiceClient {
&self,
payment_authorize_request: payments_grpc::PaymentServiceAuthorizeRequest,
connector_auth_metadata: ConnectorAuthMetadata,
external_vault_proxy_metadata: Option<String>,
grpc_headers: GrpcHeaders,
grpc_headers: GrpcHeadersUcs,
) -> UnifiedConnectorServiceResult<tonic::Response<PaymentServiceAuthorizeResponse>> {
let mut request = tonic::Request::new(payment_authorize_request);
let connector_name = connector_auth_metadata.connector_name.clone();
let metadata = build_unified_connector_service_grpc_headers(
connector_auth_metadata,
external_vault_proxy_metadata,
grpc_headers,
)?;
let metadata =
build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?;
*request.metadata_mut() = metadata;
@ -261,17 +257,14 @@ impl UnifiedConnectorServiceClient {
&self,
payment_get_request: payments_grpc::PaymentServiceGetRequest,
connector_auth_metadata: ConnectorAuthMetadata,
grpc_headers: GrpcHeaders,
grpc_headers: GrpcHeadersUcs,
) -> UnifiedConnectorServiceResult<tonic::Response<payments_grpc::PaymentServiceGetResponse>>
{
let mut request = tonic::Request::new(payment_get_request);
let connector_name = connector_auth_metadata.connector_name.clone();
let metadata = build_unified_connector_service_grpc_headers(
connector_auth_metadata,
None,
grpc_headers,
)?;
let metadata =
build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?;
*request.metadata_mut() = metadata;
self.client
@ -294,17 +287,14 @@ impl UnifiedConnectorServiceClient {
&self,
payment_register_request: payments_grpc::PaymentServiceRegisterRequest,
connector_auth_metadata: ConnectorAuthMetadata,
grpc_headers: GrpcHeaders,
grpc_headers: GrpcHeadersUcs,
) -> UnifiedConnectorServiceResult<tonic::Response<payments_grpc::PaymentServiceRegisterResponse>>
{
let mut request = tonic::Request::new(payment_register_request);
let connector_name = connector_auth_metadata.connector_name.clone();
let metadata = build_unified_connector_service_grpc_headers(
connector_auth_metadata,
None,
grpc_headers,
)?;
let metadata =
build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?;
*request.metadata_mut() = metadata;
self.client
@ -327,18 +317,15 @@ impl UnifiedConnectorServiceClient {
&self,
payment_repeat_request: payments_grpc::PaymentServiceRepeatEverythingRequest,
connector_auth_metadata: ConnectorAuthMetadata,
grpc_headers: GrpcHeaders,
grpc_headers: GrpcHeadersUcs,
) -> UnifiedConnectorServiceResult<
tonic::Response<payments_grpc::PaymentServiceRepeatEverythingResponse>,
> {
let mut request = tonic::Request::new(payment_repeat_request);
let connector_name = connector_auth_metadata.connector_name.clone();
let metadata = build_unified_connector_service_grpc_headers(
connector_auth_metadata,
None,
grpc_headers,
)?;
let metadata =
build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?;
*request.metadata_mut() = metadata;
self.client
@ -361,16 +348,13 @@ impl UnifiedConnectorServiceClient {
&self,
webhook_transform_request: PaymentServiceTransformRequest,
connector_auth_metadata: ConnectorAuthMetadata,
grpc_headers: GrpcHeaders,
grpc_headers: GrpcHeadersUcs,
) -> UnifiedConnectorServiceResult<tonic::Response<PaymentServiceTransformResponse>> {
let mut request = tonic::Request::new(webhook_transform_request);
let connector_name = connector_auth_metadata.connector_name.clone();
let metadata = build_unified_connector_service_grpc_headers(
connector_auth_metadata,
None,
grpc_headers,
)?;
let metadata =
build_unified_connector_service_grpc_headers(connector_auth_metadata, grpc_headers)?;
*request.metadata_mut() = metadata;
self.client
@ -392,8 +376,7 @@ impl UnifiedConnectorServiceClient {
/// Build the gRPC Headers for Unified Connector Service Request
pub fn build_unified_connector_service_grpc_headers(
meta: ConnectorAuthMetadata,
external_vault_proxy_metadata: Option<String>,
grpc_headers: GrpcHeaders,
grpc_headers: GrpcHeadersUcs,
) -> Result<MetadataMap, UnifiedConnectorServiceError> {
let mut metadata = MetadataMap::new();
let parse =
@ -444,13 +427,25 @@ pub fn build_unified_connector_service_grpc_headers(
parse(common_utils_consts::X_MERCHANT_ID, meta.merchant_id.peek())?,
);
if let Some(external_vault_proxy_metadata) = external_vault_proxy_metadata {
if let Some(external_vault_proxy_metadata) = grpc_headers.external_vault_proxy_metadata {
metadata.append(
consts::UCS_HEADER_EXTERNAL_VAULT_METADATA,
parse("external_vault_metadata", &external_vault_proxy_metadata)?,
);
};
let lineage_ids_str = grpc_headers
.lineage_ids
.get_url_encoded_string()
.map_err(|err| {
logger::error!(?err);
UnifiedConnectorServiceError::HeaderInjectionFailed(consts::UCS_LINEAGE_IDS.to_string())
})?;
metadata.append(
consts::UCS_LINEAGE_IDS,
parse(consts::UCS_LINEAGE_IDS, &lineage_ids_str)?,
);
if let Err(err) = grpc_headers
.tenant_id
.parse()

View File

@ -94,6 +94,8 @@ pub mod consts {
/// Header key for sending the EXTERNAL VAULT METADATA in proxy payments
pub(crate) const UCS_HEADER_EXTERNAL_VAULT_METADATA: &str = "x-external-vault-metadata";
pub(crate) const UCS_LINEAGE_IDS: &str = "x-lineage-ids";
}
/// Metrics for interactions with external systems.

View File

@ -853,18 +853,20 @@ async fn call_unified_connector_service_authorize(
build_unified_connector_service_auth_metadata(merchant_connector_account, merchant_context)
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct request metadata")?;
let headers_builder = state
.get_grpc_headers_ucs()
.external_vault_proxy_metadata(None);
let updated_router_data = Box::pin(ucs_logging_wrapper(
router_data.clone(),
state,
payment_authorize_request,
|mut router_data, payment_authorize_request| async move {
headers_builder,
|mut router_data, payment_authorize_request, grpc_headers| async move {
let response = client
.payment_authorize(
payment_authorize_request,
connector_auth_metadata,
None,
state.get_grpc_headers(),
grpc_headers,
)
.await
.change_context(ApiErrorResponse::InternalServerError)
@ -924,17 +926,20 @@ async fn call_unified_connector_service_repeat_payment(
build_unified_connector_service_auth_metadata(merchant_connector_account, merchant_context)
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct request metadata")?;
let headers_builder = state
.get_grpc_headers_ucs()
.external_vault_proxy_metadata(None);
let updated_router_data = Box::pin(ucs_logging_wrapper(
router_data.clone(),
state,
payment_repeat_request,
|mut router_data, payment_repeat_request| async move {
headers_builder,
|mut router_data, payment_repeat_request, grpc_headers| async move {
let response = client
.payment_repeat(
payment_repeat_request,
connector_auth_metadata.clone(),
state.get_grpc_headers(),
grpc_headers,
)
.await
.change_context(ApiErrorResponse::InternalServerError)

View File

@ -393,18 +393,20 @@ impl Feature<api::ExternalVaultProxy, types::ExternalVaultProxyPaymentsData>
)
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct external vault proxy metadata")?;
let headers_builder = state
.get_grpc_headers_ucs()
.external_vault_proxy_metadata(Some(external_vault_proxy_metadata));
let updated_router_data = Box::pin(ucs_logging_wrapper(
self.clone(),
state,
payment_authorize_request.clone(),
|mut router_data, payment_authorize_request| async move {
headers_builder,
|mut router_data, payment_authorize_request, grpc_headers| async move {
let response = client
.payment_authorize(
payment_authorize_request,
connector_auth_metadata,
Some(external_vault_proxy_metadata),
state.get_grpc_headers(),
grpc_headers,
)
.await
.change_context(ApiErrorResponse::InternalServerError)

View File

@ -266,18 +266,17 @@ impl Feature<api::PSync, types::PaymentsSyncData>
)
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct request metadata")?;
let header_payload = state
.get_grpc_headers_ucs()
.external_vault_proxy_metadata(None);
let updated_router_data = Box::pin(ucs_logging_wrapper(
self.clone(),
state,
payment_get_request,
|mut router_data, payment_get_request| async move {
header_payload,
|mut router_data, payment_get_request, grpc_headers| async move {
let response = client
.payment_get(
payment_get_request,
connector_auth_metadata,
state.get_grpc_headers(),
)
.payment_get(payment_get_request, connector_auth_metadata, grpc_headers)
.await
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to get payment")?;

View File

@ -282,17 +282,20 @@ impl Feature<api::SetupMandate, types::SetupMandateRequestData> for types::Setup
)
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct request metadata")?;
let header_payload = state
.get_grpc_headers_ucs()
.external_vault_proxy_metadata(None);
let updated_router_data = Box::pin(ucs_logging_wrapper(
self.clone(),
state,
payment_register_request,
|mut router_data, payment_register_request| async move {
header_payload,
|mut router_data, payment_register_request, grpc_headers| async move {
let response = client
.payment_setup_mandate(
payment_register_request,
connector_auth_metadata,
state.get_grpc_headers(),
grpc_headers,
)
.await
.change_context(ApiErrorResponse::InternalServerError)

View File

@ -9,8 +9,9 @@ use common_utils::consts::BASE64_ENGINE;
use common_utils::{errors::CustomResult, ext_traits::ValueExt};
use diesel_models::types::FeatureMetadata;
use error_stack::ResultExt;
use external_services::grpc_client::unified_connector_service::{
ConnectorAuthMetadata, UnifiedConnectorServiceError,
use external_services::grpc_client::{
unified_connector_service::{ConnectorAuthMetadata, UnifiedConnectorServiceError},
LineageIds,
};
use hyperswitch_connectors::utils::CardData;
#[cfg(feature = "v2")]
@ -47,7 +48,6 @@ use crate::{
events::connector_api_logs::ConnectorEvent,
routes::SessionState,
types::transformers::ForeignTryFrom,
utils,
};
pub mod transformers;
@ -769,10 +769,13 @@ pub async fn call_unified_connector_service_for_webhook(
})?;
// Build gRPC headers
let grpc_headers = external_services::grpc_client::GrpcHeaders {
tenant_id: state.tenant.tenant_id.get_string_repr().to_string(),
request_id: Some(utils::generate_id(consts::ID_LENGTH, "webhook_req")),
};
let grpc_headers = state
.get_grpc_headers_ucs()
.lineage_ids(LineageIds::new(
merchant_context.get_merchant_account().get_id().clone(),
))
.external_vault_proxy_metadata(None)
.build();
// Make UCS call - client availability already verified
match ucs_client
@ -814,6 +817,7 @@ pub async fn ucs_logging_wrapper<T, F, Fut, Req, Resp, GrpcReq, GrpcResp>(
router_data: RouterData<T, Req, Resp>,
state: &SessionState,
grpc_request: GrpcReq,
grpc_header_builder: external_services::grpc_client::GrpcHeadersUcsBuilderIntermediate,
handler: F,
) -> RouterResult<RouterData<T, Req, Resp>>
where
@ -822,7 +826,12 @@ where
Resp: std::fmt::Debug + Clone + Send + Sync + 'static,
GrpcReq: serde::Serialize,
GrpcResp: serde::Serialize,
F: FnOnce(RouterData<T, Req, Resp>, GrpcReq) -> Fut + Send,
F: FnOnce(
RouterData<T, Req, Resp>,
GrpcReq,
external_services::grpc_client::GrpcHeadersUcs,
) -> Fut
+ Send,
Fut: std::future::Future<Output = RouterResult<(RouterData<T, Req, Resp>, GrpcResp)>> + Send,
{
tracing::Span::current().record("connector_name", &router_data.connector);
@ -835,7 +844,9 @@ where
let merchant_id = router_data.merchant_id.clone();
let refund_id = router_data.refund_id.clone();
let dispute_id = router_data.dispute_id.clone();
let grpc_header = grpc_header_builder
.lineage_ids(LineageIds::new(merchant_id.clone()))
.build();
// Log the actual gRPC request with masking
let grpc_request_body = masking::masked_serialize(&grpc_request)
.unwrap_or_else(|_| serde_json::json!({"error": "failed_to_serialize_grpc_request"}));
@ -857,7 +868,7 @@ where
// Execute UCS function and measure timing
let start_time = Instant::now();
let result = handler(router_data, grpc_request).await;
let result = handler(router_data, grpc_request, grpc_header).await;
let external_latency = start_time.elapsed().as_millis();
// Create and emit connector event after UCS call

View File

@ -17,7 +17,7 @@ use external_services::email::{
use external_services::grpc_client::revenue_recovery::GrpcRecoveryHeaders;
use external_services::{
file_storage::FileStorageInterface,
grpc_client::{GrpcClients, GrpcHeaders},
grpc_client::{GrpcClients, GrpcHeaders, GrpcHeadersUcs, GrpcHeadersUcsBuilderInitial},
};
use hyperswitch_interfaces::{
crm::CrmInterface,
@ -153,6 +153,10 @@ impl SessionState {
request_id: self.request_id.map(|req_id| (*req_id).to_string()),
}
}
pub fn get_grpc_headers_ucs(&self) -> GrpcHeadersUcsBuilderInitial {
let tenant_id = self.tenant.tenant_id.get_string_repr().to_string();
GrpcHeadersUcs::builder().tenant_id(tenant_id)
}
#[cfg(all(feature = "revenue_recovery", feature = "v2"))]
pub fn get_recovery_grpc_headers(&self) -> GrpcRecoveryHeaders {
GrpcRecoveryHeaders {