fix: make drainer sleep on every loop interval instead of cycle end (#2951)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: preetamrevankar <132073736+preetamrevankar@users.noreply.github.com>
Kartikeya Hegde
2023-11-23 18:29:14 +05:30
committed by GitHub
parent dd3e22a938
commit e8df69092f
4 changed files with 6 additions and 6 deletions
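
For context, a minimal sketch of the behavior change, assuming a simplified drain loop (the tokio `main` wrapper, the fixed iteration count, and the modulo wrap-around are illustrative stand-ins; `loop_interval`, `stream_index`, and `number_of_streams` are taken from the diff below). Before this change the interval ticked only inside `increment_stream_index`, i.e. once per full cycle over all streams; now the drainer awaits the tick on every loop iteration, and the default `loop_interval` is lowered from 500 ms to 100 ms.

use std::time::Duration;

#[tokio::main]
async fn main() {
    let number_of_streams: u8 = 4;
    let mut stream_index: u8 = 0;
    // 100 ms matches the new default loop_interval set in this commit.
    let mut loop_interval = tokio::time::interval(Duration::from_millis(100));

    for _ in 0..8 {
        // ... drain the Redis stream at `stream_index` here (omitted) ...

        // Illustrative wrap-around; the real code calls increment_stream_index,
        // which also records cycle-completion metrics.
        stream_index = (stream_index + 1) % number_of_streams;

        // New behavior: sleep on every iteration instead of only at cycle end.
        loop_interval.tick().await;
    }
}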

View File

@@ -67,9 +67,9 @@ pub async fn start_drainer(
                 stream_index = utils::increment_stream_index(
                     (stream_index, jobs_picked.clone()),
                     number_of_streams,
-                    &mut loop_interval,
                 )
                 .await;
+                loop_interval.tick().await;
             }
             Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
                 logger::info!("Awaiting shutdown!");
@@ -114,6 +114,7 @@ pub async fn redis_error_receiver(rx: oneshot::Receiver<()>, shutdown_channel: m
     }
 }

+#[router_env::instrument(skip_all)]
 async fn drainer_handler(
     store: Arc<Store>,
     stream_index: u8,

View File

@@ -79,7 +79,7 @@ impl Default for DrainerSettings {
             num_partitions: 64,
             max_read_count: 100,
             shutdown_interval: 1000, // in milliseconds
-            loop_interval: 500,      // in milliseconds
+            loop_interval: 100,      // in milliseconds
         }
     }
 }

View File

@@ -8,12 +8,13 @@ use redis_interface as redis;
 use crate::{
     errors::{self, DrainerError},
-    logger, metrics, services,
+    logger, metrics, services, tracing,
 };

 pub type StreamEntries = Vec<(String, HashMap<String, String>)>;
 pub type StreamReadResult = HashMap<String, StreamEntries>;

+#[router_env::instrument(skip_all)]
 pub async fn is_stream_available(stream_index: u8, store: Arc<services::Store>) -> bool {
     let stream_key_flag = get_stream_key_flag(store.clone(), stream_index);
@@ -132,10 +133,8 @@ pub fn parse_stream_entries<'a>(
 pub async fn increment_stream_index(
     (index, jobs_picked): (u8, Arc<atomic::AtomicU8>),
     total_streams: u8,
-    interval: &mut tokio::time::Interval,
 ) -> u8 {
     if index == total_streams - 1 {
-        interval.tick().await;
         match jobs_picked.load(atomic::Ordering::SeqCst) {
             0 => metrics::CYCLES_COMPLETED_UNSUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]),
             _ => metrics::CYCLES_COMPLETED_SUCCESSFULLY.add(&metrics::CONTEXT, 1, &[]),

View File

@@ -99,7 +99,7 @@ impl Default for super::settings::DrainerSettings {
             num_partitions: 64,
             max_read_count: 100,
             shutdown_interval: 1000,
-            loop_interval: 500,
+            loop_interval: 100,
         }
     }
 }