feat(consumer): add workflow_id field in traces for consumer tasks (#480)

This commit is contained in:
Nishant Joshi
2023-02-02 16:39:25 +05:30
committed by GitHub
parent a8132e1082
commit 9ec435e86e
4 changed files with 9 additions and 5 deletions

View File

@ -198,13 +198,14 @@ pub async fn fetch_consumer_tasks(
}
// Accept flow_options if required
#[instrument(skip(state))]
#[instrument(skip(state), fields(workflow_id))]
pub async fn start_workflow(
state: AppState,
process: storage::ProcessTracker,
_pickup_time: PrimitiveDateTime,
runner: workflows::PTRunner,
) {
tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string());
workflows::perform_workflow_execution(&state, process, runner).await
}
@ -233,12 +234,12 @@ pub async fn run_executor<'a>(
}
#[instrument(skip_all)]
pub async fn some_error_handler<E: fmt::Display>(
pub async fn consumer_error_handler<E: fmt::Display + fmt::Debug>(
state: &AppState,
process: storage::ProcessTracker,
error: E,
) -> CustomResult<(), errors::ProcessTrackerError> {
logger::error!(pt.name = ?process.name, pt.id = %process.id, %error, "Failed while executing workflow");
logger::error!(pt.name = ?process.name, pt.id = %process.id, ?error, "ERROR: Failed while executing workflow");
let db: &dyn StorageInterface = &*state.store;
db.process_tracker_update_process_status_by_ids(

View File

@ -1,9 +1,9 @@
use async_trait::async_trait;
use router_env::{instrument, tracing};
use serde::{Deserialize, Serialize};
use strum::EnumString;
use crate::{core::errors, routes::AppState, scheduler::consumer, types::storage};
pub mod payment_sync;
pub mod refund_router;
@ -22,6 +22,7 @@ macro_rules! runners {
pub struct $body;
} )*
#[instrument(skip(state))]
pub async fn perform_workflow_execution<'a>(state: &AppState, process: storage::ProcessTracker, runner: PTRunner)
where
{

View File

@ -85,7 +85,7 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow {
process: storage::ProcessTracker,
error: errors::ProcessTrackerError,
) -> errors::CustomResult<(), errors::ProcessTrackerError> {
consumer::some_error_handler(state, process, error).await
consumer::consumer_error_handler(state, process, error).await
}
}

View File

@ -50,6 +50,7 @@ const MERCHANT_ID: &str = "merchant_id";
const REQUEST_METHOD: &str = "request_method";
const REQUEST_URL_PATH: &str = "request_url_path";
const REQUEST_ID: &str = "request_id";
const WORKFLOW_ID: &str = "workflow_id";
/// Set of predefined implicit keys.
pub static IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(|| {
@ -84,6 +85,7 @@ pub static EXTRA_IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(||
set.insert(REQUEST_METHOD);
set.insert(REQUEST_URL_PATH);
set.insert(REQUEST_ID);
set.insert(WORKFLOW_ID);
set
});