chore(users): add hubspot tracking to prod intent (#7798)

Co-authored-by: Gnanasundari24 <118818938+Gnanasundari24@users.noreply.github.com>
This commit is contained in:
Riddhiagrawal001
2025-05-13 17:25:27 +05:30
committed by GitHub
parent 1dabfe3e2c
commit 67f38f864e
36 changed files with 923 additions and 420 deletions

View File

@ -0,0 +1,148 @@
use std::sync::Arc;
use common_utils::{
errors::CustomResult,
ext_traits::ConfigExt,
request::{Method, Request, RequestBuilder, RequestContent},
};
use error_stack::ResultExt;
use http::header;
use hyperswitch_interfaces::{
crm::{CrmInterface, CrmPayload},
errors::HttpClientError,
types::Proxy,
};
use reqwest;
use router_env::logger;
use crate::{http_client, hubspot_proxy::HubspotRequest};
/// Hubspot Crm configuration
#[derive(Debug, Clone, serde::Deserialize)]
pub struct HubspotProxyConfig {
/// The ID of the Hubspot form to be submitted.
pub form_id: String,
/// The URL to which the Hubspot form data will be sent.
pub request_url: String,
}
impl HubspotProxyConfig {
/// Validates Hubspot configuration
pub(super) fn validate(&self) -> Result<(), InvalidCrmConfig> {
use common_utils::fp_utils::when;
when(self.request_url.is_default_or_empty(), || {
Err(InvalidCrmConfig("request url must not be empty"))
})?;
when(self.form_id.is_default_or_empty(), || {
Err(InvalidCrmConfig("form_id must not be empty"))
})
}
}
/// Error thrown when the crm config is invalid
#[derive(Debug, Clone)]
pub struct InvalidCrmConfig(pub &'static str);
impl std::error::Error for InvalidCrmConfig {}
impl std::fmt::Display for InvalidCrmConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "crm: {}", self.0)
}
}
#[derive(Debug, Clone, Copy)]
/// NoCrm struct
pub struct NoCrm;
/// Enum representing different Crm configurations
#[derive(Debug, Clone, Default, serde::Deserialize)]
#[serde(tag = "crm_manager")]
#[serde(rename_all = "snake_case")]
pub enum CrmManagerConfig {
/// Hubspot Crm configuration
HubspotProxy {
/// Hubspot Crm configuration
hubspot_proxy: HubspotProxyConfig,
},
/// No Crm configuration
#[default]
NoCrm,
}
impl CrmManagerConfig {
/// Verifies that the client configuration is usable
pub fn validate(&self) -> Result<(), InvalidCrmConfig> {
match self {
Self::HubspotProxy { hubspot_proxy } => hubspot_proxy.validate(),
Self::NoCrm => Ok(()),
}
}
/// Retrieves the appropriate Crm client based on the configuration.
pub async fn get_crm_client(&self) -> Arc<dyn CrmInterface> {
match self {
Self::HubspotProxy { hubspot_proxy } => Arc::new(hubspot_proxy.clone()),
Self::NoCrm => Arc::new(NoCrm),
}
}
}
#[async_trait::async_trait]
impl CrmInterface for NoCrm {
async fn make_body(&self, _details: CrmPayload) -> RequestContent {
RequestContent::Json(Box::new(()))
}
async fn make_request(&self, _body: RequestContent, _origin_base_url: String) -> Request {
RequestBuilder::default().build()
}
async fn send_request(
&self,
_proxy: &Proxy,
_request: Request,
) -> CustomResult<reqwest::Response, HttpClientError> {
logger::info!("No CRM configured!");
Err(HttpClientError::UnexpectedState).attach_printable("No CRM configured!")
}
}
#[async_trait::async_trait]
impl CrmInterface for HubspotProxyConfig {
async fn make_body(&self, details: CrmPayload) -> RequestContent {
RequestContent::FormUrlEncoded(Box::new(HubspotRequest::new(
details.business_country_name.unwrap_or_default(),
self.form_id.clone(),
details.poc_name.unwrap_or_default(),
details.poc_email.clone().unwrap_or_default(),
details.legal_business_name.unwrap_or_default(),
details.business_website.unwrap_or_default(),
)))
}
async fn make_request(&self, body: RequestContent, origin_base_url: String) -> Request {
RequestBuilder::new()
.method(Method::Post)
.url(self.request_url.as_str())
.set_body(body)
.attach_default_headers()
.headers(vec![(
header::ORIGIN.to_string(),
format!("{origin_base_url}/dashboard").into(),
)])
.build()
}
async fn send_request(
&self,
proxy: &Proxy,
request: Request,
) -> CustomResult<reqwest::Response, HttpClientError> {
http_client::send_request(proxy, request, None).await
}
}

