From 3292960996834f93095f96f05b18ce28adceb374 Mon Sep 17 00:00:00 2001 From: Nishant Joshi Date: Tue, 7 Feb 2023 20:20:51 +0530 Subject: [PATCH] feat: add tick interval to drainer (#517) --- config/config.example.toml | 3 ++- crates/drainer/src/lib.rs | 11 +++++++++-- crates/drainer/src/main.rs | 2 ++ crates/drainer/src/settings.rs | 2 ++ crates/drainer/src/utils.rs | 7 ++++++- crates/router/src/configs/defaults.rs | 2 ++ crates/router/src/configs/settings.rs | 2 ++ 7 files changed, 25 insertions(+), 4 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index 121d07617e..3ddbab9876 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -188,4 +188,5 @@ batch_size = 200 # Specifies the batch size the producer will push under a singl stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer num_partitions = 64 # Specifies the number of partitions the stream will be divided into max_read_count = 100 # Specifies the maximum number of entries that would be read from redis stream in one call -shutdown_interval = 1000 # Specifies how much time to wait, while waiting for threads to complete execution +shutdown_interval = 1000 # Specifies how much time to wait, while waiting for threads to complete execution (in milliseconds) +loop_interval = 500 # Specifies how much time to wait after checking all the possible streams in completed (in milliseconds) diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index bb7e5d1e2b..6ab1287495 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -19,12 +19,15 @@ pub async fn start_drainer( number_of_streams: u8, max_read_count: u64, shutdown_interval: u32, + loop_interval: u32, ) -> errors::DrainerResult<()> { let mut stream_index: u8 = 0; let mut jobs_picked: u8 = 0; let mut shutdown_interval = tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into())); + let mut loop_interval = + tokio::time::interval(std::time::Duration::from_millis(loop_interval.into())); let signal = get_allowed_signals() @@ -49,8 +52,12 @@ pub async fn start_drainer( )); jobs_picked += 1; } - (stream_index, jobs_picked) = - utils::increment_stream_index((stream_index, jobs_picked), number_of_streams); + (stream_index, jobs_picked) = utils::increment_stream_index( + (stream_index, jobs_picked), + number_of_streams, + &mut loop_interval, + ) + .await; } Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { logger::info!("Awaiting shutdown!"); diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index 18238c7684..59fa5fb375 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -19,6 +19,7 @@ async fn main() -> DrainerResult<()> { let number_of_streams = store.config.drainer_num_partitions; let max_read_count = conf.drainer.max_read_count; let shutdown_intervals = conf.drainer.shutdown_interval; + let loop_interval = conf.drainer.loop_interval; let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?; @@ -29,6 +30,7 @@ async fn main() -> DrainerResult<()> { number_of_streams, max_read_count, shutdown_intervals, + loop_interval, ) .await?; diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs index 2f616b43d0..f77853f1b8 100644 --- a/crates/drainer/src/settings.rs +++ b/crates/drainer/src/settings.rs @@ -45,6 +45,7 @@ pub struct DrainerSettings { pub num_partitions: u8, pub max_read_count: u64, pub shutdown_interval: u32, // in milliseconds + pub loop_interval: u32, // in milliseconds } impl Default for Database { @@ -67,6 +68,7 @@ impl Default for DrainerSettings { num_partitions: 64, max_read_count: 100, shutdown_interval: 1000, // in milliseconds + loop_interval: 500, // in milliseconds } } } diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 1965d26068..c06615759f 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -124,8 +124,13 @@ pub fn parse_stream_entries<'a>( // Here the output is in the format (stream_index, jobs_picked), // similar to the first argument of the function -pub fn increment_stream_index((index, jobs_picked): (u8, u8), total_streams: u8) -> (u8, u8) { +pub async fn increment_stream_index( + (index, jobs_picked): (u8, u8), + total_streams: u8, + interval: &mut tokio::time::Interval, +) -> (u8, u8) { if index == total_streams - 1 { + interval.tick().await; match jobs_picked { 0 => metrics::CYCLES_COMPLETED_UNSUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]), _ => metrics::CYCLES_COMPLETED_SUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]), diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index a8410e40e7..00f2e66081 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -116,6 +116,8 @@ impl Default for super::settings::DrainerSettings { stream_name: "DRAINER_STREAM".into(), num_partitions: 64, max_read_count: 100, + shutdown_interval: 1000, + loop_interval: 500, } } } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index b4c3bcc6ed..71453cabaa 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -194,6 +194,8 @@ pub struct DrainerSettings { pub stream_name: String, pub num_partitions: u8, pub max_read_count: u64, + pub shutdown_interval: u32, // in milliseconds + pub loop_interval: u32, // in milliseconds } #[derive(Debug, Clone, Default, Deserialize)]