feat(process_tracker): changing runner selection to dyn dispatch (#853)

Nishant Joshi
2023-04-11 14:21:05 +05:30
committed by GitHub
parent 2351116692
commit 18b84c428f
4 changed files with 37 additions and 29 deletions
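
In short, runner selection moves from static dispatch (a PTRunner value parsed out of the task and matched inside the macro-generated perform_workflow_execution) to dynamic dispatch: start_consumer now receives a WorkflowSelectorFn function pointer that maps a task to an optional Box<dyn ProcessTrackerWorkflow>, and run_executor calls the workflow through that trait object. A minimal sketch of the idea follows; the types here are simplified stand-ins (async, error handling, and the real crate definitions are omitted).

// Hypothetical, simplified stand-ins for the crate's real types; the real
// trait method is async and returns a Result, both omitted here.
struct AppState;
struct ProcessTracker {
    runner: Option<String>,
}

trait ProcessTrackerWorkflow: Send + Sync {
    fn execute_workflow(&self, _state: &AppState, _task: ProcessTracker) {}
}

struct DeleteTokenizeDataWorkflow;
impl ProcessTrackerWorkflow for DeleteTokenizeDataWorkflow {}

// The new selector shape: a plain function pointer from task to boxed workflow.
type WorkflowSelectorFn = fn(&ProcessTracker) -> Option<Box<dyn ProcessTrackerWorkflow>>;

// Roughly what the macro-generated workflows::runner_from_task does
// (the runner string value is made up for this example).
fn runner_from_task(task: &ProcessTracker) -> Option<Box<dyn ProcessTrackerWorkflow>> {
    match task.runner.as_deref() {
        Some("DELETE_TOKENIZE_DATA_WORKFLOW") => Some(Box::new(DeleteTokenizeDataWorkflow)),
        _ => None,
    }
}

// The executor no longer matches on a PTRunner enum; it just calls the trait object.
fn run_executor(state: &AppState, task: ProcessTracker, workflow: Box<dyn ProcessTrackerWorkflow>) {
    workflow.execute_workflow(state, task);
}

fn main() {
    let selector: WorkflowSelectorFn = runner_from_task;
    let task = ProcessTracker {
        runner: Some("DELETE_TOKENIZE_DATA_WORKFLOW".into()),
    };
    if let Some(workflow) = selector(&task) {
        run_executor(&AppState, task, workflow);
    }
}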

View File

@@ -24,7 +24,9 @@ pub async fn start_process_tracker(
) -> CustomResult<(), errors::ProcessTrackerError> {
match scheduler_flow {
SchedulerFlow::Producer => producer::start_producer(state, scheduler_settings).await?,
SchedulerFlow::Consumer => consumer::start_consumer(state, scheduler_settings).await?,
SchedulerFlow::Consumer => {
consumer::start_consumer(state, scheduler_settings, workflows::runner_from_task).await?
}
SchedulerFlow::Cleaner => {
error!("This flow has not been implemented yet!");
}

View File

@@ -36,6 +36,7 @@ pub fn valid_business_statuses() -> Vec<&'static str> {
pub async fn start_consumer(
state: &AppState,
settings: sync::Arc<settings::SchedulerSettings>,
workflow_selector: workflows::WorkflowSelectorFn,
) -> CustomResult<(), errors::ProcessTrackerError> {
use std::time::Duration;
@@ -78,6 +79,7 @@ pub async fn start_consumer(
logger::error!(%err);
},
sync::Arc::clone(&consumer_operation_counter),
workflow_selector,
));
}
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
@@ -108,6 +110,7 @@ pub async fn start_consumer(
pub async fn consumer_operations(
state: &AppState,
settings: &settings::SchedulerSettings,
workflow_selector: workflows::WorkflowSelectorFn,
) -> CustomResult<(), errors::ProcessTrackerError> {
let stream_name = settings.stream.clone();
let group_name = settings.consumer.consumer_group.clone();
@@ -135,7 +138,7 @@ pub async fn consumer_operations(
pt_utils::add_histogram_metrics(&pickup_time, task, &stream_name);
metrics::TASK_CONSUMED.add(&metrics::CONTEXT, 1, &[]);
let runner = pt_utils::runner_from_task(task)?;
let runner = workflow_selector(task)?.ok_or(errors::ProcessTrackerError::UnexpectedFlow)?;
handler.push(tokio::task::spawn(start_workflow(
state.clone(),
task.clone(),
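
The new selection line above combines two failure modes: the `?` after workflow_selector(task) propagates a genuine error (for instance, a task with no runner value at all), while .ok_or(errors::ProcessTrackerError::UnexpectedFlow)? converts a successful-but-unknown runner (Ok(None)) into an UnexpectedFlow error instead of silently dropping the task. A small self-contained sketch of that Result<Option<T>, E> pattern, with hypothetical names:

#[derive(Debug)]
enum Error {
    MissingRunner,
    UnexpectedFlow,
}

// Hypothetical selector: Err(..) when the runner field is absent,
// Ok(None) when it is present but names no known workflow.
fn select(runner: Option<&str>) -> Result<Option<&'static str>, Error> {
    let name = runner.ok_or(Error::MissingRunner)?;
    Ok(match name {
        "DELETE_TOKENIZE_DATA_WORKFLOW" => Some("delete_tokenize_data"),
        _ => None,
    })
}

fn pick(runner: Option<&str>) -> Result<&'static str, Error> {
    // Mirrors `workflow_selector(task)?.ok_or(ProcessTrackerError::UnexpectedFlow)?`.
    select(runner)?.ok_or(Error::UnexpectedFlow)
}

fn main() {
    assert!(matches!(pick(Some("DELETE_TOKENIZE_DATA_WORKFLOW")), Ok(_)));
    assert!(matches!(pick(Some("unknown")), Err(Error::UnexpectedFlow)));
    assert!(matches!(pick(None), Err(Error::MissingRunner)));
}
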
@@ -190,21 +193,21 @@ pub async fn fetch_consumer_tasks(
}
// Accept flow_options if required
#[instrument(skip(state), fields(workflow_id))]
#[instrument(skip(state, runner), fields(workflow_id))]
pub async fn start_workflow(
state: AppState,
process: storage::ProcessTracker,
_pickup_time: PrimitiveDateTime,
runner: workflows::PTRunner,
runner: Box<dyn ProcessTrackerWorkflow>,
) {
tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string());
workflows::perform_workflow_execution(&state, process, runner).await
run_executor(&state, process, runner).await
}
pub async fn run_executor<'a>(
state: &'a AppState,
pub async fn run_executor(
state: &AppState,
process: storage::ProcessTracker,
operation: &(impl ProcessTrackerWorkflow + Send + Sync),
operation: Box<dyn ProcessTrackerWorkflow>,
) {
let output = operation.execute_workflow(state, process.clone()).await;
match output {

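One consequence of the run_executor change just above: the borrowed, statically dispatched &(impl ProcessTrackerWorkflow + Send + Sync) parameter becomes an owned Box<dyn ProcessTrackerWorkflow>, so the explicit 'a lifetime disappears and the boxed workflow can be moved into the tokio::task::spawn call seen earlier. A tiny sketch of the two shapes, using a hypothetical Workflow trait rather than the crate's:

trait Workflow {
    fn run(&self);
}

struct Noop;
impl Workflow for Noop {
    fn run(&self) {}
}

// Before: borrow a statically dispatched workflow, tied to a lifetime.
fn run_executor_static<'a>(operation: &'a (impl Workflow + Send + Sync)) {
    operation.run();
}

// After: own a trait object and dispatch through its vtable at runtime.
fn run_executor_dyn(operation: Box<dyn Workflow>) {
    operation.run();
}

fn main() {
    run_executor_static(&Noop);
    run_executor_dyn(Box::new(Noop));
}
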
View File

@@ -19,7 +19,6 @@ use crate::{
self,
enums::{self, ProcessTrackerStatus},
},
utils::{OptionExt, StringExt},
};
pub async fn divide_and_append_tasks(
@@ -246,6 +245,7 @@ pub async fn consumer_operation_handler<E>(
settings: sync::Arc<SchedulerSettings>,
error_handler_fun: E,
consumer_operation_counter: sync::Arc<atomic::AtomicU64>,
workflow_selector: workflows::WorkflowSelectorFn,
) where
// Error handler function
E: FnOnce(error_stack::Report<errors::ProcessTrackerError>),
@@ -253,7 +253,7 @@ pub async fn consumer_operation_handler<E>(
consumer_operation_counter.fetch_add(1, atomic::Ordering::Release);
let start_time = std_time::Instant::now();
match consumer::consumer_operations(&state, &settings).await {
match consumer::consumer_operations(&state, &settings, workflow_selector).await {
Ok(_) => (),
Err(err) => error_handler_fun(err),
}
@@ -265,14 +265,6 @@ pub async fn consumer_operation_handler<E>(
logger::info!("Current tasks being executed: {}", current_count);
}
pub fn runner_from_task(
task: &storage::ProcessTracker,
) -> Result<workflows::PTRunner, errors::ProcessTrackerError> {
let runner = task.runner.clone().get_required_value("runner")?;
Ok(runner.parse_enum("PTRunner")?)
}
pub fn add_histogram_metrics(
pickup_time: &time::PrimitiveDateTime,
task: &mut storage::ProcessTracker,

View File

@@ -1,9 +1,13 @@
use async_trait::async_trait;
use router_env::{instrument, tracing};
use serde::{Deserialize, Serialize};
use strum::EnumString;
use crate::{core::errors, routes::AppState, scheduler::consumer, types::storage};
use crate::{
core::errors,
routes::AppState,
types::storage,
utils::{OptionExt, StringExt},
};
pub mod payment_sync;
pub mod refund_router;
pub mod tokenized_data;
@@ -23,17 +27,19 @@ macro_rules! runners {
pub struct $body;
} )*
#[instrument(skip(state))]
pub async fn perform_workflow_execution<'a>(state: &AppState, process: storage::ProcessTracker, runner: PTRunner)
where
{
match runner {
$( PTRunner::$body => {
let flow = &$body;
consumer::run_executor(state, process, flow).await
pub fn runner_from_task(task: &storage::ProcessTracker) -> Result<Option<Box<dyn ProcessTrackerWorkflow>>, errors::ProcessTrackerError> {
let runner = task.runner.clone().get_required_value("runner")?;
let runner: Option<PTRunner> = runner.parse_enum("PTRunner").ok();
Ok(match runner {
$( Some( PTRunner::$body ) => {
Some(Box::new($body))
} ,)*
None => {
None
}
})
}
};
}
@@ -49,6 +55,11 @@ runners! {
DeleteTokenizeDataWorkflow
}
pub type WorkflowSelectorFn =
fn(
&storage::ProcessTracker,
) -> Result<Option<Box<dyn ProcessTrackerWorkflow>>, errors::ProcessTrackerError>;
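
WorkflowSelectorFn is a plain fn pointer rather than a generic parameter or boxed closure, so only zero-capture functions fit it; scheduler.rs above passes the macro-generated workflows::runner_from_task. A small sketch (hypothetical names, simplified signature) of what such an alias does and does not admit:

// A simplified stand-in for the selector alias (hypothetical names).
type Selector = fn(&str) -> Option<u32>;

// A free function with no captured state coerces to the fn pointer...
fn by_name(name: &str) -> Option<u32> {
    if name == "task" { Some(1) } else { None }
}

fn main() {
    let selector: Selector = by_name;
    // ...whereas a capturing closure would not:
    // let n = 1;
    // let bad: Selector = move |_| Some(n); // error: capturing closures cannot coerce to `fn`
    assert_eq!(selector("task"), Some(1));
}
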
#[async_trait]
pub trait ProcessTrackerWorkflow: Send + Sync {
// The core execution of the workflow