View File

@ -0,0 +1,182 @@
use common_utils::{consts, errors::CustomResult, request::Request};
use hyperswitch_interfaces::{errors::HttpClientError, types::Proxy};
use request::{HeaderExt, RequestBuilderExt};
use router_env::{instrument, logger, tracing};
/// client module
pub mod client;
/// metrics module
pub mod metrics;
/// request module
pub mod request;
use std::{error::Error, time::Duration};
use common_utils::request::RequestContent;
pub use common_utils::request::{ContentType, Method, RequestBuilder};
use error_stack::ResultExt;
#[allow(missing_docs)]
#[instrument(skip_all)]
pub async fn send_request(
client_proxy: &Proxy,
request: Request,
option_timeout_secs: Option<u64>,
) -> CustomResult<reqwest::Response, HttpClientError> {
logger::info!(method=?request.method, headers=?request.headers, payload=?request.body, ?request);
let url = url::Url::parse(&request.url).change_context(HttpClientError::UrlParsingFailed)?;
let client = client::create_client(client_proxy, request.certificate, request.certificate_key)?;
let headers = request.headers.construct_header_map()?;
let metrics_tag = router_env::metric_attributes!((
consts::METRICS_HOST_TAG_NAME,
url.host_str().unwrap_or_default().to_owned()
));
let request = {
match request.method {
Method::Get => client.get(url),
Method::Post => {
let client = client.post(url);
match request.body {
Some(RequestContent::Json(payload)) => client.json(&payload),
Some(RequestContent::FormData(form)) => client.multipart(form),
Some(RequestContent::FormUrlEncoded(payload)) => client.form(&payload),
Some(RequestContent::Xml(payload)) => {
let body = quick_xml::se::to_string(&payload)
.change_context(HttpClientError::BodySerializationFailed)?;
client.body(body).header("Content-Type", "application/xml")
}
Some(RequestContent::RawBytes(payload)) => client.body(payload),
None => client,
}
}
Method::Put => {
let client = client.put(url);
match request.body {
Some(RequestContent::Json(payload)) => client.json(&payload),
Some(RequestContent::FormData(form)) => client.multipart(form),
Some(RequestContent::FormUrlEncoded(payload)) => client.form(&payload),
Some(RequestContent::Xml(payload)) => {
let body = quick_xml::se::to_string(&payload)
.change_context(HttpClientError::BodySerializationFailed)?;
client.body(body).header("Content-Type", "application/xml")
}
Some(RequestContent::RawBytes(payload)) => client.body(payload),
None => client,
}
}
Method::Patch => {
let client = client.patch(url);
match request.body {
Some(RequestContent::Json(payload)) => client.json(&payload),
Some(RequestContent::FormData(form)) => client.multipart(form),
Some(RequestContent::FormUrlEncoded(payload)) => client.form(&payload),
Some(RequestContent::Xml(payload)) => {
let body = quick_xml::se::to_string(&payload)
.change_context(HttpClientError::BodySerializationFailed)?;
client.body(body).header("Content-Type", "application/xml")
}
Some(RequestContent::RawBytes(payload)) => client.body(payload),
None => client,
}
}
Method::Delete => client.delete(url),
}
.add_headers(headers)
.timeout(Duration::from_secs(
option_timeout_secs.unwrap_or(consts::REQUEST_TIME_OUT),
))
};
// We cannot clone the request type, because it has Form trait which is not cloneable. So we are cloning the request builder here.
let cloned_send_request = request.try_clone().map(|cloned_request| async {
cloned_request
.send()
.await
.map_err(|error| match error {
error if error.is_timeout() => {
metrics::REQUEST_BUILD_FAILURE.add(1, metrics_tag);
HttpClientError::RequestTimeoutReceived
}
error if is_connection_closed_before_message_could_complete(&error) => {
metrics::REQUEST_BUILD_FAILURE.add(1, metrics_tag);
HttpClientError::ConnectionClosedIncompleteMessage
}
_ => HttpClientError::RequestNotSent(error.to_string()),
})
.attach_printable("Unable to send request to connector")
});
let send_request = async {
request
.send()
.await
.map_err(|error| match error {
error if error.is_timeout() => {
metrics::REQUEST_BUILD_FAILURE.add(1, metrics_tag);
HttpClientError::RequestTimeoutReceived
}
error if is_connection_closed_before_message_could_complete(&error) => {
metrics::REQUEST_BUILD_FAILURE.add(1, metrics_tag);
HttpClientError::ConnectionClosedIncompleteMessage
}
_ => HttpClientError::RequestNotSent(error.to_string()),
})
.attach_printable("Unable to send request to connector")
};
let response = common_utils::metrics::utils::record_operation_time(
send_request,
&metrics::EXTERNAL_REQUEST_TIME,
metrics_tag,
)
.await;
// Retry once if the response is connection closed.
//
// This is just due to the racy nature of networking.
// hyper has a connection pool of idle connections, and it selected one to send your request.
// Most of the time, hyper will receive the servers FIN and drop the dead connection from its pool.
// But occasionally, a connection will be selected from the pool
// and written to at the same time the server is deciding to close the connection.
// Since hyper already wrote some of the request,
// it cant really retry it automatically on a new connection, since the server may have acted already
match response {
Ok(response) => Ok(response),
Err(error)
if error.current_context() == &HttpClientError::ConnectionClosedIncompleteMessage =>
{
metrics::AUTO_RETRY_CONNECTION_CLOSED.add(1, metrics_tag);
match cloned_send_request {
Some(cloned_request) => {
logger::info!(
"Retrying request due to connection closed before message could complete"
);
common_utils::metrics::utils::record_operation_time(
cloned_request,
&metrics::EXTERNAL_REQUEST_TIME,
metrics_tag,
)
.await
}
None => {
logger::info!("Retrying request due to connection closed before message could complete failed as request is not cloneable");
Err(error)
}
}
}
err @ Err(_) => err,
}
}
fn is_connection_closed_before_message_could_complete(error: &reqwest::Error) -> bool {
let mut source = error.source();
while let Some(err) = source {
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if hyper_err.is_incomplete_message() {
return true;
}
}
source = err.source();
}
false
}

