fix: add disable env flag for consumer (#475)

This commit is contained in:
Nishant Joshi
2023-01-30 17:48:58 +05:30
committed by GitHub
parent ec2f4ba257
commit 39adbbc183
7 changed files with 33 additions and 5 deletions

View File

@ -82,8 +82,8 @@ impl Default for super::settings::SchedulerSettings {
fn default() -> Self {
Self {
stream: "SCHEDULER_STREAM".into(),
consumer_group: "SCHEDULER_GROUP".into(),
producer: super::settings::ProducerSettings::default(),
consumer: super::settings::ConsumerSettings::default(),
}
}
}
@ -100,6 +100,15 @@ impl Default for super::settings::ProducerSettings {
}
}
impl Default for super::settings::ConsumerSettings {
fn default() -> Self {
Self {
disabled: false,
consumer_group: "SCHEDULER_GROUP".into(),
}
}
}
#[cfg(feature = "kv_store")]
impl Default for super::settings::DrainerSettings {
fn default() -> Self {

View File

@ -164,8 +164,8 @@ pub struct ConnectorParams {
#[serde(default)]
pub struct SchedulerSettings {
pub stream: String,
pub consumer_group: String,
pub producer: ProducerSettings,
pub consumer: ConsumerSettings,
}
#[derive(Debug, Clone, Deserialize)]
@ -179,6 +179,13 @@ pub struct ProducerSettings {
pub batch_size: usize,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct ConsumerSettings {
pub disabled: bool,
pub consumer_group: String,
}
#[cfg(feature = "kv_store")]
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]

View File

@ -152,7 +152,7 @@ impl super::settings::SchedulerSettings {
))
})?;
when(self.consumer_group.is_default_or_empty(), || {
when(self.consumer.consumer_group.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"scheduler consumer group must not be empty".into(),
))

View File

@ -71,6 +71,12 @@ pub async fn start_consumer(
match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => {
interval.tick().await;
// A guard from env to disable the consumer
if settings.consumer.disabled {
continue;
}
tokio::task::spawn(pt_utils::consumer_operation_handler(
state.clone(),
options.clone(),
@ -112,7 +118,7 @@ pub async fn consumer_operations(
settings: &settings::SchedulerSettings,
) -> CustomResult<(), errors::ProcessTrackerError> {
let stream_name = settings.stream.clone();
let group_name = settings.consumer_group.clone();
let group_name = settings.consumer.consumer_group.clone();
let consumer_name = format!("consumer_{}", Uuid::new_v4());
let group_created = &mut state

View File

@ -156,7 +156,7 @@ pub fn divide_into_batches(
.fold(Vec::new(), |mut batches, item| {
let batch = ProcessTrackerBatch {
id: batch_id.clone(),
group_name: conf.consumer_group.clone(),
group_name: conf.consumer.consumer_group.clone(),
stream_name: conf.stream.clone(),
connection_name: String::new(),
created_time: batch_creation_time,