mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-01 11:06:50 +08:00
chore(process_tracker): use const instead of String for business_status (#4849)
Co-authored-by: SanchithHegde <22217505+SanchithHegde@users.noreply.github.com>
This commit is contained in:
@ -76,8 +76,6 @@ impl ProcessTrackerNew {
|
||||
where
|
||||
T: Serialize + std::fmt::Debug,
|
||||
{
|
||||
const BUSINESS_STATUS_PENDING: &str = "Pending";
|
||||
|
||||
let current_time = common_utils::date_time::now();
|
||||
Ok(Self {
|
||||
id: process_tracker_id.into(),
|
||||
@ -91,7 +89,7 @@ impl ProcessTrackerNew {
|
||||
.encode_to_value()
|
||||
.change_context(errors::DatabaseError::Others)
|
||||
.attach_printable("Failed to serialize process tracker tracking data")?,
|
||||
business_status: String::from(BUSINESS_STATUS_PENDING),
|
||||
business_status: String::from(business_status::PENDING),
|
||||
status: storage_enums::ProcessTrackerStatus::New,
|
||||
event: vec![],
|
||||
created_at: current_time,
|
||||
@ -227,3 +225,42 @@ mod tests {
|
||||
assert_eq!(enum_format, ProcessTrackerRunner::PaymentsSyncWorkflow);
|
||||
}
|
||||
}
|
||||
|
||||
pub mod business_status {
|
||||
/// Indicates that an irrecoverable error occurred during the workflow execution.
|
||||
pub const GLOBAL_FAILURE: &str = "GLOBAL_FAILURE";
|
||||
|
||||
/// Task successfully completed by consumer.
|
||||
/// A task that reaches this status should not be retried (rescheduled for execution) later.
|
||||
pub const COMPLETED_BY_PT: &str = "COMPLETED_BY_PT";
|
||||
|
||||
/// An error occurred during the workflow execution which prevents further execution and
|
||||
/// retries.
|
||||
/// A task that reaches this status should not be retried (rescheduled for execution) later.
|
||||
pub const FAILURE: &str = "FAILURE";
|
||||
|
||||
/// The resource associated with the task was removed, due to which further retries can/should
|
||||
/// not be done.
|
||||
pub const REVOKED: &str = "Revoked";
|
||||
|
||||
/// The task was executed for the maximum possible number of times without a successful outcome.
|
||||
/// A task that reaches this status should not be retried (rescheduled for execution) later.
|
||||
pub const RETRIES_EXCEEDED: &str = "RETRIES_EXCEEDED";
|
||||
|
||||
/// The outgoing webhook was successfully delivered in the initial attempt.
|
||||
/// Further retries of the task are not required.
|
||||
pub const INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL: &str = "INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL";
|
||||
|
||||
/// Indicates that an error occurred during the workflow execution.
|
||||
/// This status is typically set by the workflow error handler.
|
||||
/// A task that reaches this status should not be retried (rescheduled for execution) later.
|
||||
pub const GLOBAL_ERROR: &str = "GLOBAL_ERROR";
|
||||
|
||||
/// The resource associated with the task has been significantly modified since the task was
|
||||
/// created, due to which further retries of the current task are not required.
|
||||
/// A task that reaches this status should not be retried (rescheduled for execution) later.
|
||||
pub const RESOURCE_STATUS_MISMATCH: &str = "RESOURCE_STATUS_MISMATCH";
|
||||
|
||||
/// Business status set for newly created tasks.
|
||||
pub const PENDING: &str = "Pending";
|
||||
}
|
||||
|
||||
@ -4,7 +4,7 @@ use std::{collections::HashMap, str::FromStr, sync::Arc};
|
||||
use actix_web::{dev::Server, web, Scope};
|
||||
use api_models::health_check::SchedulerHealthCheckResponse;
|
||||
use common_utils::ext_traits::{OptionExt, StringExt};
|
||||
use diesel_models::process_tracker as storage;
|
||||
use diesel_models::process_tracker::{self as storage, business_status};
|
||||
use error_stack::ResultExt;
|
||||
use router::{
|
||||
configs::settings::{CmdLineConf, Settings},
|
||||
@ -329,10 +329,13 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
|
||||
let status = state
|
||||
.get_db()
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(process, "GLOBAL_FAILURE".to_string())
|
||||
.finish_process_with_business_status(
|
||||
process,
|
||||
business_status::GLOBAL_FAILURE,
|
||||
)
|
||||
.await;
|
||||
if let Err(err) = status {
|
||||
logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE");
|
||||
logger::error!(%err, "Failed while performing database operation: {}", business_status::GLOBAL_FAILURE);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@ -381,7 +381,9 @@ pub async fn update_api_key_expiry_task(
|
||||
retry_count: Some(0),
|
||||
schedule_time,
|
||||
tracking_data: Some(updated_api_key_expiry_workflow_model),
|
||||
business_status: Some("Pending".to_string()),
|
||||
business_status: Some(String::from(
|
||||
diesel_models::process_tracker::business_status::PENDING,
|
||||
)),
|
||||
status: Some(storage_enums::ProcessTrackerStatus::New),
|
||||
updated_at: Some(current_time),
|
||||
};
|
||||
@ -450,7 +452,7 @@ pub async fn revoke_api_key_expiry_task(
|
||||
let task_ids = vec![task_id];
|
||||
let updated_process_tracker_data = storage::ProcessTrackerUpdate::StatusUpdate {
|
||||
status: storage_enums::ProcessTrackerStatus::Finish,
|
||||
business_status: Some("Revoked".to_string()),
|
||||
business_status: Some(String::from(diesel_models::business_status::REVOKED)),
|
||||
};
|
||||
|
||||
store
|
||||
|
||||
@ -1178,7 +1178,7 @@ pub async fn start_tokenize_data_workflow(
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
tokenize_tracker.clone(),
|
||||
"COMPLETED_BY_PT".to_string(),
|
||||
diesel_models::process_tracker::business_status::COMPLETED_BY_PT,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@ -1241,7 +1241,10 @@ pub async fn retry_delete_tokenize(
|
||||
}
|
||||
None => db
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string())
|
||||
.finish_process_with_business_status(
|
||||
pt,
|
||||
diesel_models::process_tracker::business_status::RETRIES_EXCEEDED,
|
||||
)
|
||||
.await
|
||||
.map_err(Into::into),
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ use common_utils::{
|
||||
ext_traits::{AsyncExt, ValueExt},
|
||||
types::MinorUnit,
|
||||
};
|
||||
use diesel_models::process_tracker::business_status;
|
||||
use error_stack::{report, ResultExt};
|
||||
use masking::PeekInterface;
|
||||
use router_env::{instrument, tracing};
|
||||
@ -1028,7 +1029,7 @@ pub async fn sync_refund_with_gateway_workflow(
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
refund_tracker.clone(),
|
||||
"COMPLETED_BY_PT".to_string(),
|
||||
business_status::COMPLETED_BY_PT,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
@ -1193,7 +1194,7 @@ pub async fn trigger_refund_execute_workflow(
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
refund_tracker.clone(),
|
||||
"COMPLETED_BY_PT".to_string(),
|
||||
business_status::COMPLETED_BY_PT,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ use api_models::{
|
||||
webhooks,
|
||||
};
|
||||
use common_utils::{ext_traits::Encode, request::RequestContent};
|
||||
use diesel_models::process_tracker::business_status;
|
||||
use error_stack::{report, ResultExt};
|
||||
use masking::{ExposeInterface, Mask, PeekInterface, Secret};
|
||||
use router_env::{
|
||||
@ -231,7 +232,7 @@ async fn trigger_webhook_to_merchant(
|
||||
state
|
||||
.store
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(process_tracker, "FAILURE".into())
|
||||
.finish_process_with_business_status(process_tracker, business_status::FAILURE)
|
||||
.await
|
||||
.change_context(
|
||||
errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed,
|
||||
@ -304,7 +305,7 @@ async fn trigger_webhook_to_merchant(
|
||||
state.clone(),
|
||||
&business_profile.merchant_id,
|
||||
process_tracker,
|
||||
"INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL",
|
||||
business_status::INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
@ -769,7 +770,7 @@ async fn success_response_handler(
|
||||
Some(process_tracker) => state
|
||||
.store
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(process_tracker, business_status.into())
|
||||
.finish_process_with_business_status(process_tracker, business_status)
|
||||
.await
|
||||
.change_context(
|
||||
errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed,
|
||||
|
||||
@ -1821,7 +1821,7 @@ impl ProcessTrackerInterface for KafkaStore {
|
||||
async fn finish_process_with_business_status(
|
||||
&self,
|
||||
this: storage::ProcessTracker,
|
||||
business_status: String,
|
||||
business_status: &'static str,
|
||||
) -> CustomResult<(), errors::StorageError> {
|
||||
self.diesel_store
|
||||
.finish_process_with_business_status(this, business_status)
|
||||
|
||||
@ -40,7 +40,8 @@ pub mod user_role;
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub use diesel_models::{
|
||||
ProcessTracker, ProcessTrackerNew, ProcessTrackerRunner, ProcessTrackerUpdate,
|
||||
process_tracker::business_status, ProcessTracker, ProcessTrackerNew, ProcessTrackerRunner,
|
||||
ProcessTrackerUpdate,
|
||||
};
|
||||
pub use hyperswitch_domain_models::payments::{
|
||||
payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate},
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
use common_utils::{errors::ValidationError, ext_traits::ValueExt};
|
||||
use diesel_models::{enums as storage_enums, ApiKeyExpiryTrackingData};
|
||||
use diesel_models::{
|
||||
enums as storage_enums, process_tracker::business_status, ApiKeyExpiryTrackingData,
|
||||
};
|
||||
use router_env::logger;
|
||||
use scheduler::{workflows::ProcessTrackerWorkflow, SchedulerSessionState};
|
||||
|
||||
@ -96,7 +98,7 @@ impl ProcessTrackerWorkflow<SessionState> for ApiKeyExpiryWorkflow {
|
||||
state
|
||||
.get_db()
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string())
|
||||
.finish_process_with_business_status(process, business_status::COMPLETED_BY_PT)
|
||||
.await?
|
||||
}
|
||||
// If tasks are remaining that has to be scheduled
|
||||
|
||||
@ -6,6 +6,7 @@ use api_models::{
|
||||
webhooks::{OutgoingWebhook, OutgoingWebhookContent},
|
||||
};
|
||||
use common_utils::ext_traits::{StringExt, ValueExt};
|
||||
use diesel_models::process_tracker::business_status;
|
||||
use error_stack::ResultExt;
|
||||
use masking::PeekInterface;
|
||||
use router_env::tracing::{self, instrument};
|
||||
@ -197,7 +198,7 @@ impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow {
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(
|
||||
process.clone(),
|
||||
"RESOURCE_STATUS_MISMATCH".to_string(),
|
||||
business_status::RESOURCE_STATUS_MISMATCH,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@ -309,7 +310,7 @@ pub(crate) async fn retry_webhook_delivery_task(
|
||||
}
|
||||
None => {
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(process, "RETRIES_EXCEEDED".to_string())
|
||||
.finish_process_with_business_status(process, business_status::RETRIES_EXCEEDED)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use common_utils::ext_traits::{OptionExt, StringExt, ValueExt};
|
||||
use diesel_models::process_tracker::business_status;
|
||||
use error_stack::ResultExt;
|
||||
use router_env::logger;
|
||||
use scheduler::{
|
||||
@ -89,7 +90,7 @@ impl ProcessTrackerWorkflow<SessionState> for PaymentsSyncWorkflow {
|
||||
state
|
||||
.store
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(process, "COMPLETED_BY_PT".to_string())
|
||||
.finish_process_with_business_status(process, business_status::COMPLETED_BY_PT)
|
||||
.await?
|
||||
}
|
||||
_ => {
|
||||
@ -269,7 +270,7 @@ pub async fn retry_sync_task(
|
||||
}
|
||||
None => {
|
||||
db.as_scheduler()
|
||||
.finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string())
|
||||
.finish_process_with_business_status(pt, business_status::RETRIES_EXCEEDED)
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@ -30,7 +30,7 @@ use crate::{
|
||||
|
||||
// Valid consumer business statuses
|
||||
pub fn valid_business_statuses() -> Vec<&'static str> {
|
||||
vec!["Pending"]
|
||||
vec![storage::business_status::PENDING]
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
@ -262,7 +262,7 @@ pub async fn consumer_error_handler(
|
||||
vec![process.id],
|
||||
storage::ProcessTrackerUpdate::StatusUpdate {
|
||||
status: enums::ProcessTrackerStatus::Finish,
|
||||
business_status: Some("GLOBAL_ERROR".to_string()),
|
||||
business_status: Some(String::from(storage::business_status::GLOBAL_ERROR)),
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
use common_utils::errors::CustomResult;
|
||||
pub use diesel_models::process_tracker as storage;
|
||||
use diesel_models::process_tracker::business_status;
|
||||
use router_env::logger;
|
||||
|
||||
use crate::{errors, SchedulerSessionState};
|
||||
@ -45,7 +46,10 @@ pub trait ProcessTrackerWorkflows<T>: Send + Sync {
|
||||
let status = app_state
|
||||
.get_db()
|
||||
.as_scheduler()
|
||||
.finish_process_with_business_status(process, "GLOBAL_FAILURE".to_string())
|
||||
.finish_process_with_business_status(
|
||||
process,
|
||||
business_status::GLOBAL_FAILURE,
|
||||
)
|
||||
.await;
|
||||
if let Err(error) = status {
|
||||
logger::error!(?error, "Failed to update process business status");
|
||||
|
||||
@ -52,7 +52,7 @@ pub trait ProcessTrackerInterface: Send + Sync + 'static {
|
||||
async fn finish_process_with_business_status(
|
||||
&self,
|
||||
this: storage::ProcessTracker,
|
||||
business_status: String,
|
||||
business_status: &'static str,
|
||||
) -> CustomResult<(), errors::StorageError>;
|
||||
|
||||
async fn find_processes_by_time_status(
|
||||
@ -166,13 +166,13 @@ impl ProcessTrackerInterface for Store {
|
||||
async fn finish_process_with_business_status(
|
||||
&self,
|
||||
this: storage::ProcessTracker,
|
||||
business_status: String,
|
||||
business_status: &'static str,
|
||||
) -> CustomResult<(), errors::StorageError> {
|
||||
self.update_process(
|
||||
this,
|
||||
storage::ProcessTrackerUpdate::StatusUpdate {
|
||||
status: storage_enums::ProcessTrackerStatus::Finish,
|
||||
business_status: Some(business_status),
|
||||
business_status: Some(String::from(business_status)),
|
||||
},
|
||||
)
|
||||
.await
|
||||
@ -284,7 +284,7 @@ impl ProcessTrackerInterface for MockDb {
|
||||
async fn finish_process_with_business_status(
|
||||
&self,
|
||||
_this: storage::ProcessTracker,
|
||||
_business_status: String,
|
||||
_business_status: &'static str,
|
||||
) -> CustomResult<(), errors::StorageError> {
|
||||
// [#172]: Implement function for `MockDb`
|
||||
Err(errors::StorageError::MockDbError)?
|
||||
|
||||
Reference in New Issue
Block a user