View File

@ -0,0 +1,131 @@
use std::time::Duration;
use base64::Engine;
use common_utils::consts::BASE64_ENGINE;
pub use common_utils::errors::CustomResult;
use error_stack::ResultExt;
use hyperswitch_interfaces::{errors::HttpClientError, types::Proxy};
use masking::ExposeInterface;
use once_cell::sync::OnceCell;
static DEFAULT_CLIENT: OnceCell<reqwest::Client> = OnceCell::new();
// We may need to use outbound proxy to connect to external world.
// Precedence will be the environment variables, followed by the config.
#[allow(missing_docs)]
pub fn create_client(
proxy_config: &Proxy,
client_certificate: Option<masking::Secret<String>>,
client_certificate_key: Option<masking::Secret<String>>,
) -> CustomResult<reqwest::Client, HttpClientError> {
match (client_certificate, client_certificate_key) {
(Some(encoded_certificate), Some(encoded_certificate_key)) => {
let client_builder = get_client_builder(proxy_config)?;
let identity = create_identity_from_certificate_and_key(
encoded_certificate.clone(),
encoded_certificate_key,
)?;
let certificate_list = create_certificate(encoded_certificate)?;
let client_builder = certificate_list
.into_iter()
.fold(client_builder, |client_builder, certificate| {
client_builder.add_root_certificate(certificate)
});
client_builder
.identity(identity)
.use_rustls_tls()
.build()
.change_context(HttpClientError::ClientConstructionFailed)
.attach_printable("Failed to construct client with certificate and certificate key")
}
_ => get_base_client(proxy_config),
}
}
#[allow(missing_docs)]
pub fn get_client_builder(
proxy_config: &Proxy,
) -> CustomResult<reqwest::ClientBuilder, HttpClientError> {
let mut client_builder = reqwest::Client::builder()
.redirect(reqwest::redirect::Policy::none())
.pool_idle_timeout(Duration::from_secs(
proxy_config
.idle_pool_connection_timeout
.unwrap_or_default(),
));
let proxy_exclusion_config =
reqwest::NoProxy::from_string(&proxy_config.bypass_proxy_hosts.clone().unwrap_or_default());
// Proxy all HTTPS traffic through the configured HTTPS proxy
if let Some(url) = proxy_config.https_url.as_ref() {
client_builder = client_builder.proxy(
reqwest::Proxy::https(url)
.change_context(HttpClientError::InvalidProxyConfiguration)
.attach_printable("HTTPS proxy configuration error")?
.no_proxy(proxy_exclusion_config.clone()),
);
}
// Proxy all HTTP traffic through the configured HTTP proxy
if let Some(url) = proxy_config.http_url.as_ref() {
client_builder = client_builder.proxy(
reqwest::Proxy::http(url)
.change_context(HttpClientError::InvalidProxyConfiguration)
.attach_printable("HTTP proxy configuration error")?
.no_proxy(proxy_exclusion_config),
);
}
Ok(client_builder)
}
#[allow(missing_docs)]
pub fn create_identity_from_certificate_and_key(
encoded_certificate: masking::Secret<String>,
encoded_certificate_key: masking::Secret<String>,
) -> Result<reqwest::Identity, error_stack::Report<HttpClientError>> {
let decoded_certificate = BASE64_ENGINE
.decode(encoded_certificate.expose())
.change_context(HttpClientError::CertificateDecodeFailed)?;
let decoded_certificate_key = BASE64_ENGINE
.decode(encoded_certificate_key.expose())
.change_context(HttpClientError::CertificateDecodeFailed)?;
let certificate = String::from_utf8(decoded_certificate)
.change_context(HttpClientError::CertificateDecodeFailed)?;
let certificate_key = String::from_utf8(decoded_certificate_key)
.change_context(HttpClientError::CertificateDecodeFailed)?;
let key_chain = format!("{}{}", certificate_key, certificate);
reqwest::Identity::from_pem(key_chain.as_bytes())
.change_context(HttpClientError::CertificateDecodeFailed)
}
#[allow(missing_docs)]
pub fn create_certificate(
encoded_certificate: masking::Secret<String>,
) -> Result<Vec<reqwest::Certificate>, error_stack::Report<HttpClientError>> {
let decoded_certificate = BASE64_ENGINE
.decode(encoded_certificate.expose())
.change_context(HttpClientError::CertificateDecodeFailed)?;
let certificate = String::from_utf8(decoded_certificate)
.change_context(HttpClientError::CertificateDecodeFailed)?;
reqwest::Certificate::from_pem_bundle(certificate.as_bytes())
.change_context(HttpClientError::CertificateDecodeFailed)
}
fn get_base_client(proxy_config: &Proxy) -> CustomResult<reqwest::Client, HttpClientError> {
Ok(DEFAULT_CLIENT
.get_or_try_init(|| {
get_client_builder(proxy_config)?
.build()
.change_context(HttpClientError::ClientConstructionFailed)
.attach_printable("Failed to construct base client")
})?
.clone())
}

