feat: add tick interval to drainer (#517)

This commit is contained in:
Nishant Joshi
2023-02-07 20:20:51 +05:30
committed by GitHub
parent ac30313ff1
commit 3292960996
7 changed files with 25 additions and 4 deletions

View File

@ -188,4 +188,5 @@ batch_size = 200 # Specifies the batch size the producer will push under a singl
stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer
num_partitions = 64 # Specifies the number of partitions the stream will be divided into num_partitions = 64 # Specifies the number of partitions the stream will be divided into
max_read_count = 100 # Specifies the maximum number of entries that would be read from redis stream in one call max_read_count = 100 # Specifies the maximum number of entries that would be read from redis stream in one call
shutdown_interval = 1000 # Specifies how much time to wait, while waiting for threads to complete execution shutdown_interval = 1000 # Specifies how much time to wait, while waiting for threads to complete execution (in milliseconds)
loop_interval = 500 # Specifies how much time to wait after checking all the possible streams in completed (in milliseconds)

View File

@ -19,12 +19,15 @@ pub async fn start_drainer(
number_of_streams: u8, number_of_streams: u8,
max_read_count: u64, max_read_count: u64,
shutdown_interval: u32, shutdown_interval: u32,
loop_interval: u32,
) -> errors::DrainerResult<()> { ) -> errors::DrainerResult<()> {
let mut stream_index: u8 = 0; let mut stream_index: u8 = 0;
let mut jobs_picked: u8 = 0; let mut jobs_picked: u8 = 0;
let mut shutdown_interval = let mut shutdown_interval =
tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into())); tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into()));
let mut loop_interval =
tokio::time::interval(std::time::Duration::from_millis(loop_interval.into()));
let signal = let signal =
get_allowed_signals() get_allowed_signals()
@ -49,8 +52,12 @@ pub async fn start_drainer(
)); ));
jobs_picked += 1; jobs_picked += 1;
} }
(stream_index, jobs_picked) = (stream_index, jobs_picked) = utils::increment_stream_index(
utils::increment_stream_index((stream_index, jobs_picked), number_of_streams); (stream_index, jobs_picked),
number_of_streams,
&mut loop_interval,
)
.await;
} }
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
logger::info!("Awaiting shutdown!"); logger::info!("Awaiting shutdown!");

View File

@ -19,6 +19,7 @@ async fn main() -> DrainerResult<()> {
let number_of_streams = store.config.drainer_num_partitions; let number_of_streams = store.config.drainer_num_partitions;
let max_read_count = conf.drainer.max_read_count; let max_read_count = conf.drainer.max_read_count;
let shutdown_intervals = conf.drainer.shutdown_interval; let shutdown_intervals = conf.drainer.shutdown_interval;
let loop_interval = conf.drainer.loop_interval;
let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?; let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?;
@ -29,6 +30,7 @@ async fn main() -> DrainerResult<()> {
number_of_streams, number_of_streams,
max_read_count, max_read_count,
shutdown_intervals, shutdown_intervals,
loop_interval,
) )
.await?; .await?;

View File

@ -45,6 +45,7 @@ pub struct DrainerSettings {
pub num_partitions: u8, pub num_partitions: u8,
pub max_read_count: u64, pub max_read_count: u64,
pub shutdown_interval: u32, // in milliseconds pub shutdown_interval: u32, // in milliseconds
pub loop_interval: u32, // in milliseconds
} }
impl Default for Database { impl Default for Database {
@ -67,6 +68,7 @@ impl Default for DrainerSettings {
num_partitions: 64, num_partitions: 64,
max_read_count: 100, max_read_count: 100,
shutdown_interval: 1000, // in milliseconds shutdown_interval: 1000, // in milliseconds
loop_interval: 500, // in milliseconds
} }
} }
} }

View File

@ -124,8 +124,13 @@ pub fn parse_stream_entries<'a>(
// Here the output is in the format (stream_index, jobs_picked), // Here the output is in the format (stream_index, jobs_picked),
// similar to the first argument of the function // similar to the first argument of the function
pub fn increment_stream_index((index, jobs_picked): (u8, u8), total_streams: u8) -> (u8, u8) { pub async fn increment_stream_index(
(index, jobs_picked): (u8, u8),
total_streams: u8,
interval: &mut tokio::time::Interval,
) -> (u8, u8) {
if index == total_streams - 1 { if index == total_streams - 1 {
interval.tick().await;
match jobs_picked { match jobs_picked {
0 => metrics::CYCLES_COMPLETED_UNSUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]), 0 => metrics::CYCLES_COMPLETED_UNSUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]),
_ => metrics::CYCLES_COMPLETED_SUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]), _ => metrics::CYCLES_COMPLETED_SUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]),

View File

@ -116,6 +116,8 @@ impl Default for super::settings::DrainerSettings {
stream_name: "DRAINER_STREAM".into(), stream_name: "DRAINER_STREAM".into(),
num_partitions: 64, num_partitions: 64,
max_read_count: 100, max_read_count: 100,
shutdown_interval: 1000,
loop_interval: 500,
} }
} }
} }

View File

@ -194,6 +194,8 @@ pub struct DrainerSettings {
pub stream_name: String, pub stream_name: String,
pub num_partitions: u8, pub num_partitions: u8,
pub max_read_count: u64, pub max_read_count: u64,
pub shutdown_interval: u32, // in milliseconds
pub loop_interval: u32, // in milliseconds
} }
#[derive(Debug, Clone, Default, Deserialize)] #[derive(Debug, Clone, Default, Deserialize)]