feat(injector): adding tracing to injector for dependency issues (#9124)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Shivansh Mathur
2025-09-01 14:46:13 +05:30
committed by GitHub
parent 971e17e0f3
commit aae1994ea1
4 changed files with 179 additions and 118 deletions

2
Cargo.lock generated
View File

@ -4402,6 +4402,7 @@ name = "injector"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64 0.22.1",
"common_utils", "common_utils",
"error-stack 0.4.1", "error-stack 0.4.1",
"masking 0.1.0", "masking 0.1.0",
@ -4412,6 +4413,7 @@ dependencies = [
"serde_json", "serde_json",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio 1.45.1", "tokio 1.45.1",
"tracing",
"url", "url",
] ]

View File

@ -18,13 +18,15 @@ masking = { version = "0.1.0", path = "../masking" }
router_env = { version = "0.1.0", path = "../router_env" } router_env = { version = "0.1.0", path = "../router_env" }
async-trait = { version = "0.1.88" } async-trait = { version = "0.1.88" }
base64 = { version = "0.22.1" }
error-stack = { version = "0.4.1" } error-stack = { version = "0.4.1" }
nom = { version = "7.1.3" } nom = { version = "7.1.3" }
reqwest = { version = "0.12.0", features = ["json", "stream"] } reqwest = { version = "0.12.0", features = ["json", "stream", "rustls-tls"] }
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140" } serde_json = { version = "1.0.140" }
thiserror = { version = "1.0.69" } thiserror = { version = "1.0.69" }
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1.40" }
url = { version = "2.5.4", features = ["serde"] } url = { version = "2.5.4", features = ["serde"] }
[lints] [lints]

View File

