mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
feat(trace): add optional sampling behaviour for routes (#2511)
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
This commit is contained in:
@ -28,6 +28,7 @@ port = 5432 # DB Port
|
||||
dbname = "hyperswitch_db" # Name of Database
|
||||
pool_size = 5 # Number of connections to keep open
|
||||
connection_timeout = 10 # Timeout for database connection in seconds
|
||||
queue_strategy = "Fifo" # Add the queue strategy used by the database bb8 client
|
||||
|
||||
# Replica SQL data store credentials
|
||||
[replica_database]
|
||||
@ -38,6 +39,7 @@ port = 5432 # DB Port
|
||||
dbname = "hyperswitch_db" # Name of Database
|
||||
pool_size = 5 # Number of connections to keep open
|
||||
connection_timeout = 10 # Timeout for database connection in seconds
|
||||
queue_strategy = "Fifo" # Add the queue strategy used by the database bb8 client
|
||||
|
||||
# Redis credentials
|
||||
[redis]
|
||||
@ -93,6 +95,7 @@ sampling_rate = 0.1 # decimal rate between 0.0
|
||||
otel_exporter_otlp_endpoint = "http://localhost:4317" # endpoint to send metrics and traces to, can include port number
|
||||
otel_exporter_otlp_timeout = 5000 # timeout (in milliseconds) for sending metrics and traces
|
||||
use_xray_generator = false # Set this to true for AWS X-ray compatible traces
|
||||
route_to_trace = [ "*/confirm" ]
|
||||
|
||||
# This section provides some secret values.
|
||||
[secrets]
|
||||
|
||||
@ -29,6 +29,7 @@ impl Default for super::settings::Database {
|
||||
dbname: String::new(),
|
||||
pool_size: 5,
|
||||
connection_timeout: 10,
|
||||
queue_strategy: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,6 +61,7 @@ impl KmsDecrypt for settings::Database {
|
||||
password: self.password.decrypt_inner(kms_client).await?.into(),
|
||||
pool_size: self.pool_size,
|
||||
connection_timeout: self.connection_timeout,
|
||||
queue_strategy: self.queue_strategy.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -464,6 +464,24 @@ pub struct Database {
|
||||
pub dbname: String,
|
||||
pub pool_size: u32,
|
||||
pub connection_timeout: u64,
|
||||
pub queue_strategy: QueueStrategy,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
#[serde(rename_all = "PascalCase")]
|
||||
pub enum QueueStrategy {
|
||||
#[default]
|
||||
Fifo,
|
||||
Lifo,
|
||||
}
|
||||
|
||||
impl From<QueueStrategy> for bb8::QueueStrategy {
|
||||
fn from(value: QueueStrategy) -> Self {
|
||||
match value {
|
||||
QueueStrategy::Fifo => Self::Fifo,
|
||||
QueueStrategy::Lifo => Self::Lifo,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kms"))]
|
||||
@ -477,6 +495,7 @@ impl Into<storage_impl::config::Database> for Database {
|
||||
dbname: self.dbname,
|
||||
pool_size: self.pool_size,
|
||||
connection_timeout: self.connection_timeout,
|
||||
queue_strategy: self.queue_strategy.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -704,9 +723,11 @@ impl Settings {
|
||||
.try_parsing(true)
|
||||
.separator("__")
|
||||
.list_separator(",")
|
||||
.with_list_parse_key("log.telemetry.route_to_trace")
|
||||
.with_list_parse_key("redis.cluster_urls")
|
||||
.with_list_parse_key("connectors.supported.wallets")
|
||||
.with_list_parse_key("connector_request_reference_id_config.merchant_ids_send_payment_id_as_connector_request_id"),
|
||||
|
||||
)
|
||||
.build()?;
|
||||
|
||||
|
||||
@ -101,6 +101,8 @@ pub struct LogTelemetry {
|
||||
pub otel_exporter_otlp_timeout: Option<u64>,
|
||||
/// Whether to use xray ID generator, (enable this if you plan to use AWS-XRAY)
|
||||
pub use_xray_generator: bool,
|
||||
/// Route Based Tracing
|
||||
pub route_to_trace: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
/// Telemetry / tracing.
|
||||
|
||||
@ -12,6 +12,7 @@ use opentelemetry::{
|
||||
trace::BatchConfig,
|
||||
Resource,
|
||||
},
|
||||
trace::{TraceContextExt, TraceState},
|
||||
KeyValue,
|
||||
};
|
||||
use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig};
|
||||
@ -132,6 +133,92 @@ fn get_opentelemetry_exporter(config: &config::LogTelemetry) -> TonicExporterBui
|
||||
exporter_builder
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum TraceUrlAssert {
|
||||
Match(String),
|
||||
EndsWith(String),
|
||||
}
|
||||
|
||||
impl TraceUrlAssert {
|
||||
fn compare_url(&self, url: &str) -> bool {
|
||||
match self {
|
||||
Self::Match(value) => url == value,
|
||||
Self::EndsWith(end) => url.ends_with(end),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for TraceUrlAssert {
|
||||
fn from(value: String) -> Self {
|
||||
match value {
|
||||
url if url.starts_with('*') => Self::EndsWith(url.trim_start_matches('*').to_string()),
|
||||
url => Self::Match(url),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct TraceAssertion {
|
||||
clauses: Option<Vec<TraceUrlAssert>>,
|
||||
/// default behaviour for tracing if no condition is provided
|
||||
default: bool,
|
||||
}
|
||||
|
||||
impl TraceAssertion {
|
||||
///
|
||||
/// Should the provided url be traced
|
||||
///
|
||||
fn should_trace_url(&self, url: &str) -> bool {
|
||||
match &self.clauses {
|
||||
Some(clauses) => clauses.iter().all(|cur| cur.compare_url(url)),
|
||||
None => self.default,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Conditional Sampler for providing control on url based tracing
|
||||
///
|
||||
#[derive(Clone, Debug)]
|
||||
struct ConditionalSampler<T: trace::ShouldSample + Clone + 'static>(TraceAssertion, T);
|
||||
|
||||
impl<T: trace::ShouldSample + Clone + 'static> trace::ShouldSample for ConditionalSampler<T> {
|
||||
fn should_sample(
|
||||
&self,
|
||||
parent_context: Option<&opentelemetry::Context>,
|
||||
trace_id: opentelemetry::trace::TraceId,
|
||||
name: &str,
|
||||
span_kind: &opentelemetry::trace::SpanKind,
|
||||
attributes: &opentelemetry::trace::OrderMap<opentelemetry::Key, opentelemetry::Value>,
|
||||
links: &[opentelemetry::trace::Link],
|
||||
instrumentation_library: &opentelemetry::InstrumentationLibrary,
|
||||
) -> opentelemetry::trace::SamplingResult {
|
||||
match attributes
|
||||
.get(&opentelemetry::Key::new("http.route"))
|
||||
.map_or(self.0.default, |inner| {
|
||||
self.0.should_trace_url(&inner.as_str())
|
||||
}) {
|
||||
true => self.1.should_sample(
|
||||
parent_context,
|
||||
trace_id,
|
||||
name,
|
||||
span_kind,
|
||||
attributes,
|
||||
links,
|
||||
instrumentation_library,
|
||||
),
|
||||
false => opentelemetry::trace::SamplingResult {
|
||||
decision: opentelemetry::trace::SamplingDecision::Drop,
|
||||
attributes: Vec::new(),
|
||||
trace_state: match parent_context {
|
||||
Some(ctx) => ctx.span().span_context().trace_state().clone(),
|
||||
None => TraceState::default(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_tracing_pipeline(
|
||||
config: &config::LogTelemetry,
|
||||
service_name: &str,
|
||||
@ -140,9 +227,16 @@ fn setup_tracing_pipeline(
|
||||
global::set_text_map_propagator(TraceContextPropagator::new());
|
||||
|
||||
let mut trace_config = trace::config()
|
||||
.with_sampler(trace::Sampler::TraceIdRatioBased(
|
||||
config.sampling_rate.unwrap_or(1.0),
|
||||
))
|
||||
.with_sampler(trace::Sampler::ParentBased(Box::new(ConditionalSampler(
|
||||
TraceAssertion {
|
||||
clauses: config
|
||||
.route_to_trace
|
||||
.clone()
|
||||
.map(|inner| inner.into_iter().map(Into::into).collect()),
|
||||
default: false,
|
||||
},
|
||||
trace::Sampler::TraceIdRatioBased(config.sampling_rate.unwrap_or(1.0)),
|
||||
))))
|
||||
.with_resource(Resource::new(vec![KeyValue::new(
|
||||
"service.name",
|
||||
service_name.to_owned(),
|
||||
|
||||
@ -9,4 +9,5 @@ pub struct Database {
|
||||
pub dbname: String,
|
||||
pub pool_size: u32,
|
||||
pub connection_timeout: u64,
|
||||
pub queue_strategy: bb8::QueueStrategy,
|
||||
}
|
||||
|
||||
@ -88,6 +88,7 @@ pub async fn diesel_make_pg_pool(
|
||||
let manager = async_bb8_diesel::ConnectionManager::<PgConnection>::new(database_url);
|
||||
let mut pool = bb8::Pool::builder()
|
||||
.max_size(database.pool_size)
|
||||
.queue_strategy(database.queue_strategy)
|
||||
.connection_timeout(std::time::Duration::from_secs(database.connection_timeout));
|
||||
|
||||
if test_transaction {
|
||||
|
||||
Reference in New Issue
Block a user