refactor(scheduler): improve code reusability and consumer logs (#3712)

This commit is contained in:
Sanchith Hegde
2024-02-21 17:51:59 +05:30
committed by GitHub
parent deec8b4eb5
commit 7c63c76011
17 changed files with 478 additions and 409 deletions

View File

@ -8,7 +8,7 @@ use diesel_models::enums::{self, ProcessTrackerStatus};
pub use diesel_models::process_tracker as storage;
use error_stack::{report, ResultExt};
use redis_interface::{RedisConnectionPool, RedisEntryId};
use router_env::opentelemetry;
use router_env::{instrument, opentelemetry, tracing};
use uuid::Uuid;
use super::{
@ -178,7 +178,7 @@ pub async fn get_batches(
group_name: &str,
consumer_name: &str,
) -> CustomResult<Vec<ProcessTrackerBatch>, errors::ProcessTrackerError> {
let response = conn
let response = match conn
.stream_read_with_options(
stream_name,
RedisEntryId::UndeliveredEntryID,
@ -188,10 +188,20 @@ pub async fn get_batches(
Some((group_name, consumer_name)),
)
.await
.map_err(|error| {
logger::warn!(%error, "Warning: finding batch in stream");
error.change_context(errors::ProcessTrackerError::BatchNotFound)
})?;
{
Ok(response) => response,
Err(error) => {
if let redis_interface::errors::RedisError::StreamEmptyOrNotAvailable =
error.current_context()
{
logger::debug!("No batches processed as stream is empty");
return Ok(Vec::new());
} else {
return Err(error.change_context(errors::ProcessTrackerError::BatchNotFound));
}
}
};
metrics::BATCHES_CONSUMED.add(&metrics::CONTEXT, 1, &[]);
let (batches, entry_ids): (Vec<Vec<ProcessTrackerBatch>>, Vec<Vec<String>>) = response.into_values().map(|entries| {
@ -217,13 +227,13 @@ pub async fn get_batches(
conn.stream_acknowledge_entries(stream_name, group_name, entry_ids.clone())
.await
.map_err(|error| {
logger::error!(%error, "Error acknowledging batch in stream");
logger::error!(?error, "Error acknowledging batch in stream");
error.change_context(errors::ProcessTrackerError::BatchUpdateFailed)
})?;
conn.stream_delete_entries(stream_name, entry_ids.clone())
.await
.map_err(|error| {
logger::error!(%error, "Error deleting batch from stream");
logger::error!(?error, "Error deleting batch from stream");
error.change_context(errors::ProcessTrackerError::BatchDeleteFailed)
})?;
@ -231,7 +241,7 @@ pub async fn get_batches(
}
pub fn get_process_tracker_id<'a>(
runner: &'a str,
runner: storage::ProcessTrackerRunner,
task_name: &'a str,
txn_id: &'a str,
merchant_id: &'a str,
@ -243,6 +253,7 @@ pub fn get_time_from_delta(delta: Option<i32>) -> Option<time::PrimitiveDateTime
delta.map(|t| common_utils::date_time::now().saturating_add(time::Duration::seconds(t.into())))
}
#[instrument(skip_all)]
pub async fn consumer_operation_handler<E, T: Send + Sync + 'static>(
state: T,
settings: sync::Arc<SchedulerSettings>,