feat: Add graceful shutdown in drainer (#498)

Nishant Joshi
2023-02-07 12:44:24 +05:30
committed by GitHub
parent dfd1e5e254
commit 0239c422b6
11 changed files with 138 additions and 26 deletions
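For orientation: the core of this change is a standard tokio graceful-shutdown pattern. A signal-handler task notifies the dispatch loop over a oneshot channel; once notified, the loop stops picking up new streams and polls an atomic counter of in-flight drain tasks until it reaches zero. The sketch below is a minimal, self-contained rendering of that pattern using plain tokio primitives — the signal task here stands in for the crate's common_utils::signals helpers (whose internals are not shown in this diff), and all other names are illustrative, not the crate's API.

// Minimal sketch of the shutdown pattern this commit introduces.
// Plain tokio, illustrative names; assumes tokio features
// "macros", "rt", "signal", "sync", "time".
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
    let active_tasks = Arc::new(AtomicU64::new(0));

    // Signal task: fires the oneshot channel once SIGTERM arrives.
    tokio::spawn(async move {
        let mut sigterm =
            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                .expect("failed to register SIGTERM handler");
        sigterm.recv().await;
        let _ = tx.send(());
    });

    // Polling cadence for the drain phase, analogous to `shutdown_interval` below.
    let mut poll = tokio::time::interval(std::time::Duration::from_millis(1000));

    loop {
        match rx.try_recv() {
            // No signal yet: keep dispatching work, counting tasks in and out.
            Err(tokio::sync::oneshot::error::TryRecvError::Empty) => {
                let tasks = active_tasks.clone();
                tasks.fetch_add(1, Ordering::Release);
                tokio::spawn(async move {
                    // ... one unit of drain work ...
                    tasks.fetch_sub(1, Ordering::Release);
                });
                // Pace the demo loop; the real drainer gates on stream availability.
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            // Signal received (or sender dropped): drain in-flight tasks, then stop.
            Ok(()) | Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
                while active_tasks.load(Ordering::Acquire) != 0 {
                    poll.tick().await;
                }
                break;
            }
        }
    }
}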

View File

@@ -11,6 +11,10 @@ pub enum DrainerError {
     ConfigurationError(config::ConfigError),
     #[error("Metrics initialization error")]
     MetricsError,
+    #[error("Error while configuring signals: {0}")]
+    SignalError(String),
+    #[error("Unexpected error occurred: {0}")]
+    UnexpectedError(String),
 }
 
 pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;

View File

@@ -5,37 +5,90 @@ pub(crate) mod metrics;
 pub mod services;
 pub mod settings;
 mod utils;
 
-use std::sync::Arc;
+use std::sync::{atomic, Arc};
 
+use common_utils::signals::{get_allowed_signals, oneshot};
 pub use env as logger;
+use error_stack::{IntoReport, ResultExt};
 use logger::{instrument, tracing};
 use storage_models::kv;
 
 use crate::{connection::pg_connection, services::Store};
 
 #[instrument(skip(store))]
 pub async fn start_drainer(
     store: Arc<Store>,
     number_of_streams: u8,
     max_read_count: u64,
+    shutdown_interval: u32,
 ) -> errors::DrainerResult<()> {
     let mut stream_index: u8 = 0;
     let mut jobs_picked: u8 = 0;
-    loop {
-        if utils::is_stream_available(stream_index, store.clone()).await {
-            tokio::spawn(drainer_handler(store.clone(), stream_index, max_read_count));
-            jobs_picked += 1;
+
+    let mut shutdown_interval =
+        tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into()));
+
+    let signal =
+        get_allowed_signals()
+            .into_report()
+            .change_context(errors::DrainerError::SignalError(
+                "Failed while getting allowed signals".to_string(),
+            ))?;
+    let (tx, mut rx) = oneshot::channel();
+    let handle = signal.handle();
+    let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
+
+    let active_tasks = Arc::new(atomic::AtomicU64::new(0));
+    'event: loop {
+        match rx.try_recv() {
+            Err(oneshot::error::TryRecvError::Empty) => {
+                if utils::is_stream_available(stream_index, store.clone()).await {
+                    tokio::spawn(drainer_handler(
+                        store.clone(),
+                        stream_index,
+                        max_read_count,
+                        active_tasks.clone(),
+                    ));
+                    jobs_picked += 1;
+                }
+                (stream_index, jobs_picked) =
+                    utils::increment_stream_index((stream_index, jobs_picked), number_of_streams);
+            }
+            Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
+                logger::info!("Awaiting shutdown!");
+                metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]);
+                let shutdown_started = tokio::time::Instant::now();
+                loop {
+                    if active_tasks.load(atomic::Ordering::Acquire) == 0 {
+                        logger::info!("Terminating drainer");
+                        metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]);
+                        let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64;
+                        metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]);
+                        break 'event;
+                    }
+                    shutdown_interval.tick().await;
+                }
+            }
         }
-        (stream_index, jobs_picked) =
-            utils::increment_stream_index((stream_index, jobs_picked), number_of_streams);
     }
+    handle.close();
+    task_handle
+        .await
+        .into_report()
+        .change_context(errors::DrainerError::UnexpectedError(
+            "Failed while joining signal handler".to_string(),
+        ))?;
+
     Ok(())
 }
 
 async fn drainer_handler(
     store: Arc<Store>,
     stream_index: u8,
     max_read_count: u64,
+    active_tasks: Arc<atomic::AtomicU64>,
 ) -> errors::DrainerResult<()> {
+    active_tasks.fetch_add(1, atomic::Ordering::Release);
     let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index);
     let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await;
@@ -45,7 +98,10 @@ async fn drainer_handler(
     let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index);
     //TODO: USE THE RESULT FOR LOGGING
-    utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await
+    let output =
+        utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await;
+    active_tasks.fetch_sub(1, atomic::Ordering::Release);
+    output
 }
 
 async fn drainer(
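A note on the task accounting above: each drainer_handler increments the counter on entry and decrements it only after the stream flag has been released, so a zero reading in the shutdown loop means no handler is mid-flight. Since the decrement is written out by hand, a common hardening is to tie it to scope exit so early returns and panics are also covered. A purely hypothetical guard-style variant (not part of this commit):

// Hypothetical RAII guard for the active-task counter: Drop runs on every
// exit path, including `?` returns and panics.
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

struct TaskGuard(Arc<AtomicU64>);

impl TaskGuard {
    fn new(counter: Arc<AtomicU64>) -> Self {
        counter.fetch_add(1, Ordering::Release);
        Self(counter)
    }
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::Release);
    }
}

fn main() {
    let active = Arc::new(AtomicU64::new(0));
    {
        let _guard = TaskGuard::new(active.clone());
        assert_eq!(active.load(Ordering::Acquire), 1);
    } // _guard dropped here: counter back to zero
    assert_eq!(active.load(Ordering::Acquire), 0);
}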

View File

@@ -18,12 +18,19 @@ async fn main() -> DrainerResult<()> {
     let number_of_streams = store.config.drainer_num_partitions;
     let max_read_count = conf.drainer.max_read_count;
+    let shutdown_intervals = conf.drainer.shutdown_interval;
 
     let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?;
 
     logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log);
 
-    start_drainer(store.clone(), number_of_streams, max_read_count).await?;
+    start_drainer(
+        store.clone(),
+        number_of_streams,
+        max_read_count,
+        shutdown_intervals,
+    )
+    .await?;
 
     store.close().await;
     Ok(())

View File

@@ -48,12 +48,12 @@ pub(crate) static REDIS_STREAM_READ_TIME: Lazy<Histogram<f64>> =
 pub(crate) static REDIS_STREAM_TRIM_TIME: Lazy<Histogram<f64>> =
     Lazy::new(|| DRAINER_METER.f64_histogram("REDIS_STREAM_TRIM_TIME").init());
 
-pub(crate) static _SHUTDOWN_SIGNAL_RECEIVED: Lazy<Counter<u64>> =
+pub(crate) static SHUTDOWN_SIGNAL_RECEIVED: Lazy<Counter<u64>> =
     Lazy::new(|| DRAINER_METER.u64_counter("SHUTDOWN_SIGNAL_RECEIVED").init());
 
-pub(crate) static _SUCCESSFUL_SHUTDOWN: Lazy<Counter<u64>> =
+pub(crate) static SUCCESSFUL_SHUTDOWN: Lazy<Counter<u64>> =
     Lazy::new(|| DRAINER_METER.u64_counter("SUCCESSFUL_SHUTDOWN").init());
 
 // Time in (ms) milliseconds
-pub(crate) static _CLEANUP_TIME: Lazy<Histogram<f64>> =
+pub(crate) static CLEANUP_TIME: Lazy<Histogram<f64>> =
     Lazy::new(|| DRAINER_METER.f64_histogram("CLEANUP_TIME").init());

View File

@@ -44,6 +44,7 @@ pub struct DrainerSettings {
     pub stream_name: String,
     pub num_partitions: u8,
     pub max_read_count: u64,
+    pub shutdown_interval: u32, // in milliseconds
 }
 
 impl Default for Database {
@@ -65,6 +66,7 @@ impl Default for DrainerSettings {
             stream_name: "DRAINER_STREAM".into(),
             num_partitions: 64,
             max_read_count: 100,
+            shutdown_interval: 1000, // in milliseconds
         }
     }
 }
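The new field is read as milliseconds: main passes conf.drainer.shutdown_interval into start_drainer, which widens the u32 via .into() and uses it as the tick period of the shutdown polling loop. A small standalone sketch of that conversion, using the 1000 ms default:

// Sketch of how the configured value is consumed (cf. start_drainer above).
#[tokio::main]
async fn main() {
    let shutdown_interval: u32 = 1000; // DrainerSettings default, in milliseconds
    let mut poll =
        tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into()));
    poll.tick().await; // the first tick resolves immediately
    poll.tick().await; // subsequent ticks arrive ~1s apart
}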