View File

@ -0,0 +1,9 @@
use router_env::{counter_metric, global_meter, histogram_metric_f64};
global_meter!(GLOBAL_METER, "ROUTER_API");
counter_metric!(REQUEST_BUILD_FAILURE, GLOBAL_METER);
histogram_metric_f64!(EXTERNAL_REQUEST_TIME, GLOBAL_METER);
counter_metric!(AUTO_RETRY_CONNECTION_CLOSED, GLOBAL_METER);

View File

@ -0,0 +1,45 @@
use std::str::FromStr;
use common_utils::request::Headers;
pub use common_utils::{errors::CustomResult, request::ContentType};
use error_stack::ResultExt;
use hyperswitch_interfaces::errors::HttpClientError;
pub use masking::{Mask, Maskable};
use router_env::{instrument, tracing};
#[allow(missing_docs)]
pub trait HeaderExt {
fn construct_header_map(self) -> CustomResult<reqwest::header::HeaderMap, HttpClientError>;
}
impl HeaderExt for Headers {
fn construct_header_map(self) -> CustomResult<reqwest::header::HeaderMap, HttpClientError> {
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
self.into_iter().try_fold(
HeaderMap::new(),
|mut header_map, (header_name, header_value)| {
let header_name = HeaderName::from_str(&header_name)
.change_context(HttpClientError::HeaderMapConstructionFailed)?;
let header_value = header_value.into_inner();
let header_value = HeaderValue::from_str(&header_value)
.change_context(HttpClientError::HeaderMapConstructionFailed)?;
header_map.append(header_name, header_value);
Ok(header_map)
},
)
}
}
#[allow(missing_docs)]
pub trait RequestBuilderExt {
fn add_headers(self, headers: reqwest::header::HeaderMap) -> Self;
}
impl RequestBuilderExt for reqwest::RequestBuilder {
#[instrument(skip_all)]
fn add_headers(mut self, headers: reqwest::header::HeaderMap) -> Self {
self = self.headers(headers);
self
}
}

