feat(metrics): add drainer delay metric (#3034)

This commit is contained in:
Kartikeya Hegde
2023-12-06 11:25:00 +05:30
committed by GitHub
parent cfafd5cd29
commit c6e2ee29d9
5 changed files with 48 additions and 1 deletions

View File

@ -199,6 +199,7 @@ async fn drainer(
.get("request_id")
.map_or(String::new(), Clone::clone);
let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone);
let pushed_at = entry.1.get("pushed_at");
tracing::Span::current().record("request_id", request_id);
tracing::Span::current().record("global_id", global_id);
@ -261,6 +262,7 @@ async fn drainer(
value: insert_op.into(),
}],
);
utils::push_drainer_delay(pushed_at, insert_op.to_string());
}
kv::DBOperation::Update { updatable } => {
let (_, execution_time) = common_utils::date_time::time_it(|| async {
@ -302,6 +304,7 @@ async fn drainer(
value: update_op.into(),
}],
);
utils::push_drainer_delay(pushed_at, update_op.to_string());
}
kv::DBOperation::Delete => {
// [#224]: Implement this

View File

@ -1,5 +1,7 @@
pub use router_env::opentelemetry::KeyValue;
use router_env::{counter_metric, global_meter, histogram_metric, metrics_context};
use router_env::{
counter_metric, global_meter, histogram_metric, histogram_metric_i64, metrics_context,
};
metrics_context!(CONTEXT);
global_meter!(DRAINER_METER, "DRAINER");
@ -18,3 +20,4 @@ histogram_metric!(QUERY_EXECUTION_TIME, DRAINER_METER); // Time in (ms) millisec
histogram_metric!(REDIS_STREAM_READ_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric_i64!(DRAINER_DELAY_SECONDS, DRAINER_METER); // Time in (s) seconds

View File

@ -128,6 +128,25 @@ pub fn parse_stream_entries<'a>(
.into_report()
}
pub fn push_drainer_delay(pushed_at: Option<&String>, operation: String) {
if let Some(pushed_at) = pushed_at {
if let Ok(time) = pushed_at.parse::<i64>() {
let drained_at = common_utils::date_time::now_unix_timestamp();
let delay = drained_at - time;
logger::debug!(operation = operation, delay = delay);
metrics::DRAINER_DELAY_SECONDS.record(
&metrics::CONTEXT,
delay,
&[metrics::KeyValue {
key: "operation".into(),
value: operation.into(),
}],
);
}
}
}
// Here the output is in the format (stream_index, jobs_picked),
// similar to the first argument of the function
pub async fn increment_stream_index(