diff --git a/config/docker_compose.toml b/config/docker_compose.toml
index a5294546de..986240f0a3 100644
--- a/config/docker_compose.toml
+++ b/config/docker_compose.toml
@@ -15,7 +15,7 @@ level = "DEBUG" # What you see in your terminal.
 
 [log.telemetry]
 traces_enabled = false # Whether traces are enabled.
-metrics_enabled = false # Whether metrics are enabled.
+metrics_enabled = true # Whether metrics are enabled.
 ignore_errors = false # Whether to ignore errors during traces or metrics pipeline setup.
 otel_exporter_otlp_endpoint = "https://otel-collector:4317" # Endpoint to send metrics and traces to.
 use_xray_generator = false
diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs
index 7ccfd600d6..04dff49b74 100644
--- a/crates/drainer/src/lib.rs
+++ b/crates/drainer/src/lib.rs
@@ -23,7 +23,7 @@ pub async fn start_drainer(
     loop_interval: u32,
 ) -> errors::DrainerResult<()> {
     let mut stream_index: u8 = 0;
-    let mut jobs_picked: u8 = 0;
+    let jobs_picked = Arc::new(atomic::AtomicU8::new(0));
 
     let mut shutdown_interval =
         tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into()));
@@ -61,11 +61,11 @@ pub async fn start_drainer(
                     stream_index,
                     max_read_count,
                     active_tasks.clone(),
+                    jobs_picked.clone(),
                 ));
-                jobs_picked += 1;
             }
-            (stream_index, jobs_picked) = utils::increment_stream_index(
-                (stream_index, jobs_picked),
+            stream_index = utils::increment_stream_index(
+                (stream_index, jobs_picked.clone()),
                 number_of_streams,
                 &mut loop_interval,
             )
@@ -119,13 +119,19 @@ async fn drainer_handler(
     stream_index: u8,
     max_read_count: u64,
     active_tasks: Arc<atomic::AtomicU64>,
+    jobs_picked: Arc<atomic::AtomicU8>,
 ) -> errors::DrainerResult<()> {
     active_tasks.fetch_add(1, atomic::Ordering::Release);
 
     let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index);
 
-    let drainer_result =
-        Box::pin(drainer(store.clone(), max_read_count, stream_name.as_str())).await;
+    let drainer_result = Box::pin(drainer(
+        store.clone(),
+        max_read_count,
+        stream_name.as_str(),
+        jobs_picked,
+    ))
+    .await;
 
     if let Err(error) = drainer_result {
         logger::error!(?error)
@@ -145,11 +151,15 @@ async fn drainer(
     store: Arc<Store>,
     max_read_count: u64,
     stream_name: &str,
+    jobs_picked: Arc<atomic::AtomicU8>,
 ) -> errors::DrainerResult<()> {
     let stream_read =
         match utils::read_from_stream(stream_name, max_read_count, store.redis_conn.as_ref()).await
         {
-            Ok(result) => result,
+            Ok(result) => {
+                jobs_picked.fetch_add(1, atomic::Ordering::SeqCst);
+                result
+            }
             Err(error) => {
                 if let errors::DrainerError::RedisError(redis_err) = error.current_context() {
                     if let redis_interface::errors::RedisError::StreamEmptyOrNotAvailable =
diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs
index 5a995652bb..5abc7e474c 100644
--- a/crates/drainer/src/utils.rs
+++ b/crates/drainer/src/utils.rs
@@ -1,4 +1,7 @@
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::HashMap,
+    sync::{atomic, Arc},
+};
 
 use error_stack::IntoReport;
 use redis_interface as redis;
@@ -127,19 +130,20 @@ pub fn parse_stream_entries<'a>(
 
 // Here the output is in the format (stream_index, jobs_picked),
 // similar to the first argument of the function
 pub async fn increment_stream_index(
-    (index, jobs_picked): (u8, u8),
+    (index, jobs_picked): (u8, Arc<atomic::AtomicU8>),
     total_streams: u8,
     interval: &mut tokio::time::Interval,
-) -> (u8, u8) {
+) -> u8 {
     if index == total_streams - 1 {
         interval.tick().await;
-        match jobs_picked {
+        match jobs_picked.load(atomic::Ordering::SeqCst) {
             0 => metrics::CYCLES_COMPLETED_UNSUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]),
             _ => metrics::CYCLES_COMPLETED_SUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]),
         }
-        (0, 0)
+        jobs_picked.store(0, atomic::Ordering::SeqCst);
+        0
     } else {
-        (index + 1, jobs_picked)
+        index + 1
     }
 }
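
The change above turns `jobs_picked` from a plain `u8` owned by the `start_drainer` loop into an `Arc<atomic::AtomicU8>` shared with every spawned `drainer_handler` task. Previously the counter was bumped at spawn time, so it counted spawned tasks rather than jobs actually picked up; now `drainer` increments it only after `read_from_stream` succeeds, and `increment_stream_index` reads and resets the shared counter at the end of each cycle before emitting the cycle metrics, returning only the next stream index. Below is a minimal, self-contained sketch of that shared-counter pattern under stated assumptions: it assumes tokio with the "rt-multi-thread" and "macros" features, and `worker` plus the printed messages are hypothetical stand-ins for illustration, not drainer APIs.

// A minimal sketch of the Arc<AtomicU8> shared-counter pattern this diff
// introduces; `worker` is a hypothetical stand-in, not a drainer API.
use std::sync::{
    atomic::{AtomicU8, Ordering},
    Arc,
};

// Each spawned task holds its own clone of the Arc and bumps the counter
// only when it actually picks up work, mirroring the fetch_add after a
// successful read_from_stream in the diff.
async fn worker(jobs_picked: Arc<AtomicU8>) {
    jobs_picked.fetch_add(1, Ordering::SeqCst);
}

#[tokio::main]
async fn main() {
    let jobs_picked = Arc::new(AtomicU8::new(0));

    // The coordinator spawns tasks without awaiting them inline, which is
    // why a plain `u8` local can no longer be updated from inside them.
    let handles: Vec<_> = (0..3)
        .map(|_| tokio::spawn(worker(jobs_picked.clone())))
        .collect();
    for handle in handles {
        handle.await.expect("worker task panicked");
    }

    // End of a cycle: inspect the counter, report the outcome, reset to 0.
    // This mirrors the load/store pair in increment_stream_index.
    match jobs_picked.load(Ordering::SeqCst) {
        0 => println!("cycle completed unsuccessfully"),
        n => println!("cycle completed successfully, {n} jobs picked"),
    }
    jobs_picked.store(0, Ordering::SeqCst);
}

One design note: the diff uses `SeqCst` for `jobs_picked` while `active_tasks` uses `Release`. For a counter that is only read and reset at a cycle boundary, a relaxed ordering would likely also suffice; the sketch keeps `SeqCst` to match the change as written.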