feat(metrics): add histogram and update opentelemetry dependencies (#32)

This commit is contained in:
Nishant Joshi
2022-12-01 12:51:31 +05:30
committed by GitHub
parent 863e53c0d1
commit e65ba2a91a
12 changed files with 293 additions and 177 deletions

View File

@ -75,7 +75,7 @@ impl Feature<api::Authorize, types::PaymentsRequestData>
)
.await;
metrics::PAYMENT_COUNT.add(1, &[]); // Metrics
metrics::PAYMENT_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics
(resp, payment_data)
}

View File

@ -212,7 +212,7 @@ where
payment_attempt: &storage::PaymentAttempt,
) -> CustomResult<(), errors::ApiErrorResponse> {
if helpers::check_if_operation_confirm(self) {
metrics::TASKS_ADDED_COUNT.add(1, &[]); // Metrics
metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics
let schedule_time = payment_sync::get_sync_process_schedule_time(
&payment_attempt.connector,

View File

@ -3,14 +3,14 @@ use router_env::{
tracing::{self, instrument},
};
use crate::routes::metrics::HEALTH_METRIC;
use crate::routes::metrics;
/// .
// #[logger::instrument(skip_all, name = "name1", level = "warn", fields( key1 = "val1" ))]
#[instrument(skip_all)]
// #[actix_web::get("/health")]
pub async fn health() -> impl actix_web::Responder {
HEALTH_METRIC.add(1, &[]);
metrics::HEALTH_METRIC.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Health was called");
actix_web::HttpResponse::Ok().body("health is good")
}

View File

@ -4,8 +4,10 @@ use once_cell::sync::Lazy;
use router_env::opentelemetry::{
global,
metrics::{Counter, Meter},
Context,
};
pub static CONTEXT: Lazy<Context> = Lazy::new(Context::current);
static GLOBAL_METER: Lazy<Meter> = Lazy::new(|| global::meter("ROUTER_API"));
pub(crate) static HEALTH_METRIC: Lazy<Counter<u64>> =

View File

@ -117,7 +117,7 @@ pub async fn consumer_operations(
pt_utils::add_histogram_metrics(&pickup_time, task, &stream_name);
metrics::TASK_CONSUMED.add(1, &[]);
metrics::TASK_CONSUMED.add(&metrics::CONTEXT, 1, &[]);
let runner = pt_utils::runner_from_task(task)?;
handler.push(tokio::task::spawn(start_workflow(
state.clone(),
@ -205,7 +205,7 @@ pub async fn run_executor<'a>(
}
},
};
metrics::TASK_PROCESSED.add(1, &[]);
metrics::TASK_PROCESSED.add(&metrics::CONTEXT, 1, &[]);
}
#[instrument(skip_all)]

View File

@ -1,15 +1,15 @@
use once_cell::sync::Lazy;
use router_env::opentelemetry::{
global,
metrics::{Counter, Meter, ValueRecorder},
metrics::{Counter, Histogram, Meter},
Context,
};
pub static CONTEXT: Lazy<Context> = Lazy::new(Context::current);
static PT_METER: Lazy<Meter> = Lazy::new(|| global::meter("PROCESS_TRACKER"));
// Using ValueRecorder till https://bitbucket.org/juspay/orca/pull-requests/319
// Histogram available in opentelemetry:0.18
pub(crate) static CONSUMER_STATS: Lazy<ValueRecorder<f64>> =
Lazy::new(|| PT_METER.f64_value_recorder("CONSUMER_OPS").init());
pub(crate) static CONSUMER_STATS: Lazy<Histogram<f64>> =
Lazy::new(|| PT_METER.f64_histogram("CONSUMER_OPS").init());
macro_rules! create_counter {
($name:ident, $meter:ident) => {

View File

@ -124,6 +124,6 @@ pub async fn fetch_producer_tasks(
}
new_tasks.append(&mut pending_tasks);
metrics::TASKS_PICKED_COUNT.add(new_tasks.len() as u64, &[]);
metrics::TASKS_PICKED_COUNT.add(&metrics::CONTEXT, new_tasks.len() as u64, &[]);
Ok(new_tasks)
}

View File

@ -72,7 +72,7 @@ pub async fn divide_and_append_tasks(
settings: &SchedulerSettings,
) -> CustomResult<(), errors::ProcessTrackerError> {
let batches = divide(tasks, settings);
metrics::BATCHES_CREATED.add(batches.len() as u64, &[]); // Metrics
metrics::BATCHES_CREATED.add(&metrics::CONTEXT, batches.len() as u64, &[]); // Metrics
for batch in batches {
let result = update_status_and_append(state, flow, batch).await;
match result {
@ -209,7 +209,7 @@ pub async fn get_batches(
logger::error!(%error, "Error finding batch in stream");
error.change_context(errors::ProcessTrackerError::BatchNotFound)
})?;
metrics::BATCHES_CONSUMED.add(1, &[]);
metrics::BATCHES_CONSUMED.add(&metrics::CONTEXT, 1, &[]);
let (batches, entry_ids): (Vec<Vec<ProcessTrackerBatch>>, Vec<Vec<String>>) = response.into_iter().map(|(_key, entries)| {
entries.into_iter().try_fold(
@ -303,6 +303,7 @@ pub fn add_histogram_metrics(
logger::error!(%pickup_schedule_delta, "<- Time delta for scheduled tasks");
let runner_name = runner.clone();
metrics::CONSUMER_STATS.record(
&metrics::CONTEXT,
pickup_schedule_delta,
&[opentelemetry::KeyValue::new(
stream_name.to_owned(),

View File

@ -75,7 +75,7 @@ impl ProcessTracker {
db: &dyn db::Db,
schedule_time: PrimitiveDateTime,
) -> Result<(), errors::ProcessTrackerError> {
metrics::TASK_RETRIED.add(1, &[]);
metrics::TASK_RETRIED.add(&metrics::CONTEXT, 1, &[]);
db.update_process_tracker(
self.clone(),
ProcessTrackerUpdate::StatusRetryUpdate {
@ -102,7 +102,7 @@ impl ProcessTracker {
)
.await
.attach_printable("Failed while updating status of the process")?;
metrics::TASK_FINISHED.add(1, &[]);
metrics::TASK_FINISHED.add(&metrics::CONTEXT, 1, &[]);
Ok(())
}
}

View File

@ -12,8 +12,9 @@ build = "src/build.rs"
config = { version = "0.13.2", features = ["toml"] }
gethostname = "0.2.3"
once_cell = "1.15.0"
opentelemetry = { version = "0.17", features = ["rt-tokio-current-thread", "metrics"] }
opentelemetry-otlp = { version = "0.10", features = ["metrics"] }
opentelemetry = { git = "https://github.com/jarnura/opentelemetry-rust", rev = "a82056696ca3d26960458269a894e5cf15056ad8", features = ["rt-tokio-current-thread", "metrics"] }
opentelemetry-otlp = { git = "https://github.com/jarnura/opentelemetry-rust", rev = "a82056696ca3d26960458269a894e5cf15056ad8", features = ["metrics"] }
rustc-hash = "1.1"
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.85"
@ -25,7 +26,7 @@ tracing = "0.1.36"
tracing-actix-web = { version = "0.6.1", features = ["opentelemetry_0_17"], optional = true }
tracing-appender = "0.2.2"
tracing-core = "0.1.29"
tracing-opentelemetry = { version = "0.17" }
tracing-opentelemetry = { git = "https://github.com/jarnura/tracing", rev = "16d277227f60788750528e4f4cc1db4f36b0869f" }
tracing-subscriber = { version = "0.3.15", default-features = true, features = ["json", "env-filter", "registry"] }
vergen = { version = "7.4.2", optional = true }

View File

@ -3,20 +3,19 @@
//!
use std::{path::PathBuf, time::Duration};
use once_cell::sync::Lazy;
use opentelemetry::{
global,
global, runtime,
sdk::{
metrics::{selectors, PushController},
export::metrics::aggregation::cumulative_temporality_selector,
metrics::{controllers::BasicController, selectors::simple},
propagation::TraceContextPropagator,
trace, Resource,
},
util::tokio_interval_stream,
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use tracing_appender::non_blocking::WorkerGuard;
// use tracing_subscriber::fmt::format::FmtSpan;
// use tracing_bunyan_formatter::JsonStorageLayer;
use tracing_subscriber::{
filter, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer,
};
@ -51,7 +50,7 @@ where
#[derive(Debug)]
pub struct TelemetryGuard {
_log_guards: Vec<WorkerGuard>,
_metric_controller: Option<PushController>,
_metric_controller: Option<BasicController>,
}
///
@ -92,29 +91,13 @@ pub fn setup<Str: AsRef<str>>(
let file_filter = filter::Targets::new().with_default(conf.file.level.into_level());
let file_layer = FormattingLayer::new(service_name, file_writer).with_filter(file_filter);
// let fmt_layer = fmt::layer()
// .with_writer(file_writer)
// .with_target(true)
// .with_level(true)
// .with_span_events(FmtSpan::ACTIVE)
// .json();
// Some(fmt_layer)
//Some(FormattingLayer::new(service_name, file_writer))
Some(file_layer)
// Some(BunyanFormattingLayer::new("router".into(), file_writer))
} else {
None
};
let telemetry_layer = match telemetry {
Some(Ok(ref tracer)) => Some(tracing_opentelemetry::layer().with_tracer(tracer.clone())),
_ => None,
};
// Use 'RUST_LOG' environment variable will override the config settings
let subscriber = tracing_subscriber::registry()
.with(telemetry_layer)
.with(StorageSubscription)
.with(file_writer)
.with(
@ -166,15 +149,29 @@ pub fn setup<Str: AsRef<str>>(
})
}
fn setup_metrics() -> Option<PushController> {
static HISTOGRAM_BUCKETS: Lazy<[f64; 15]> = Lazy::new(|| {
let mut init = 0.01;
let mut buckets: [f64; 15] = [0.0; 15];
for bucket in &mut buckets {
init *= 2.0;
*bucket = init;
}
buckets
});
fn setup_metrics() -> Option<BasicController> {
opentelemetry_otlp::new_pipeline()
.metrics(tokio::spawn, tokio_interval_stream)
.metrics(
simple::histogram(*HISTOGRAM_BUCKETS),
cumulative_temporality_selector(),
runtime::TokioCurrentThread,
)
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_env(), // can also config it using with_* functions like the tracing part above.
)
.with_period(Duration::from_secs(3))
.with_timeout(Duration::from_secs(10))
.with_aggregator_selector(selectors::simple::Selector::Exact)
.build()
.map_err(|err| eprintln!("Failed to Setup Metrics with {:?}", err))
.ok()