View File

@ -0,0 +1,78 @@
use masking::Secret;
/// Lead source constant for Hubspot
pub const HUBSPOT_LEAD_SOURCE: &str = "Hyperswitch Dashboard";
/// Struct representing a request to Hubspot
#[derive(Clone, Debug, serde::Serialize, Default)]
pub struct HubspotRequest {
/// Indicates whether Hubspot should be used.
#[serde(rename = "useHubspot")]
pub use_hubspot: bool,
/// The country of the user or company.
pub country: String,
/// The ID of the Hubspot form being submitted.
#[serde(rename = "hubspotFormId")]
pub hubspot_form_id: String,
/// The first name of the user.
pub firstname: Secret<String>,
/// The last name of the user.
pub lastname: Secret<String>,
/// The email address of the user.
pub email: Secret<String>,
/// The name of the company.
#[serde(rename = "companyName")]
pub company_name: String,
/// The source of the lead, typically set to "Hyperswitch Dashboard".
pub lead_source: String,
/// The website URL of the company.
pub website: String,
/// The phone number of the user.
pub phone: Secret<String>,
/// The role or designation of the user.
pub role: String,
/// The monthly GMV (Gross Merchandise Value) of the company.
#[serde(rename = "monthlyGMV")]
pub monthly_gmv: String,
/// Notes from the business development team.
pub bd_notes: String,
/// Additional message or comments.
pub message: String,
}
#[allow(missing_docs)]
impl HubspotRequest {
pub fn new(
country: String,
hubspot_form_id: String,
firstname: Secret<String>,
email: Secret<String>,
company_name: String,
website: String,
) -> Self {
Self {
use_hubspot: true,
country,
hubspot_form_id,
firstname,
email,
company_name,
lead_source: HUBSPOT_LEAD_SOURCE.to_string(),
website,
..Default::default()
}
}
}

View File

@ -17,8 +17,17 @@ pub mod no_encryption;
/// Building grpc clients to communicate with the server
pub mod grpc_client;
/// http_client module
pub mod http_client;
/// hubspot_proxy module
pub mod hubspot_proxy;
pub mod managers;
/// crm module
pub mod crm;
/// Crate specific constants
#[cfg(feature = "aws_kms")]
pub mod consts {