@ -2,7 +2,10 @@ pub mod core {
use std::collections::HashMap; use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use common_utils::request::{Method, RequestBuilder, RequestContent}; use common_utils::{
errors::CustomResult,
request::{Method, RequestBuilder, RequestContent},
};
use error_stack::ResultExt; use error_stack::ResultExt;
use masking::{self, ExposeInterface}; use masking::{self, ExposeInterface};
use nom::{ use nom::{
@ -55,6 +58,120 @@ pub mod core {
} }
} }
/// Error type for HTTP client creation
#[derive(Error, Debug)]
pub enum HttpClientError {
#[error("Client construction failed")]
ClientConstructionFailed,
#[error("Certificate decode failed")]
CertificateDecodeFailed,
}
/// Create identity from certificate and key for mutual TLS
pub fn create_identity_from_certificate_and_key(
encoded_certificate: masking::Secret<String>,
encoded_certificate_key: masking::Secret<String>,
) -> CustomResult<reqwest::Identity, HttpClientError> {
let certificate = encoded_certificate.expose();
let certificate_key = encoded_certificate_key.expose();
// Combine certificate and key into a single PEM block
let combined_pem = format!("{certificate_key}\n{certificate}");
reqwest::Identity::from_pem(combined_pem.as_bytes())
.change_context(HttpClientError::CertificateDecodeFailed)
}
/// Create certificate list from encoded certificate
pub fn create_certificate(
encoded_certificate: masking::Secret<String>,
) -> CustomResult<Vec<reqwest::Certificate>, HttpClientError> {
let certificate = encoded_certificate.expose();
reqwest::Certificate::from_pem_bundle(certificate.as_bytes())
.change_context(HttpClientError::CertificateDecodeFailed)
}
/// Get client builder with proxy configuration
fn get_client_builder(
proxy_config: &Proxy,
) -> CustomResult<reqwest::ClientBuilder, HttpClientError> {
let mut client_builder =
reqwest::Client::builder().redirect(reqwest::redirect::Policy::none());
// Configure proxy if provided
if let Some(url) = proxy_config.https_url.as_ref() {
if !url.is_empty() {
let proxy = reqwest::Proxy::https(url)
.change_context(HttpClientError::ClientConstructionFailed)?;
client_builder = client_builder.proxy(proxy);
}
}
if let Some(url) = proxy_config.http_url.as_ref() {
if !url.is_empty() && proxy_config.https_url.is_none() {
let proxy = reqwest::Proxy::http(url)
.change_context(HttpClientError::ClientConstructionFailed)?;
client_builder = client_builder.proxy(proxy);
}
}
Ok(client_builder)
}
/// Create HTTP client with proper certificate handling
#[allow(missing_docs)]
pub fn create_client(
proxy_config: &Proxy,
client_certificate: Option<masking::Secret<String>>,
client_certificate_key: Option<masking::Secret<String>>,
ca_certificate: Option<masking::Secret<String>>,
) -> CustomResult<reqwest::Client, HttpClientError> {
// Case 1: Mutual TLS with client certificate and key
if let (Some(encoded_certificate), Some(encoded_certificate_key)) =
(client_certificate.clone(), client_certificate_key.clone())
{
let client_builder = get_client_builder(proxy_config)?;
let identity = create_identity_from_certificate_and_key(
encoded_certificate.clone(),
encoded_certificate_key,
)?;
let certificate_list = create_certificate(encoded_certificate)?;
let client_builder = certificate_list
.into_iter()
.fold(client_builder, |client_builder, certificate| {
client_builder.add_root_certificate(certificate)
});
return client_builder
.identity(identity)
.use_rustls_tls()
.build()
.change_context(HttpClientError::ClientConstructionFailed)
.attach_printable(
"Failed to construct client with certificate and certificate key",
);
}
// Case 2: Use provided CA certificate for server authentication only (one-way TLS)
if let Some(ca_pem) = ca_certificate {
let pem = ca_pem.expose().replace("\\r\\n", "\n"); // Fix escaped newlines
let cert = reqwest::Certificate::from_pem(pem.as_bytes())
.change_context(HttpClientError::ClientConstructionFailed)
.attach_printable("Failed to parse CA certificate PEM block")?;
let client_builder = get_client_builder(proxy_config)?.add_root_certificate(cert);
return client_builder
.use_rustls_tls()
.build()
.change_context(HttpClientError::ClientConstructionFailed)
.attach_printable("Failed to construct client with CA certificate");
}
// Case 3: Default client (no certs)
get_client_builder(proxy_config)?
.build()
.change_context(HttpClientError::ClientConstructionFailed)
}
/// Simplified HTTP client for injector (copied from external_services to make injector standalone) /// Simplified HTTP client for injector (copied from external_services to make injector standalone)
/// This is a minimal implementation that covers the essential functionality needed by injector /// This is a minimal implementation that covers the essential functionality needed by injector
#[instrument(skip_all)] #[instrument(skip_all)]
@ -63,32 +180,14 @@ pub mod core {
request: common_utils::request::Request, request: common_utils::request::Request,
_option_timeout_secs: Option<u64>, _option_timeout_secs: Option<u64>,
) -> error_stack::Result<reqwest::Response, InjectorError> { ) -> error_stack::Result<reqwest::Response, InjectorError> {
logger::info!("Making HTTP request using standalone injector HTTP client"); // Use the proper create_client function
let client = create_client(
// Create reqwest client with proxy configuration client_proxy,
let mut client_builder = reqwest::Client::builder(); request.certificate.clone(),
request.certificate_key.clone(),
// Configure proxy if provided request.ca_certificate.clone(),
if let Some(proxy_url) = &client_proxy.https_url { )
let proxy = reqwest::Proxy::https(proxy_url).map_err(|e| { .map_err(|_e| error_stack::Report::new(InjectorError::HttpRequestFailed))?;
logger::error!("Failed to configure HTTPS proxy: {}", e);
error_stack::Report::new(InjectorError::HttpRequestFailed)
})?;
client_builder = client_builder.proxy(proxy);
}
if let Some(proxy_url) = &client_proxy.http_url {
let proxy = reqwest::Proxy::http(proxy_url).map_err(|e| {
logger::error!("Failed to configure HTTP proxy: {}", e);
error_stack::Report::new(InjectorError::HttpRequestFailed)
})?;
client_builder = client_builder.proxy(proxy);
}
let client = client_builder.build().map_err(|e| {
logger::error!("Failed to build HTTP client: {}", e);
error_stack::Report::new(InjectorError::HttpRequestFailed)
})?;
// Build the request // Build the request
let method = match request.method { let method = match request.method {
@ -99,7 +198,7 @@ pub mod core {
Method::Delete => reqwest::Method::DELETE, Method::Delete => reqwest::Method::DELETE,
}; };
let mut req_builder = client.request(method, &request.url); let mut req_builder = client.request(method.clone(), &request.url);
// Add headers // Add headers
for (key, value) in &request.headers { for (key, value) in &request.headers {
@ -123,15 +222,15 @@ pub mod core {
req_builder = req_builder.body(payload); req_builder = req_builder.body(payload);
} }
_ => { _ => {
logger::warn!("Unsupported request content type, using raw bytes"); // Unsupported request content type
} }
} }
} }
// Send the request // Send the request
let response = req_builder.send().await.map_err(|e| { let response = req_builder.send().await.map_err(|e| {
logger::error!("HTTP request failed: {}", e);
error_stack::Report::new(InjectorError::HttpRequestFailed) error_stack::Report::new(InjectorError::HttpRequestFailed)
.attach_printable(format!("HTTP request failed: {e}"))
})?; })?;
Ok(response) Ok(response)
@ -153,7 +252,6 @@ pub mod core {
pub async fn injector_core( pub async fn injector_core(
request: InjectorRequest, request: InjectorRequest,
) -> error_stack::Result<InjectorResponse, InjectorError> { ) -> error_stack::Result<InjectorResponse, InjectorError> {
logger::info!("Starting injector_core processing");
let injector = Injector::new(); let injector = Injector::new();
injector.injector_core(request).await injector.injector_core(request).await
} }
@ -325,12 +423,6 @@ pub mod core {
field_name: &str, field_name: &str,
vault_connector: &injector_types::VaultConnectors, vault_connector: &injector_types::VaultConnectors,
) -> error_stack::Result<Value, InjectorError> { ) -> error_stack::Result<Value, InjectorError> {
logger::debug!(
"Extracting field '{}' from vault data using vault type {:?}",
field_name,
vault_connector
);
match vault_data { match vault_data {
Value::Object(obj) => { Value::Object(obj) => {
let raw_value = find_field_recursively_in_vault_data(obj, field_name) let raw_value = find_field_recursively_in_vault_data(obj, field_name)
@ -356,16 +448,10 @@ pub mod core {
&self, &self,
extracted_field_value: Value, extracted_field_value: Value,
vault_connector: &injector_types::VaultConnectors, vault_connector: &injector_types::VaultConnectors,
field_name: &str, _field_name: &str,
) -> error_stack::Result<Value, InjectorError> { ) -> error_stack::Result<Value, InjectorError> {
match vault_connector { match vault_connector {
injector_types::VaultConnectors::VGS => { injector_types::VaultConnectors::VGS => Ok(extracted_field_value),
logger::debug!(
"VGS vault: Using direct token replacement for field '{}'",
field_name
);
Ok(extracted_field_value)
}
} }
} }
@ -376,7 +462,7 @@ pub mod core {
payload: &str, payload: &str,
content_type: &ContentType, content_type: &ContentType,
) -> error_stack::Result<Value, InjectorError> { ) -> error_stack::Result<Value, InjectorError> {
logger::info!( logger::debug!(
method = ?config.http_method, method = ?config.http_method,
base_url = %config.base_url, base_url = %config.base_url,
endpoint = %config.endpoint_path, endpoint = %config.endpoint_path,
@ -393,13 +479,8 @@ pub mod core {
)))?; )))?;
} }
// Construct URL safely by joining base URL with endpoint path // Construct URL by concatenating base URL with endpoint path
let url = config.base_url.join(&config.endpoint_path).map_err(|e| { let url = format!("{}{}", config.base_url, config.endpoint_path);
logger::error!("Failed to join base URL with endpoint path: {}", e);
error_stack::Report::new(InjectorError::InvalidTemplate(format!(
"Invalid URL construction: {e}"
)))
})?;
logger::debug!("Constructed URL: {}", url); logger::debug!("Constructed URL: {}", url);
@ -422,8 +503,8 @@ pub mod core {
Ok(json) => Some(RequestContent::Json(Box::new(json))), Ok(json) => Some(RequestContent::Json(Box::new(json))),
Err(e) => { Err(e) => {
logger::debug!( logger::debug!(
"Failed to parse payload as JSON: {}, falling back to raw bytes", error = %e,
e "Failed to parse payload as JSON, falling back to raw bytes"
); );
Some(RequestContent::RawBytes(payload.as_bytes().to_vec())) Some(RequestContent::RawBytes(payload.as_bytes().to_vec()))
} }
@ -448,7 +529,7 @@ pub mod core {
// Build request safely // Build request safely
let mut request_builder = RequestBuilder::new() let mut request_builder = RequestBuilder::new()
.method(method) .method(method)
.url(url.as_str()) .url(&url)
.headers(headers); .headers(headers);
if let Some(content) = request_content { if let Some(content) = request_content {
@ -471,8 +552,7 @@ pub mod core {
request_builder = request_builder.add_ca_certificate_pem(Some(ca_content.clone())); request_builder = request_builder.add_ca_certificate_pem(Some(ca_content.clone()));
} }
// Log certificate configuration (but not the actual content) logger::debug!(
logger::info!(
has_client_cert = config.client_cert.is_some(), has_client_cert = config.client_cert.is_some(),
has_client_key = config.client_key.is_some(), has_client_key = config.client_key.is_some(),
has_ca_cert = config.ca_cert.is_some(), has_ca_cert = config.ca_cert.is_some(),
@ -483,23 +563,25 @@ pub mod core {
let request = request_builder.build(); let request = request_builder.build();
logger::debug!(
url = %request.url,
method = ?request.method,
headers_count = request.headers.len(),
has_body = request.body.is_some(),
has_cert = request.certificate.is_some(),
has_key = request.certificate_key.is_some(),
has_ca = request.ca_certificate.is_some(),
"Built common_utils request successfully"
);
let proxy = if let Some(proxy_url) = &config.proxy_url { let proxy = if let Some(proxy_url) = &config.proxy_url {
logger::debug!("Using proxy: {}", proxy_url); let proxy_url_exposed = proxy_url.clone().expose();
// Determine if it's HTTP or HTTPS proxy based on URL scheme logger::debug!(proxy_url = %proxy_url_exposed, "Using proxy");
if proxy_url.scheme() == "https" { Proxy {
Proxy { http_url: Some(proxy_url_exposed.to_string()),
http_url: None, https_url: Some(proxy_url_exposed.to_string()),
https_url: Some(proxy_url.to_string()), idle_pool_connection_timeout: Some(90),
idle_pool_connection_timeout: Some(90), bypass_proxy_hosts: None,
bypass_proxy_hosts: None,
}
} else {
Proxy {
http_url: Some(proxy_url.to_string()),
https_url: None,
idle_pool_connection_timeout: Some(90),
bypass_proxy_hosts: None,
}
} }
} else { } else {
logger::debug!("No proxy configured, using direct connection"); logger::debug!("No proxy configured, using direct connection");
@ -510,7 +592,7 @@ pub mod core {
logger::debug!("Sending HTTP request to connector"); logger::debug!("Sending HTTP request to connector");
let response = send_request(&proxy, request, None).await?; let response = send_request(&proxy, request, None).await?;
logger::info!( logger::debug!(
status_code = response.status().as_u16(), status_code = response.status().as_u16(),
"Received response from connector" "Received response from connector"
); );
@ -533,8 +615,8 @@ pub mod core {
} }
Err(e) => { Err(e) => {
logger::debug!( logger::debug!(
"Failed to parse response as JSON: {}, returning as string", error = %e,
e "Failed to parse response as JSON, returning as string"
); );
Ok(Value::String(response_text)) Ok(Value::String(response_text))
} }
@ -555,8 +637,6 @@ pub mod core {
&self, &self,
request: InjectorRequest, request: InjectorRequest,
) -> error_stack::Result<InjectorResponse, InjectorError> { ) -> error_stack::Result<InjectorResponse, InjectorError> {
logger::info!("Starting token injection process");
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
// Convert API model to domain model // Convert API model to domain model
@ -569,12 +649,6 @@ pub mod core {
.expose() .expose()
.clone(); .clone();
logger::debug!(
template_length = domain_request.connector_payload.template.len(),
vault_connector = ?domain_request.token_data.vault_connector,
"Processing token injection request"
);
// Process template string directly with vault-specific logic // Process template string directly with vault-specific logic
let processed_payload = self.interpolate_string_template_with_vault_data( let processed_payload = self.interpolate_string_template_with_vault_data(
domain_request.connector_payload.template, domain_request.connector_payload.template,
@ -582,11 +656,6 @@ pub mod core {
&domain_request.token_data.vault_connector, &domain_request.token_data.vault_connector,
)?; )?;
logger::debug!(
processed_payload_length = processed_payload.len(),
"Token replacement completed"
);
// Determine content type from headers or default to form-urlencoded // Determine content type from headers or default to form-urlencoded
let content_type = domain_request let content_type = domain_request
.connection_config .connection_config
@ -613,14 +682,7 @@ pub mod core {
) )
.await?; .await?;
let elapsed = start_time.elapsed(); let _elapsed = start_time.elapsed();
logger::info!(
duration_ms = elapsed.as_millis(),
response_size = serde_json::to_string(&response_data)
.map(|s| s.len())
.unwrap_or(0),
"Token injection completed successfully"
);
// Return the raw connector response for connector-agnostic handling // Return the raw connector response for connector-agnostic handling
Ok(response_data) Ok(response_data)
@ -755,7 +817,7 @@ mod tests {
specific_token_data, specific_token_data,
}, },
connection_config: ConnectionConfig { connection_config: ConnectionConfig {
base_url: "https://api.stripe.com".parse().unwrap(), base_url: "https://api.stripe.com".to_string(),
endpoint_path: "/v1/payment_intents".to_string(), endpoint_path: "/v1/payment_intents".to_string(),
http_method: HttpMethod::POST, http_method: HttpMethod::POST,
headers, headers,
@ -776,7 +838,7 @@ mod tests {
// The request should succeed (httpbin.org should be accessible) // The request should succeed (httpbin.org should be accessible)
if let Err(ref e) = result { if let Err(ref e) = result {
logger::info!("Error: {e:?}"); logger::error!(error = ?e, "Injector core failed");
} }
assert!( assert!(
result.is_ok(), result.is_ok(),
@ -785,13 +847,11 @@ mod tests {
let response = result.unwrap(); let response = result.unwrap();
// Print the actual response for demonstration // Log the response for demonstration
logger::info!("=== HTTP RESPONSE FROM HTTPBIN.ORG ===");
logger::info!( logger::info!(
"{}", response = %serde_json::to_string_pretty(&response).unwrap_or_default(),
serde_json::to_string_pretty(&response).unwrap_or_default() "HTTP response from test endpoint"
); );
logger::info!("=======================================");
// Response should be a JSON value from httpbin.org // Response should be a JSON value from httpbin.org
assert!( assert!(
@ -832,11 +892,11 @@ mod tests {
specific_token_data, specific_token_data,
}, },
connection_config: ConnectionConfig { connection_config: ConnectionConfig {
base_url: "https://api.stripe.com".parse().unwrap(), base_url: "https://api.stripe.com".to_string(),
endpoint_path: "/v1/payment_intents".to_string(), endpoint_path: "/v1/payment_intents".to_string(),
http_method: HttpMethod::POST, http_method: HttpMethod::POST,
headers, headers,
proxy_url: Some("https://proxy.example.com:8443".parse().unwrap()), proxy_url: Some(masking::Secret::new("https://proxy.example.com:8443".to_string())),
// Certificate configuration - using insecure for testing // Certificate configuration - using insecure for testing
client_cert: None, client_cert: None,
client_key: None, client_key: None,
@ -858,13 +918,11 @@ mod tests {
let response = result.unwrap(); let response = result.unwrap();
// Print the actual response for demonstration // Log the response for demonstration
logger::info!("=== CERTIFICATE TEST RESPONSE ===");
logger::info!( logger::info!(
"{}", response = %serde_json::to_string_pretty(&response).unwrap_or_default(),
serde_json::to_string_pretty(&response).unwrap_or_default() "Certificate test response"
); );
logger::info!("================================");
// Verify the token was replaced in the JSON // Verify the token was replaced in the JSON
// httpbin.org returns the request data in the 'data' or 'json' field // httpbin.org returns the request data in the 'data' or 'json' field

View File

@ -4,7 +4,6 @@ pub mod models {
use common_utils::pii::SecretSerdeValue; use common_utils::pii::SecretSerdeValue;
use masking::Secret; use masking::Secret;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url;
// Enums for the injector - making it standalone // Enums for the injector - making it standalone
@ -74,7 +73,7 @@ pub mod models {
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ConnectionConfig { pub struct ConnectionConfig {
/// Base URL of the connector endpoint /// Base URL of the connector endpoint
pub base_url: Url, pub base_url: String,
/// Path to append to the base URL for the specific endpoint /// Path to append to the base URL for the specific endpoint
pub endpoint_path: String, pub endpoint_path: String,
/// HTTP method to use for the request /// HTTP method to use for the request
@ -82,7 +81,7 @@ pub mod models {
/// HTTP headers to include in the request /// HTTP headers to include in the request
pub headers: HashMap<String, Secret<String>>, pub headers: HashMap<String, Secret<String>>,
/// Optional proxy URL for routing the request through a proxy server /// Optional proxy URL for routing the request through a proxy server
pub proxy_url: Option<Url>, pub proxy_url: Option<Secret<String>>,
/// Optional client certificate for mutual TLS authentication /// Optional client certificate for mutual TLS authentication
pub client_cert: Option<Secret<String>>, pub client_cert: Option<Secret<String>>,
/// Optional client private key for mutual TLS authentication /// Optional client private key for mutual TLS authentication
@ -151,7 +150,7 @@ pub mod models {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DomainConnectionConfig { pub struct DomainConnectionConfig {
/// Base URL of the connector endpoint /// Base URL of the connector endpoint
pub base_url: Url, pub base_url: String,
/// Path to append to the base URL for the specific endpoint /// Path to append to the base URL for the specific endpoint
pub endpoint_path: String, pub endpoint_path: String,
/// HTTP method to use for the request /// HTTP method to use for the request
@ -159,7 +158,7 @@ pub mod models {
/// HTTP headers to include in the request (values are masked for security) /// HTTP headers to include in the request (values are masked for security)
pub headers: HashMap<String, Secret<String>>, pub headers: HashMap<String, Secret<String>>,
/// Optional proxy URL for routing the request through a proxy server /// Optional proxy URL for routing the request through a proxy server
pub proxy_url: Option<Url>, pub proxy_url: Option<Secret<String>>,
/// Optional client certificate for mutual TLS authentication (masked) /// Optional client certificate for mutual TLS authentication (masked)
pub client_cert: Option<Secret<String>>, pub client_cert: Option<Secret<String>>,
/// Optional client private key for mutual TLS authentication (masked) /// Optional client private key for mutual TLS authentication (masked)