diff --git a/Cargo.lock b/Cargo.lock index 88dfe3a9e0..fff48158de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,6 +432,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-bb8-diesel" +version = "0.1.0" +source = "git+https://github.com/juspay/async-bb8-diesel?rev=9a71d142726dbc33f41c1fd935ddaa79841c7be5#9a71d142726dbc33f41c1fd935ddaa79841c7be5" +dependencies = [ + "async-trait", + "bb8", + "diesel", + "thiserror", + "tokio", +] + [[package]] name = "async-bb8-diesel" version = "0.1.0" @@ -446,9 +458,9 @@ dependencies = [ [[package]] name = "async-channel" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", "event-listener", @@ -684,6 +696,39 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-s3" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "392b9811ca489747ac84349790e49deaa1f16631949e7dd4156000251c260eae" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-client", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "http-body", + "once_cell", + "percent-encoding", + "regex", + "tokio-stream", + "tower", + "tracing", + "url", +] + [[package]] name = "aws-sdk-s3" version = "0.28.0" @@ -1745,11 +1790,14 @@ dependencies = [ name = "diesel_models" version = "0.1.0" dependencies = [ - "async-bb8-diesel", + "async-bb8-diesel 0.1.0 (git+https://github.com/oxidecomputer/async-bb8-diesel?rev=be3d9bce50051d8c0e0c06078e8066cc27db3001)", + "aws-config", + "aws-sdk-s3 0.28.0", "common_enums", "common_utils", "diesel", "error-stack", + "external_services", "frunk", "frunk_core", "masking", @@ -1818,7 +1866,7 @@ checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" name = "drainer" version = "0.1.0" dependencies = [ - "async-bb8-diesel", + "async-bb8-diesel 0.1.0 (git+https://github.com/oxidecomputer/async-bb8-diesel?rev=be3d9bce50051d8c0e0c06078e8066cc27db3001)", "bb8", "clap", "common_utils", @@ -1866,7 +1914,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" dependencies = [ "atty", - "humantime", + "humantime 1.3.0", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime 2.1.0", + "is-terminal", "log", "regex", "termcolor", @@ -1948,9 +2009,9 @@ dependencies = [ [[package]] name = "fake" -version = "2.6.1" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a44c765350db469b774425ff1c833890b16ceb9612fb5d7c4bbdf4a1b55f876" +checksum = "9af7b0c58ac9d03169e27f080616ce9f64004edca3d2ef4147a811c21b23b319" dependencies = [ "rand 0.8.5", "unidecode", @@ -2454,6 +2515,12 @@ dependencies = [ "quick-error", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -2638,6 +2705,18 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f" +[[package]] +name = "is-terminal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +dependencies = [ + "hermit-abi 0.3.1", + "io-lifetimes", + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -3517,7 +3596,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d" dependencies = [ - "env_logger", + "env_logger 0.7.1", "log", ] @@ -3971,11 +4050,11 @@ dependencies = [ "actix-rt", "actix-web", "api_models", - "async-bb8-diesel", + "async-bb8-diesel 0.1.0 (git+https://github.com/oxidecomputer/async-bb8-diesel?rev=be3d9bce50051d8c0e0c06078e8066cc27db3001)", "async-trait", "awc", "aws-config", - "aws-sdk-s3", + "aws-sdk-s3 0.28.0", "base64 0.21.2", "bb8", "blake3", @@ -4017,6 +4096,7 @@ dependencies = [ "router_derive", "router_env", "roxmltree", + "scheduler", "serde", "serde_json", "serde_path_to_error", @@ -4249,6 +4329,49 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "scheduler" +version = "0.1.0" +dependencies = [ + "actix-multipart", + "actix-rt", + "actix-web", + "api_models", + "async-bb8-diesel 0.1.0 (git+https://github.com/juspay/async-bb8-diesel?rev=9a71d142726dbc33f41c1fd935ddaa79841c7be5)", + "async-trait", + "aws-config", + "aws-sdk-s3 0.25.1", + "cards", + "clap", + "common_utils", + "diesel", + "diesel_models", + "dyn-clone", + "env_logger 0.10.0", + "error-stack", + "external_services", + "frunk", + "frunk_core", + "futures", + "infer 0.13.0", + "masking", + "once_cell", + "rand 0.8.5", + "redis_interface", + "router_derive", + "router_env", + "serde", + "serde_json", + "signal-hook", + "signal-hook-tokio", + "storage_impl", + "strum 0.24.1", + "thiserror", + "time 0.3.22", + "tokio", + "uuid", +] + [[package]] name = "scoped_threadpool" version = "0.1.9" @@ -4604,23 +4727,32 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" name = "storage_impl" version = "0.1.0" dependencies = [ + "actix-web", "api_models", - "async-bb8-diesel", + "async-bb8-diesel 0.1.0 (git+https://github.com/oxidecomputer/async-bb8-diesel?rev=be3d9bce50051d8c0e0c06078e8066cc27db3001)", "async-trait", "bb8", + "bytes", "common_utils", + "config", "crc32fast", "data_models", "diesel", "diesel_models", "dyn-clone", "error-stack", + "external_services", "futures", + "http", "masking", + "mime", "moka", "once_cell", "redis_interface", + "ring", "router_env", + "serde", + "thiserror", "tokio", ] diff --git a/crates/common_utils/src/ext_traits.rs b/crates/common_utils/src/ext_traits.rs index e562c5914f..3f3985997f 100644 --- a/crates/common_utils/src/ext_traits.rs +++ b/crates/common_utils/src/ext_traits.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use crate::{ crypto, errors::{self, CustomResult}, + fp_utils::when, }; /// @@ -489,3 +490,105 @@ impl XmlExt for &str { de::from_str(self) } } + +/// Extension trait for Option to validate missing fields +pub trait OptionExt { + /// check if the current option is Some + fn check_value_present( + &self, + field_name: &'static str, + ) -> CustomResult<(), 
errors::ValidationError>;
+
+    /// Throw missing required field error when the value is None
+    fn get_required_value(
+        self,
+        field_name: &'static str,
+    ) -> CustomResult<T, errors::ValidationError>;
+
+    /// Try parsing the option as Enum
+    fn parse_enum<E>(self, enum_name: &'static str) -> CustomResult<E, errors::ParsingError>
+    where
+        T: AsRef<str>,
+        E: std::str::FromStr,
+        // Requirement for converting the `Err` variant of `FromStr` to `Report`
+        <E as std::str::FromStr>::Err: std::error::Error + Send + Sync + 'static;
+
+    /// Try parsing the option as Type
+    fn parse_value<U>(self, type_name: &'static str) -> CustomResult<U, errors::ParsingError>
+    where
+        T: ValueExt,
+        U: serde::de::DeserializeOwned;
+
+    /// update option value
+    fn update_value(&mut self, value: Option<T>);
+}
+
+impl<T> OptionExt<T> for Option<T>
+where
+    T: std::fmt::Debug,
+{
+    #[track_caller]
+    fn check_value_present(
+        &self,
+        field_name: &'static str,
+    ) -> CustomResult<(), errors::ValidationError> {
+        when(self.is_none(), || {
+            Err(errors::ValidationError::MissingRequiredField {
+                field_name: field_name.to_string(),
+            })
+            .into_report()
+            .attach_printable(format!("Missing required field {field_name} in {self:?}"))
+        })
+    }
+
+    // This will allow the error message that was generated in this function to point to the call site
+    #[track_caller]
+    fn get_required_value(
+        self,
+        field_name: &'static str,
+    ) -> CustomResult<T, errors::ValidationError> {
+        match self {
+            Some(v) => Ok(v),
+            None => Err(errors::ValidationError::MissingRequiredField {
+                field_name: field_name.to_string(),
+            })
+            .into_report()
+            .attach_printable(format!("Missing required field {field_name} in {self:?}")),
+        }
+    }
+
+    #[track_caller]
+    fn parse_enum<E>(self, enum_name: &'static str) -> CustomResult<E, errors::ParsingError>
+    where
+        T: AsRef<str>,
+        E: std::str::FromStr,
+        <E as std::str::FromStr>::Err: std::error::Error + Send + Sync + 'static,
+    {
+        let value = self
+            .get_required_value(enum_name)
+            .change_context(errors::ParsingError::UnknownError)?;
+
+        E::from_str(value.as_ref())
+            .into_report()
+            .change_context(errors::ParsingError::UnknownError)
+            .attach_printable_lazy(|| format!("Invalid {{ {enum_name}: {value:?} }} "))
+    }
+
+    #[track_caller]
+    fn parse_value<U>(self, type_name: &'static str) -> CustomResult<U, errors::ParsingError>
+    where
+        T: ValueExt,
+        U: serde::de::DeserializeOwned,
+    {
+        let value = self
+            .get_required_value(type_name)
+            .change_context(errors::ParsingError::UnknownError)?;
+        value.parse_value(type_name)
+    }
+
+    fn update_value(&mut self, value: Self) {
+        if let Some(a) = value {
+            *self = Some(a)
+        }
+    }
+}
diff --git a/crates/diesel_models/Cargo.toml b/crates/diesel_models/Cargo.toml
index 19b28aac73..3e8efe3002 100644
--- a/crates/diesel_models/Cargo.toml
+++ b/crates/diesel_models/Cargo.toml
@@ -9,10 +9,15 @@ license.workspace = true
 
 [features]
 default = ["kv_store"]
+email = ["external_services/email", "dep:aws-config"]
+kms = ["external_services/kms", "dep:aws-config"]
 kv_store = []
+s3 = ["dep:aws-sdk-s3", "dep:aws-config"]
 
 [dependencies]
 async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "be3d9bce50051d8c0e0c06078e8066cc27db3001" }
+aws-config = { version = "0.55.3", optional = true }
+aws-sdk-s3 = { version = "0.28.0", optional = true }
 diesel = { version = "2.1.0", features = ["postgres", "serde_json", "time", "64-column-tables"] }
 error-stack = "0.3.1"
 frunk = "0.4.1"
@@ -26,6 +31,7 @@ time = { version = "0.3.21", features = ["serde", "serde-well-known", "std"] }
 # First party crates
 common_enums = { path = "../common_enums" }
 common_utils = { version = "0.1.0", path = "../common_utils" }
+external_services = { version = "0.1.0", path = "../external_services" }
 masking = { version = "0.1.0", path = "../masking" }
 router_derive = { version = "0.1.0", path = "../router_derive" }
 router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
diff --git a/crates/diesel_models/src/lib.rs b/crates/diesel_models/src/lib.rs
index ca979d30a2..474ac984e6 100644
--- a/crates/diesel_models/src/lib.rs
+++ b/crates/diesel_models/src/lib.rs
@@ -18,6 +18,7 @@ pub mod fraud_check;
 #[cfg(feature = "kv_store")]
 pub mod kv;
 pub mod locker_mock_up;
+pub mod macros;
 pub mod mandate;
 pub mod merchant_account;
 pub mod merchant_connector_account;
@@ -38,6 +39,12 @@ use diesel_impl::{DieselArray, OptionalDieselArray};
 
 pub type StorageResult<T> = error_stack::Result<T, errors::DatabaseError>;
 pub type PgPooledConn = async_bb8_diesel::Connection<diesel::PgConnection>;
+pub use self::{
+    address::*, api_keys::*, cards_info::*, configs::*, connector_response::*, customers::*,
+    dispute::*, ephemeral_key::*, events::*, file::*, locker_mock_up::*, mandate::*,
+    merchant_account::*, merchant_connector_account::*, payment_attempt::*, payment_intent::*,
+    payment_method::*, process_tracker::*, refund::*, reverse_lookup::*,
+};
 
 /// The types and implementations provided by this module are required for the schema generated by
 /// `diesel_cli` 2.0 to work with the types defined in Rust code. This is because
diff --git a/crates/diesel_models/src/macros.rs b/crates/diesel_models/src/macros.rs
new file mode 100644
index 0000000000..de3596ecc1
--- /dev/null
+++ b/crates/diesel_models/src/macros.rs
@@ -0,0 +1,6 @@
+#[macro_export]
+macro_rules! async_spawn {
+    ($t:block) => {
+        tokio::spawn(async move { $t });
+    };
+}
diff --git a/crates/diesel_models/src/services/logger.rs b/crates/diesel_models/src/services/logger.rs
new file mode 100644
index 0000000000..9c1b20c9d2
--- /dev/null
+++ b/crates/diesel_models/src/services/logger.rs
@@ -0,0 +1,5 @@
+//!
+//! Logger of the system.
+//!
+
+pub use crate::env::logger::*;
diff --git a/crates/external_services/src/email.rs b/crates/external_services/src/email.rs
index 2da15bead5..b2bf99d8e0 100644
--- a/crates/external_services/src/email.rs
+++ b/crates/external_services/src/email.rs
@@ -102,16 +102,20 @@ impl EmailClient for AwsSes {
     }
 }
 
+#[allow(missing_docs)]
 /// Errors that could occur from EmailClient.
 #[derive(Debug, thiserror::Error)]
 pub enum EmailError {
     /// An error occurred when building email client.
     #[error("Error building email client")]
     ClientBuildingFailure,
-    /// An error occurred when sending email
     #[error("Error sending email to recipient")]
     EmailSendingFailure,
+    #[error("Failed to generate email token")]
+    TokenGenerationFailure,
+    #[error("Feature not implemented")]
+    NotImplemented,
 }
 
 /// Errors that could occur during SES operations.
diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml
index 4872bf915a..51bfcddac4 100644
--- a/crates/router/Cargo.toml
+++ b/crates/router/Cargo.toml
@@ -16,9 +16,9 @@ email = ["external_services/email", "dep:aws-config"]
 basilisk = ["kms"]
 stripe = ["dep:serde_qs"]
 release = ["kms", "stripe", "basilisk", "s3", "email","accounts_cache"]
-olap = ["data_models/olap", "storage_impl/olap"]
+olap = ["data_models/olap", "storage_impl/olap", "scheduler/olap"]
 oltp = ["data_models/oltp", "storage_impl/oltp"]
-kv_store = []
+kv_store = ["scheduler/kv_store"]
 accounts_cache = []
 openapi = ["olap", "oltp", "payouts"]
 vergen = ["router_env/vergen"]
@@ -95,6 +95,7 @@ redis_interface = { version = "0.1.0", path = "../redis_interface" }
 router_derive = { version = "0.1.0", path = "../router_derive" }
 router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
 diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] }
+scheduler = { version = "0.1.0", path = "../scheduler", default-features = false}
 data_models = { version = "0.1.0", path = "../data_models", default-features = false }
 storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false }
diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs
index dc5b9bc42f..09f23bc3b2 100644
--- a/crates/router/src/bin/scheduler.rs
+++ b/crates/router/src/bin/scheduler.rs
@@ -1,18 +1,28 @@
 #![recursion_limit = "256"]
 
 use std::{str::FromStr, sync::Arc};
 
+use common_utils::ext_traits::{OptionExt, StringExt};
+use diesel_models::process_tracker as storage;
 use error_stack::ResultExt;
 use router::{
     configs::settings::{CmdLineConf, Settings},
     core::errors::{self, CustomResult},
-    logger, routes, scheduler, services,
+    logger, routes, services,
+    types::storage::ProcessTrackerExt,
+    workflows,
 };
+use scheduler::{
+    consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError,
+    workflows::ProcessTrackerWorkflows, SchedulerAppState,
+};
+use serde::{Deserialize, Serialize};
+use strum::EnumString;
 use tokio::sync::{mpsc, oneshot};
 
 const SCHEDULER_FLOW: &str = "SCHEDULER_FLOW";
 
 #[tokio::main]
-async fn main() -> CustomResult<(), errors::ProcessTrackerError> {
+async fn main() -> CustomResult<(), ProcessTrackerError> {
     // console_subscriber::init();
 
     let cmd_line = <CmdLineConf as clap::Parser>::parse();
@@ -59,16 +69,97 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> {
     Ok(())
 }
 
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
+pub enum PTRunner {
+    PaymentsSyncWorkflow,
+    RefundWorkflowRouter,
+    DeleteTokenizeDataWorkflow,
+}
+
+#[derive(Debug, Copy, Clone)]
+pub struct WorkflowRunner;
+
+#[async_trait::async_trait]
+impl ProcessTrackerWorkflows<routes::AppState> for WorkflowRunner {
+    async fn trigger_workflow<'a>(
+        &'a self,
+        state: &'a routes::AppState,
+        process: storage::ProcessTracker,
+    ) -> Result<(), ProcessTrackerError> {
+        let runner = process.runner.clone().get_required_value("runner")?;
+        let runner: Option<PTRunner> = runner.parse_enum("PTRunner").ok();
+        let operation: Box<dyn ProcessTrackerWorkflow<routes::AppState>> = match runner {
+            Some(PTRunner::PaymentsSyncWorkflow) => {
+                Box::new(workflows::payment_sync::PaymentsSyncWorkflow)
+            }
+            Some(PTRunner::RefundWorkflowRouter) => {
+                Box::new(workflows::refund_router::RefundWorkflowRouter)
+            }
+            Some(PTRunner::DeleteTokenizeDataWorkflow) => {
+
Box::new(workflows::tokenized_data::DeleteTokenizeDataWorkflow) + } + _ => Err(ProcessTrackerError::UnexpectedFlow)?, + }; + let app_state = &state.clone(); + let output = operation.execute_workflow(app_state, process.clone()).await; + match output { + Ok(_) => operation.success_handler(app_state, process).await, + Err(error) => match operation + .error_handler(app_state, process.clone(), error) + .await + { + Ok(_) => (), + Err(error) => { + logger::error!(%error, "Failed while handling error"); + let status = process + .finish_with_status( + state.get_db().as_scheduler(), + "GLOBAL_FAILURE".to_string(), + ) + .await; + if let Err(err) = status { + logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE"); + } + } + }, + }; + Ok(()) + } +} + async fn start_scheduler( state: &routes::AppState, scheduler_flow: scheduler::SchedulerFlow, channel: (mpsc::Sender<()>, mpsc::Receiver<()>), -) -> CustomResult<(), errors::ProcessTrackerError> { +) -> CustomResult<(), ProcessTrackerError> { let scheduler_settings = state .conf .scheduler .clone() .ok_or(errors::ProcessTrackerError::ConfigurationError)?; - scheduler::start_process_tracker(state, scheduler_flow, Arc::new(scheduler_settings), channel) - .await + scheduler::start_process_tracker( + state, + scheduler_flow, + Arc::new(scheduler_settings), + channel, + WorkflowRunner {}, + ) + .await +} + +#[cfg(test)] +mod workflow_tests { + #![allow(clippy::unwrap_used)] + use common_utils::ext_traits::StringExt; + + use super::PTRunner; + + #[test] + fn test_enum_to_string() { + let string_format = "PAYMENTS_SYNC_WORKFLOW".to_string(); + let enum_format: PTRunner = string_format.parse_enum("PTRunner").unwrap(); + assert_eq!(enum_format, PTRunner::PaymentsSyncWorkflow) + } } diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index 1d972d43fd..cebb001406 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -89,39 +89,6 @@ impl Default for super::settings::EphemeralConfig { } } -impl Default for super::settings::SchedulerSettings { - fn default() -> Self { - Self { - stream: "SCHEDULER_STREAM".into(), - producer: super::settings::ProducerSettings::default(), - consumer: super::settings::ConsumerSettings::default(), - graceful_shutdown_interval: 60000, - loop_interval: 5000, - } - } -} - -impl Default for super::settings::ProducerSettings { - fn default() -> Self { - Self { - upper_fetch_limit: 0, - lower_fetch_limit: 1800, - lock_key: "PRODUCER_LOCKING_KEY".into(), - lock_ttl: 160, - batch_size: 200, - } - } -} - -impl Default for super::settings::ConsumerSettings { - fn default() -> Self { - Self { - disabled: false, - consumer_group: "SCHEDULER_GROUP".into(), - } - } -} - #[cfg(feature = "kv_store")] impl Default for super::settings::DrainerSettings { fn default() -> Self { diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index fd098f5e58..168a8bc3f0 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -13,6 +13,7 @@ use external_services::email::EmailSettings; use external_services::kms; use redis_interface::RedisSettings; pub use router_env::config::{Log, LogConsole, LogFile, LogTelemetry}; +use scheduler::SchedulerSettings; use serde::{de::Error, Deserialize, Deserializer}; use crate::{ @@ -558,34 +559,6 @@ pub struct ConnectorParamsWithSecondaryBaseUrl { pub secondary_base_url: String, } -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct 
SchedulerSettings { - pub stream: String, - pub producer: ProducerSettings, - pub consumer: ConsumerSettings, - pub loop_interval: u64, - pub graceful_shutdown_interval: u64, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct ProducerSettings { - pub upper_fetch_limit: i64, - pub lower_fetch_limit: i64, - - pub lock_key: String, - pub lock_ttl: i64, - pub batch_size: usize, -} - -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct ConsumerSettings { - pub disabled: bool, - pub consumer_group: String, -} - #[cfg(feature = "kv_store")] #[derive(Debug, Clone, Deserialize)] #[serde(default)] diff --git a/crates/router/src/configs/validations.rs b/crates/router/src/configs/validations.rs index 626591d1a3..702852a1fc 100644 --- a/crates/router/src/configs/validations.rs +++ b/crates/router/src/configs/validations.rs @@ -1,6 +1,5 @@ use common_utils::ext_traits::ConfigExt; - -use crate::core::errors::ApplicationError; +use storage_impl::errors::ApplicationError; impl super::settings::Secrets { pub fn validate(&self) -> Result<(), ApplicationError> { @@ -117,38 +116,6 @@ impl super::settings::SupportedConnectors { } } -impl super::settings::SchedulerSettings { - pub fn validate(&self) -> Result<(), ApplicationError> { - use common_utils::fp_utils::when; - - when(self.stream.is_default_or_empty(), || { - Err(ApplicationError::InvalidConfigurationValueError( - "scheduler stream must not be empty".into(), - )) - })?; - - when(self.consumer.consumer_group.is_default_or_empty(), || { - Err(ApplicationError::InvalidConfigurationValueError( - "scheduler consumer group must not be empty".into(), - )) - })?; - - self.producer.validate()?; - - Ok(()) - } -} - -impl super::settings::ProducerSettings { - pub fn validate(&self) -> Result<(), ApplicationError> { - common_utils::fp_utils::when(self.lock_key.is_default_or_empty(), || { - Err(ApplicationError::InvalidConfigurationValueError( - "producer lock key must not be empty".into(), - )) - }) - } -} - #[cfg(feature = "kv_store")] impl super::settings::DrainerSettings { pub fn validate(&self) -> Result<(), ApplicationError> { diff --git a/crates/router/src/connection.rs b/crates/router/src/connection.rs index 06536572c8..00d6496f1b 100644 --- a/crates/router/src/connection.rs +++ b/crates/router/src/connection.rs @@ -1,6 +1,7 @@ use bb8::PooledConnection; use diesel::PgConnection; use error_stack::{IntoReport, ResultExt}; +use storage_impl::errors as storage_errors; use crate::errors; @@ -25,7 +26,7 @@ pub async fn pg_connection_read( store: &T, ) -> errors::CustomResult< PooledConnection<'_, async_bb8_diesel::ConnectionManager>, - errors::StorageError, + storage_errors::StorageError, > { // If only OLAP is enabled get replica pool. #[cfg(all(feature = "olap", not(feature = "oltp")))] @@ -45,14 +46,14 @@ pub async fn pg_connection_read( pool.get() .await .into_report() - .change_context(errors::StorageError::DatabaseConnectionError) + .change_context(storage_errors::StorageError::DatabaseConnectionError) } pub async fn pg_connection_write( store: &T, ) -> errors::CustomResult< PooledConnection<'_, async_bb8_diesel::ConnectionManager>, - errors::StorageError, + storage_errors::StorageError, > { // Since all writes should happen to master DB only choose master DB. 
     let pool = store.get_master_pool();
@@ -60,5 +61,5 @@
     pool.get()
         .await
         .into_report()
-        .change_context(errors::StorageError::DatabaseConnectionError)
+        .change_context(storage_errors::StorageError::DatabaseConnectionError)
 }
diff --git a/crates/router/src/connector/utils.rs b/crates/router/src/connector/utils.rs
index 90ac405e5c..fb2242e43f 100644
--- a/crates/router/src/connector/utils.rs
+++ b/crates/router/src/connector/utils.rs
@@ -791,18 +791,18 @@ impl AddressDetailsData for api::AddressDetails {
             .ok_or_else(missing_field_err("address.city"))
     }
 
-    fn get_line2(&self) -> Result<&Secret<String>, Error> {
-        self.line2
-            .as_ref()
-            .ok_or_else(missing_field_err("address.line2"))
-    }
-
     fn get_state(&self) -> Result<&Secret<String>, Error> {
         self.state
            .as_ref()
            .ok_or_else(missing_field_err("address.state"))
     }
 
+    fn get_line2(&self) -> Result<&Secret<String>, Error> {
+        self.line2
+            .as_ref()
+            .ok_or_else(missing_field_err("address.line2"))
+    }
+
     fn get_zip(&self) -> Result<&Secret<String>, Error> {
         self.zip
             .as_ref()
diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs
index f429d5319b..35d76247ec 100644
--- a/crates/router/src/core/errors.rs
+++ b/crates/router/src/core/errors.rs
@@ -5,26 +5,25 @@ pub mod utils;
 
 use std::fmt::Display;
 
-use actix_web::{body::BoxBody, http::StatusCode, ResponseError};
-use common_utils::errors::ErrorSwitch;
+use actix_web::{body::BoxBody, ResponseError};
 pub use common_utils::errors::{CustomResult, ParsingError, ValidationError};
-use config::ConfigError;
 pub use data_models::errors::StorageError as DataStorageError;
 use diesel_models::errors as storage_errors;
 pub use redis_interface::errors::RedisError;
-use router_env::opentelemetry::metrics::MetricsError;
+use scheduler::errors as sch_errors;
+use storage_impl::errors as storage_impl_errors;
 
 pub use self::{
     api_error_response::ApiErrorResponse,
+    sch_errors::*,
+    storage_errors::*,
+    storage_impl_errors::*,
     utils::{ConnectorErrorExt, StorageErrorExt},
 };
 use crate::services;
 
 pub type RouterResult<T> = CustomResult<T, ApiErrorResponse>;
 pub type RouterResponse<T> = CustomResult<services::ApplicationResponse<T>, ApiErrorResponse>;
-pub type ApplicationResult<T> = Result<T, ApplicationError>;
-pub type ApplicationResponse<T> = ApplicationResult<services::ApplicationResponse<T>>;
-
 macro_rules! impl_error_display {
     ($st: ident, $arg: tt) => {
         impl Display for $st {
@@ -50,201 +49,14 @@ macro_rules!
impl_error_type { }; } -#[derive(Debug, thiserror::Error)] -pub enum StorageError { - #[error("DatabaseError: {0:?}")] - DatabaseError(error_stack::Report), - #[error("ValueNotFound: {0}")] - ValueNotFound(String), - #[error("DuplicateValue: {entity} already exists {key:?}")] - DuplicateValue { - entity: &'static str, - key: Option, - }, - #[error("Timed out while trying to connect to the database")] - DatabaseConnectionError, - #[error("KV error")] - KVError, - #[error("Serialization failure")] - SerializationFailed, - #[error("MockDb error")] - MockDbError, - #[error("Customer with this id is Redacted")] - CustomerRedacted, - #[error("Deserialization failure")] - DeserializationFailed, - #[error("Error while encrypting data")] - EncryptionError, - #[error("Error while decrypting data from database")] - DecryptionError, - #[error("RedisError: {0:?}")] - RedisError(error_stack::Report), -} - -impl ErrorSwitch for StorageError { - fn switch(&self) -> DataStorageError { - self.into() - } -} - -#[allow(clippy::from_over_into)] -impl Into for &StorageError { - fn into(self) -> DataStorageError { - match self { - StorageError::DatabaseError(i) => match i.current_context() { - storage_errors::DatabaseError::DatabaseConnectionError => { - DataStorageError::DatabaseConnectionError - } - // TODO: Update this error type to encompass & propagate the missing type (instead of generic `db value not found`) - storage_errors::DatabaseError::NotFound => { - DataStorageError::ValueNotFound(String::from("db value not found")) - } - // TODO: Update this error type to encompass & propagate the duplicate type (instead of generic `db value not found`) - storage_errors::DatabaseError::UniqueViolation => { - DataStorageError::DuplicateValue { - entity: "db entity", - key: None, - } - } - storage_errors::DatabaseError::NoFieldsToUpdate => { - DataStorageError::DatabaseError("No fields to update".to_string()) - } - storage_errors::DatabaseError::QueryGenerationFailed => { - DataStorageError::DatabaseError("Query generation failed".to_string()) - } - storage_errors::DatabaseError::Others => { - DataStorageError::DatabaseError("Unknown database error".to_string()) - } - }, - StorageError::ValueNotFound(i) => DataStorageError::ValueNotFound(i.clone()), - StorageError::DuplicateValue { entity, key } => DataStorageError::DuplicateValue { - entity, - key: key.clone(), - }, - StorageError::DatabaseConnectionError => DataStorageError::DatabaseConnectionError, - StorageError::KVError => DataStorageError::KVError, - StorageError::SerializationFailed => DataStorageError::SerializationFailed, - StorageError::MockDbError => DataStorageError::MockDbError, - StorageError::CustomerRedacted => DataStorageError::CustomerRedacted, - StorageError::DeserializationFailed => DataStorageError::DeserializationFailed, - StorageError::EncryptionError => DataStorageError::EncryptionError, - StorageError::DecryptionError => DataStorageError::DecryptionError, - StorageError::RedisError(i) => match i.current_context() { - // TODO: Update this error type to encompass & propagate the missing type (instead of generic `redis value not found`) - RedisError::NotFound => { - DataStorageError::ValueNotFound("redis value not found".to_string()) - } - RedisError::JsonSerializationFailed => DataStorageError::SerializationFailed, - RedisError::JsonDeserializationFailed => DataStorageError::DeserializationFailed, - i => DataStorageError::RedisError(format!("{:?}", i)), - }, - } - } -} - -impl From> for StorageError { - fn from(err: error_stack::Report) -> 
Self { - Self::RedisError(err) - } -} - -impl From> for StorageError { - fn from(err: error_stack::Report) -> Self { - Self::DatabaseError(err) - } -} - -impl StorageError { - pub fn is_db_not_found(&self) -> bool { - match self { - Self::DatabaseError(err) => matches!( - err.current_context(), - storage_errors::DatabaseError::NotFound - ), - Self::ValueNotFound(_) => true, - _ => false, - } - } - - pub fn is_db_unique_violation(&self) -> bool { - match self { - Self::DatabaseError(err) => matches!( - err.current_context(), - storage_errors::DatabaseError::UniqueViolation, - ), - _ => false, - } - } -} - impl_error_type!(EncryptionError, "Encryption error"); -#[derive(Debug, thiserror::Error)] -pub enum ApplicationError { - // Display's impl can be overridden by the attribute error marco. - // Don't use Debug here, Debug gives error stack in response. - #[error("Application configuration error: {0}")] - ConfigurationError(ConfigError), - - #[error("Invalid configuration value provided: {0}")] - InvalidConfigurationValueError(String), - - #[error("Metrics error: {0}")] - MetricsError(MetricsError), - - #[error("I/O: {0}")] - IoError(std::io::Error), - - #[error("Error while constructing api client: {0}")] - ApiClientError(ApiClientError), -} - -impl From for ApplicationError { - fn from(err: MetricsError) -> Self { - Self::MetricsError(err) - } -} - -impl From for ApplicationError { - fn from(err: std::io::Error) -> Self { - Self::IoError(err) - } -} - impl From for EncryptionError { fn from(_: ring::error::Unspecified) -> Self { Self } } -impl From for ApplicationError { - fn from(err: ConfigError) -> Self { - Self::ConfigurationError(err) - } -} - -fn error_response(err: &T) -> actix_web::HttpResponse { - actix_web::HttpResponse::BadRequest() - .content_type(mime::APPLICATION_JSON) - .body(format!(r#"{{ "error": {{ "message": "{err}" }} }}"#)) -} - -impl ResponseError for ApplicationError { - fn status_code(&self) -> StatusCode { - match self { - Self::MetricsError(_) - | Self::IoError(_) - | Self::ConfigurationError(_) - | Self::InvalidConfigurationValueError(_) - | Self::ApiClientError(_) => StatusCode::INTERNAL_SERVER_ERROR, - } - } - - fn error_response(&self) -> actix_web::HttpResponse { - error_response(self) - } -} - pub fn http_not_implemented() -> actix_web::HttpResponse { ApiErrorResponse::NotImplemented { message: api_error_response::NotImplementedMessage::Default, @@ -252,46 +64,6 @@ pub fn http_not_implemented() -> actix_web::HttpResponse { .error_response() } -#[derive(Debug, thiserror::Error, PartialEq, Clone)] -pub enum ApiClientError { - #[error("Header map construction failed")] - HeaderMapConstructionFailed, - #[error("Invalid proxy configuration")] - InvalidProxyConfiguration, - #[error("Client construction failed")] - ClientConstructionFailed, - #[error("Certificate decode failed")] - CertificateDecodeFailed, - #[error("Request body serialization failed")] - BodySerializationFailed, - #[error("Unexpected state reached/Invariants conflicted")] - UnexpectedState, - - #[error("URL encoding of request payload failed")] - UrlEncodingFailed, - #[error("Failed to send request to connector {0}")] - RequestNotSent(String), - #[error("Failed to decode response")] - ResponseDecodingFailed, - - #[error("Server responded with Request Timeout")] - RequestTimeoutReceived, - - #[error("connection closed before a message could complete")] - ConnectionClosed, - - #[error("Server responded with Internal Server Error")] - InternalServerErrorReceived, - #[error("Server responded with Bad 
Gateway")] - BadGatewayReceived, - #[error("Server responded with Service Unavailable")] - ServiceUnavailableReceived, - #[error("Server responded with Gateway Timeout")] - GatewayTimeoutReceived, - #[error("Server responded with unexpected response")] - UnexpectedServerResponse, -} - #[derive(Debug, thiserror::Error, PartialEq)] pub enum ConnectorError { #[error("Error while obtaining URL for the integration")] @@ -436,97 +208,6 @@ pub enum KmsError { Utf8DecodingFailed, } -#[derive(Debug, thiserror::Error)] -pub enum ProcessTrackerError { - #[error("An unexpected flow was specified")] - UnexpectedFlow, - #[error("Failed to serialize object")] - SerializationFailed, - #[error("Failed to deserialize object")] - DeserializationFailed, - #[error("Missing required field")] - MissingRequiredField, - #[error("Failed to insert process batch into stream")] - BatchInsertionFailed, - #[error("Failed to insert process into stream")] - ProcessInsertionFailed, - #[error("The process batch with the specified details was not found")] - BatchNotFound, - #[error("Failed to update process batch in stream")] - BatchUpdateFailed, - #[error("Failed to delete process batch from stream")] - BatchDeleteFailed, - #[error("An error occurred when trying to read process tracker configuration")] - ConfigurationError, - #[error("Failed to update process in database")] - ProcessUpdateFailed, - #[error("Failed to fetch processes from database")] - ProcessFetchingFailed, - #[error("Failed while fetching: {resource_name}")] - ResourceFetchingFailed { resource_name: &'static str }, - #[error("Failed while executing: {flow}")] - FlowExecutionError { flow: &'static str }, - #[error("Not Implemented")] - NotImplemented, - #[error("Job not found")] - JobNotFound, - #[error("Received Error ApiResponseError: {0}")] - EApiErrorResponse(error_stack::Report), - #[error("Received Error StorageError: {0}")] - EStorageError(error_stack::Report), - #[error("Received Error RedisError: {0}")] - ERedisError(error_stack::Report), - #[error("Received Error ParsingError: {0}")] - EParsingError(error_stack::Report), - #[error("Validation Error Received: {0}")] - EValidationError(error_stack::Report), - #[error("Type Conversion error")] - TypeConversionError, -} - -macro_rules! 
error_to_process_tracker_error { - ($($path: ident)::+ < $st: ident >, $($path2:ident)::* ($($inner_path2:ident)::+ <$st2:ident>) ) => { - impl From<$($path)::+ <$st>> for ProcessTrackerError { - fn from(err: $($path)::+ <$st> ) -> Self { - $($path2)::*(err) - } - } - }; - - ($($path: ident)::+ <$($inner_path:ident)::+>, $($path2:ident)::* ($($inner_path2:ident)::+ <$st2:ident>) ) => { - impl<'a> From< $($path)::+ <$($inner_path)::+> > for ProcessTrackerError { - fn from(err: $($path)::+ <$($inner_path)::+> ) -> Self { - $($path2)::*(err) - } - } - }; -} - -error_to_process_tracker_error!( - error_stack::Report, - ProcessTrackerError::EApiErrorResponse(error_stack::Report) -); - -error_to_process_tracker_error!( - error_stack::Report, - ProcessTrackerError::EStorageError(error_stack::Report) -); - -error_to_process_tracker_error!( - error_stack::Report, - ProcessTrackerError::ERedisError(error_stack::Report) -); - -error_to_process_tracker_error!( - error_stack::Report, - ProcessTrackerError::EParsingError(error_stack::Report) -); - -error_to_process_tracker_error!( - error_stack::Report, - ProcessTrackerError::EValidationError(error_stack::Report) -); - #[derive(Debug, thiserror::Error)] pub enum WebhooksFlowError { #[error("Merchant webhook config not found")] @@ -569,15 +250,6 @@ pub enum WebhooksFlowError { MissingRequiredField { field_name: &'static str }, } -impl ApiClientError { - pub fn is_upstream_timeout(&self) -> bool { - self == &Self::RequestTimeoutReceived - } - pub fn is_connection_closed(&self) -> bool { - self == &Self::ConnectionClosed - } -} - impl ConnectorError { pub fn is_connector_timeout(&self) -> bool { self == &Self::RequestTimeoutReceived diff --git a/crates/router/src/core/errors/api_error_response.rs b/crates/router/src/core/errors/api_error_response.rs index de07f061c8..bae5d0e00d 100644 --- a/crates/router/src/core/errors/api_error_response.rs +++ b/crates/router/src/core/errors/api_error_response.rs @@ -1,6 +1,7 @@ #![allow(dead_code, unused_variables)] use http::StatusCode; +use scheduler::errors::{PTError, ProcessTrackerError}; #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "snake_case")] @@ -227,6 +228,12 @@ pub enum ApiErrorResponse { CurrencyNotSupported { message: String }, } +impl PTError for ApiErrorResponse { + fn to_pt_error(&self) -> ProcessTrackerError { + ProcessTrackerError::EApiErrorResponse + } +} + #[derive(Clone)] pub enum NotImplementedMessage { Reason(String), diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index a96e2f1bd4..7155a5b3ac 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -23,8 +23,6 @@ use error_stack::{report, IntoReport, ResultExt}; use masking::Secret; use router_env::{instrument, tracing}; -#[cfg(feature = "basilisk")] -use crate::scheduler::metrics as scheduler_metrics; use crate::{ configs::settings, core::{ @@ -2101,7 +2099,7 @@ impl BasiliskCardSupport { enums::PaymentMethod::Card, ) .await?; - scheduler_metrics::TOKENIZED_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); + metrics::TOKENIZED_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); Ok(card) } } diff --git a/crates/router/src/core/payment_methods/vault.rs b/crates/router/src/core/payment_methods/vault.rs index 3f593fcea9..d16269deb9 100644 --- a/crates/router/src/core/payment_methods/vault.rs +++ b/crates/router/src/core/payment_methods/vault.rs @@ -6,6 +6,8 @@ use error_stack::{IntoReport, ResultExt}; use josekit::jwe; use 
masking::PeekInterface; use router_env::{instrument, tracing}; +#[cfg(feature = "basilisk")] +use scheduler::{types::process_data, utils as process_tracker_utils}; #[cfg(feature = "basilisk")] use crate::routes::metrics; @@ -24,11 +26,8 @@ use crate::{ #[cfg(feature = "basilisk")] use crate::{core::payment_methods::transformers as payment_methods, services, utils::BytesExt}; #[cfg(feature = "basilisk")] -use crate::{ - db, - scheduler::{metrics as scheduler_metrics, process_data, utils as process_tracker_utils}, - types::storage::ProcessTrackerExt, -}; +use crate::{db, types::storage::ProcessTrackerExt}; + #[cfg(feature = "basilisk")] const VAULT_SERVICE_NAME: &str = "CARD"; #[cfg(feature = "basilisk")] @@ -843,7 +842,7 @@ impl Vault { let lookup_key = create_tokenize(state, value1, Some(value2), lookup_key).await?; add_delete_tokenized_data_task(&*state.store, &lookup_key, pm).await?; - scheduler_metrics::TOKENIZED_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); + metrics::TOKENIZED_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); Ok(lookup_key) } @@ -1223,20 +1222,20 @@ pub async fn start_tokenize_data_workflow( let id = tokenize_tracker.id.clone(); tokenize_tracker .clone() - .finish_with_status(db, format!("COMPLETED_BY_PT_{id}")) + .finish_with_status(db.as_scheduler(), format!("COMPLETED_BY_PT_{id}")) .await?; } else { logger::error!("Error: Deleting Card From Locker : {:?}", resp); retry_delete_tokenize(db, &delete_tokenize_data.pm, tokenize_tracker.to_owned()) .await?; - scheduler_metrics::RETRIED_DELETE_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); + metrics::RETRIED_DELETE_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); } } Err(err) => { logger::error!("Err: Deleting Card From Locker : {:?}", err); retry_delete_tokenize(db, &delete_tokenize_data.pm, tokenize_tracker.to_owned()) .await?; - scheduler_metrics::RETRIED_DELETE_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); + metrics::RETRIED_DELETE_DATA_COUNT.add(&metrics::CONTEXT, 1, &[]); } } Ok(()) @@ -1275,9 +1274,9 @@ pub async fn retry_delete_tokenize( let schedule_time = get_delete_tokenize_schedule_time(db, pm, pt.retry_count).await; match schedule_time { - Some(s_time) => pt.retry(db, s_time).await, + Some(s_time) => pt.retry(db.as_scheduler(), s_time).await, None => { - pt.finish_with_status(db, "RETRIES_EXCEEDED".to_string()) + pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) .await } } diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index a1a21a7910..bb8eb772d1 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -15,6 +15,7 @@ use error_stack::{IntoReport, ResultExt}; use futures::future::join_all; use masking::Secret; use router_env::{instrument, tracing}; +use scheduler::{db::process_tracker::ProcessTrackerExt, errors as sch_errors, utils as pt_utils}; use time; pub use self::operations::{ @@ -33,13 +34,13 @@ use crate::{ db::StorageInterface, logger, routes::{metrics, payment_methods::ParentPaymentMethodToken, AppState}, - scheduler::{utils as pt_utils, workflows::payment_sync}, services::{self, api::Authenticate}, types::{ self as router_types, api, domain, - storage::{self, enums as storage_enums, ProcessTrackerExt}, + storage::{self, enums as storage_enums}, }, utils::{add_connector_http_status_code_metrics, Encode, OptionExt, ValueExt}, + workflows::payment_sync, }; #[instrument(skip_all, fields(payment_id, merchant_id))] @@ -1412,7 +1413,7 @@ pub async fn add_process_sync_task( db: &dyn StorageInterface, payment_attempt: 
&storage::PaymentAttempt, schedule_time: time::PrimitiveDateTime, -) -> Result<(), errors::ProcessTrackerError> { +) -> Result<(), sch_errors::ProcessTrackerError> { let tracking_data = api::PaymentsRetrieveRequest { force_sync: true, merchant_id: Some(payment_attempt.merchant_id.clone()), @@ -1456,7 +1457,9 @@ pub async fn reset_process_sync_task( .find_process_by_id(&process_tracker_id) .await? .ok_or(errors::ProcessTrackerError::ProcessFetchingFailed)?; - psync_process.reset(db, schedule_time).await?; + psync_process + .reset(db.as_scheduler(), schedule_time) + .await?; Ok(()) } diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index f927adf116..d714c5c7ed 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -29,7 +29,6 @@ use crate::{ }, db::StorageInterface, routes::{metrics, payment_methods, AppState}, - scheduler::metrics as scheduler_metrics, services, types::{ api::{self, admin, enums as api_enums, CustomerAcceptanceExt, MandateValidationFieldsExt}, @@ -785,14 +784,14 @@ where match schedule_time { Some(stime) => { if !requeue { - scheduler_metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics + // scheduler_metrics::TASKS_ADDED_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics super::add_process_sync_task(&*state.store, payment_attempt, stime) .await .into_report() .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed while adding task to process tracker") } else { - scheduler_metrics::TASKS_RESET_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics + // scheduler_metrics::TASKS_RESET_COUNT.add(&metrics::CONTEXT, 1, &[]); // Metrics super::reset_process_sync_task(&*state.store, payment_attempt, stime) .await .into_report() diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index 41729ab517..ac755cfc13 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -3,6 +3,7 @@ pub mod validator; use common_utils::ext_traits::AsyncExt; use error_stack::{report, IntoReport, ResultExt}; use router_env::{instrument, tracing}; +use scheduler::{consumer::types::process_data, utils as process_tracker_utils}; use crate::{ consts, @@ -13,7 +14,6 @@ use crate::{ }, db, logger, routes::{metrics, AppState}, - scheduler::{process_data, utils as process_tracker_utils, workflows::payment_sync}, services, types::{ self, @@ -23,6 +23,7 @@ use crate::{ transformers::{ForeignFrom, ForeignInto}, }, utils::{self, OptionExt}, + workflows::payment_sync, }; // ********************************************** REFUND EXECUTE ********************************************** @@ -852,7 +853,7 @@ pub async fn sync_refund_with_gateway_workflow( let id = refund_tracker.id.clone(); refund_tracker .clone() - .finish_with_status(&*state.store, format!("COMPLETED_BY_PT_{id}")) + .finish_with_status(state.store.as_scheduler(), format!("COMPLETED_BY_PT_{id}")) .await? 
} _ => { @@ -967,7 +968,7 @@ pub async fn trigger_refund_execute_workflow( let id = refund_tracker.id.clone(); refund_tracker .clone() - .finish_with_status(db, format!("COMPLETED_BY_PT_{id}")) + .finish_with_status(db.as_scheduler(), format!("COMPLETED_BY_PT_{id}")) .await?; } }; @@ -1106,9 +1107,9 @@ pub async fn retry_refund_sync_task( get_refund_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?; match schedule_time { - Some(s_time) => pt.retry(db, s_time).await, + Some(s_time) => pt.retry(db.as_scheduler(), s_time).await, None => { - pt.finish_with_status(db, "RETRIES_EXCEEDED".to_string()) + pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) .await } } diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 15001a54fa..eeeba2b756 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -18,23 +18,17 @@ pub mod merchant_account; pub mod merchant_connector_account; pub mod merchant_key_store; pub mod payment_attempt; -pub mod payment_intent; pub mod payment_method; pub mod payout_attempt; pub mod payouts; -pub mod process_tracker; -pub mod queue; pub mod refund; pub mod reverse_lookup; -use std::sync::Arc; - use data_models::payments::payment_intent::PaymentIntentInterface; -use futures::lock::Mutex; use masking::PeekInterface; -use storage_impl::redis::kv_store::RedisConnInterface; +use storage_impl::{redis::kv_store::RedisConnInterface, MockDb}; -use crate::{services::Store, types::storage}; +use crate::services::Store; #[derive(PartialEq, Eq)] pub enum StorageImpl { @@ -67,10 +61,9 @@ pub trait StorageInterface: + payment_attempt::PaymentAttemptInterface + PaymentIntentInterface + payment_method::PaymentMethodInterface + + scheduler::SchedulerInterface + payout_attempt::PayoutAttemptInterface + payouts::PayoutsInterface - + process_tracker::ProcessTrackerInterface - + queue::QueueInterface + refund::RefundInterface + reverse_lookup::ReverseLookupInterface + cards_info::CardsInfoInterface @@ -80,6 +73,7 @@ pub trait StorageInterface: + business_profile::BusinessProfileInterface + 'static { + fn get_scheduler_db(&self) -> Box; } pub trait MasterKeyInterface { @@ -103,63 +97,18 @@ impl MasterKeyInterface for MockDb { } #[async_trait::async_trait] -impl StorageInterface for Store {} - -#[derive(Clone)] -pub struct MockDb { - addresses: Arc>>, - configs: Arc>>, - merchant_accounts: Arc>>, - merchant_connector_accounts: Arc>>, - payment_attempts: Arc>>, - payment_intents: Arc>>, - payment_methods: Arc>>, - customers: Arc>>, - refunds: Arc>>, - processes: Arc>>, - connector_response: Arc>>, - redis: Arc, - api_keys: Arc>>, - ephemeral_keys: Arc>>, - cards_info: Arc>>, - events: Arc>>, - disputes: Arc>>, - lockers: Arc>>, - mandates: Arc>>, - captures: Arc>>, - merchant_key_store: Arc>>, -} - -impl MockDb { - pub async fn new(redis: &crate::configs::settings::Settings) -> Self { - Self { - addresses: Default::default(), - configs: Default::default(), - merchant_accounts: Default::default(), - merchant_connector_accounts: Default::default(), - payment_attempts: Default::default(), - payment_intents: Default::default(), - payment_methods: Default::default(), - customers: Default::default(), - refunds: Default::default(), - processes: Default::default(), - connector_response: Default::default(), - redis: Arc::new(crate::connection::redis_connection(redis).await), - api_keys: Default::default(), - ephemeral_keys: Default::default(), - cards_info: Default::default(), - events: Default::default(), - disputes: 
Default::default(), - lockers: Default::default(), - mandates: Default::default(), - captures: Default::default(), - merchant_key_store: Default::default(), - } +impl StorageInterface for Store { + fn get_scheduler_db(&self) -> Box { + Box::new(self.clone()) } } #[async_trait::async_trait] -impl StorageInterface for MockDb {} +impl StorageInterface for MockDb { + fn get_scheduler_db(&self) -> Box { + Box::new(self.clone()) + } +} pub async fn get_and_deserialize_key( db: &dyn StorageInterface, @@ -178,15 +127,4 @@ where .change_context(redis_interface::errors::RedisError::JsonDeserializationFailed) } -impl RedisConnInterface for MockDb { - fn get_redis_conn( - &self, - ) -> Result< - Arc, - error_stack::Report, - > { - Ok(self.redis.clone()) - } -} - dyn_clone::clone_trait_object!(StorageInterface); diff --git a/crates/router/src/db/api_keys.rs b/crates/router/src/db/api_keys.rs index 70f0dc42f9..19d5e73d1c 100644 --- a/crates/router/src/db/api_keys.rs +++ b/crates/router/src/db/api_keys.rs @@ -390,7 +390,7 @@ mod tests { #[allow(clippy::unwrap_used)] #[tokio::test] async fn test_mockdb_api_key_interface() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let key1 = mockdb .insert_api_key(storage::ApiKeyNew { @@ -473,7 +473,7 @@ mod tests { #[allow(clippy::unwrap_used)] #[tokio::test] async fn test_api_keys_cache() { - let db = MockDb::new(&Default::default()).await; + let db = MockDb::new().await; let redis_conn = db.get_redis_conn().unwrap(); redis_conn diff --git a/crates/router/src/db/dispute.rs b/crates/router/src/db/dispute.rs index aa0c92bc77..b6c102693c 100644 --- a/crates/router/src/db/dispute.rs +++ b/crates/router/src/db/dispute.rs @@ -409,7 +409,7 @@ mod tests { #[tokio::test] async fn test_insert_dispute() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { @@ -437,7 +437,7 @@ mod tests { #[tokio::test] async fn test_find_by_merchant_id_payment_id_connector_dispute_id() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { @@ -477,7 +477,7 @@ mod tests { #[tokio::test] async fn test_find_dispute_by_merchant_id_dispute_id() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { @@ -511,7 +511,7 @@ mod tests { #[tokio::test] async fn test_find_disputes_by_merchant_id() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { @@ -561,7 +561,7 @@ mod tests { #[tokio::test] async fn test_find_disputes_by_merchant_id_payment_id() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { @@ -614,7 +614,7 @@ mod tests { #[tokio::test] async fn test_update_dispute_update() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { @@ -691,7 +691,7 @@ mod tests { #[tokio::test] async fn test_update_dispute_update_status() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb 
.insert_dispute(create_dispute_new(DisputeNewIds { @@ -763,7 +763,7 @@ mod tests { #[tokio::test] async fn test_update_dispute_update_evidence() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_dispute = mockdb .insert_dispute(create_dispute_new(DisputeNewIds { diff --git a/crates/router/src/db/events.rs b/crates/router/src/db/events.rs index 67819a9e92..0e88d53108 100644 --- a/crates/router/src/db/events.rs +++ b/crates/router/src/db/events.rs @@ -108,7 +108,7 @@ mod tests { #[allow(clippy::unwrap_used)] #[tokio::test] async fn test_mockdb_event_interface() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let event1 = mockdb .insert_event(storage::EventNew { diff --git a/crates/router/src/db/locker_mock_up.rs b/crates/router/src/db/locker_mock_up.rs index d476a26094..b47484c610 100644 --- a/crates/router/src/db/locker_mock_up.rs +++ b/crates/router/src/db/locker_mock_up.rs @@ -163,7 +163,7 @@ mod tests { #[tokio::test] async fn find_locker_by_card_id() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_locker = mockdb .insert_locker_mock_up(create_locker_mock_up_new(LockerMockUpIds { @@ -191,7 +191,7 @@ mod tests { #[tokio::test] async fn insert_locker_mock_up() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_locker = mockdb .insert_locker_mock_up(create_locker_mock_up_new(LockerMockUpIds { @@ -218,7 +218,7 @@ mod tests { #[tokio::test] async fn delete_locker_mock_up() { - let mockdb = MockDb::new(&Default::default()).await; + let mockdb = MockDb::new().await; let created_locker = mockdb .insert_locker_mock_up(create_locker_mock_up_new(LockerMockUpIds { diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index bfde360c4a..36c2e7c498 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -692,7 +692,7 @@ mod merchant_connector_account_cache_tests { #[allow(clippy::unwrap_used)] #[tokio::test] async fn test_connector_label_cache() { - let db = MockDb::new(&Default::default()).await; + let db = MockDb::new().await; let redis_conn = db.get_redis_conn().unwrap(); let master_key = db.get_master_key(); diff --git a/crates/router/src/db/merchant_key_store.rs b/crates/router/src/db/merchant_key_store.rs index a85b5f5fe2..28d0a71a4b 100644 --- a/crates/router/src/db/merchant_key_store.rs +++ b/crates/router/src/db/merchant_key_store.rs @@ -155,7 +155,7 @@ mod tests { #[allow(clippy::unwrap_used)] #[tokio::test] async fn test_mock_db_merchant_key_store_interface() { - let mock_db = MockDb::new(&Default::default()).await; + let mock_db = MockDb::new().await; let master_key = mock_db.get_master_key(); let merchant_id = "merchant1"; diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index f5093a385b..f96d0efb5c 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -13,7 +13,7 @@ pub mod db; pub mod env; pub(crate) mod macros; pub mod routes; -pub mod scheduler; +pub mod workflows; pub mod middleware; pub mod openapi; @@ -28,12 +28,13 @@ use actix_web::{ }; use http::StatusCode; use routes::AppState; +use storage_impl::errors::ApplicationResult; use tokio::sync::{mpsc, oneshot}; pub use self::env::logger; use crate::{ configs::settings, - core::errors::{self, ApplicationResult}, + core::errors::{self}, }; #[cfg(feature = "mimalloc")] 
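Before the following hunks remove the in-crate `router::scheduler` module, here is a minimal sketch of what a workflow looks like against the new `scheduler` crate. The trait path comes from the `use scheduler::consumer::workflows::ProcessTrackerWorkflow` import in `crates/router/src/bin/scheduler.rs` above; the struct name is illustrative, and the trait's default `success_handler`/`error_handler` methods live in the new crate, which is not shown in this diff.

```rust
use diesel_models::process_tracker as storage;
use router::routes::AppState;
use scheduler::{consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError};

// Illustrative workflow; the real ones live under `crates/router/src/workflows/`.
struct NoopWorkflow;

#[async_trait::async_trait]
impl ProcessTrackerWorkflow<AppState> for NoopWorkflow {
    async fn execute_workflow<'a>(
        &'a self,
        _state: &'a AppState,
        _process: storage::ProcessTracker,
    ) -> Result<(), ProcessTrackerError> {
        // A real workflow drives its sync/retry logic here; `success_handler`
        // and `error_handler` keep their defaults from the trait.
        Ok(())
    }
}
```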
diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 148f957a85..fa87aedaa5 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -3,6 +3,8 @@ use actix_web::{web, Scope}; use external_services::email::{AwsSes, EmailClient}; #[cfg(feature = "kms")] use external_services::kms::{self, decrypt::KmsDecrypt}; +use scheduler::SchedulerInterface; +use storage_impl::MockDb; use tokio::sync::oneshot; #[cfg(feature = "dummy_connector")] @@ -20,7 +22,7 @@ use super::{configs::*, customers::*, mandates::*, payments::*, refunds::*}; use super::{ephemeral_key::*, payment_methods::*, webhooks::*}; use crate::{ configs::settings, - db::{MockDb, StorageImpl, StorageInterface}, + db::{StorageImpl, StorageInterface}, routes::cards_info::card_iin_info, services::get_store, }; @@ -37,6 +39,12 @@ pub struct AppState { pub api_client: Box, } +impl scheduler::SchedulerAppState for AppState { + fn get_db(&self) -> Box { + self.store.get_scheduler_db() + } +} + pub trait AppStateInfo { fn conf(&self) -> settings::Settings; fn flow_name(&self) -> String; @@ -81,7 +89,7 @@ impl AppState { .await .expect("Failed to create store"), ), - StorageImpl::Mock => Box::new(MockDb::new(&conf).await), + StorageImpl::Mock => Box::new(MockDb::new().await), }; #[cfg(feature = "kms")] diff --git a/crates/router/src/routes/metrics.rs b/crates/router/src/routes/metrics.rs index f1e27cffaa..7fc9a77e7e 100644 --- a/crates/router/src/routes/metrics.rs +++ b/crates/router/src/routes/metrics.rs @@ -41,6 +41,8 @@ counter_metric!(DELETE_FROM_LOCKER, GLOBAL_METER); counter_metric!(CREATED_TOKENIZED_CARD, GLOBAL_METER); counter_metric!(DELETED_TOKENIZED_CARD, GLOBAL_METER); counter_metric!(GET_TOKENIZED_CARD, GLOBAL_METER); +counter_metric!(TOKENIZED_DATA_COUNT, GLOBAL_METER); // Tokenized data added +counter_metric!(RETRIED_DELETE_DATA_COUNT, GLOBAL_METER); // Tokenized data retried counter_metric!(CUSTOMER_CREATED, GLOBAL_METER); counter_metric!(CUSTOMER_REDACTED, GLOBAL_METER); diff --git a/crates/router/src/scheduler.rs b/crates/router/src/scheduler.rs deleted file mode 100644 index 5902489718..0000000000 --- a/crates/router/src/scheduler.rs +++ /dev/null @@ -1,46 +0,0 @@ -#![allow(dead_code)] - -pub mod consumer; -pub mod metrics; -pub mod producer; -pub mod types; -pub mod utils; -pub mod workflows; - -use std::sync::Arc; - -use tokio::sync::mpsc; - -pub use self::types::*; -use crate::{ - configs::settings::SchedulerSettings, - core::errors::{self, CustomResult}, - logger::error, - routes::AppState, -}; - -pub async fn start_process_tracker( - state: &AppState, - scheduler_flow: SchedulerFlow, - scheduler_settings: Arc, - channel: (mpsc::Sender<()>, mpsc::Receiver<()>), -) -> CustomResult<(), errors::ProcessTrackerError> { - match scheduler_flow { - SchedulerFlow::Producer => { - producer::start_producer(state, scheduler_settings, channel).await? - } - SchedulerFlow::Consumer => { - consumer::start_consumer( - state, - scheduler_settings, - workflows::runner_from_task, - channel, - ) - .await? 
- } - SchedulerFlow::Cleaner => { - error!("This flow has not been implemented yet!"); - } - } - Ok(()) -} diff --git a/crates/router/src/scheduler/types.rs b/crates/router/src/scheduler/types.rs deleted file mode 100644 index fa0a752030..0000000000 --- a/crates/router/src/scheduler/types.rs +++ /dev/null @@ -1,13 +0,0 @@ -pub mod batch; -pub mod config; -pub mod flow; -pub mod process_data; -pub mod state; - -pub use self::{ - batch::ProcessTrackerBatch, - config::SchedulerConfig, - flow::SchedulerFlow, - process_data::ProcessData, - state::{DummyWorkflowState, WorkflowState}, -}; diff --git a/crates/router/src/scheduler/types/config.rs b/crates/router/src/scheduler/types/config.rs deleted file mode 100644 index 84bc84c92d..0000000000 --- a/crates/router/src/scheduler/types/config.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Add struct fields as necessary -#[derive(Debug)] -pub struct SchedulerConfig { - raw_body: String, - headers: Vec<(String, String)>, - raw_headers: Vec<(String, String)>, -} - -impl SchedulerConfig { - // Add parameters to new as required - pub fn new(headers: Vec<(String, String)>) -> Self { - Self { - raw_body: String::new(), - headers, - raw_headers: Vec::new(), - } - } -} diff --git a/crates/router/src/scheduler/types/state.rs b/crates/router/src/scheduler/types/state.rs deleted file mode 100644 index 23c9c33451..0000000000 --- a/crates/router/src/scheduler/types/state.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Add enum variants as necessary -#[derive(Debug)] -pub enum WorkflowState { - DummyWorkflowState(DummyWorkflowState), -} - -// Rename struct as necessary, typically based on runner/workflow -// Add fields as necessary -#[derive(Debug, Default)] -pub struct DummyWorkflowState { - order_id: Option, - merchant_id: Option, - acquired_locks: Vec, - flow_name: Option, -} - -impl DummyWorkflowState { - // Implement methods as required for each struct -} diff --git a/crates/router/src/scheduler/workflows.rs b/crates/router/src/scheduler/workflows.rs deleted file mode 100644 index 1b0cc66186..0000000000 --- a/crates/router/src/scheduler/workflows.rs +++ /dev/null @@ -1,108 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use strum::EnumString; - -use crate::{ - core::errors, - routes::AppState, - types::storage, - utils::{OptionExt, StringExt}, -}; -#[cfg(feature = "email")] -pub mod api_key_expiry; - -pub mod payment_sync; -pub mod refund_router; -pub mod tokenized_data; - -macro_rules! runners { - ($(#[$attr:meta] $body:tt),*) => { - as_item! { - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)] - #[serde(rename_all = "SCREAMING_SNAKE_CASE")] - #[strum(serialize_all = "SCREAMING_SNAKE_CASE")] - pub enum PTRunner { - $(#[$attr] $body),* - } - } - - $( as_item! { - #[$attr] - pub struct $body; - } )* - - - pub fn runner_from_task(task: &storage::ProcessTracker) -> Result>, errors::ProcessTrackerError> { - let runner = task.runner.clone().get_required_value("runner")?; - let runner: Option = runner.parse_enum("PTRunner").ok(); - Ok(match runner { - $( #[$attr] Some( PTRunner::$body ) => { - Some(Box::new($body)) - } ,)* - None => { - None - } - }) - - } - }; -} -macro_rules! as_item { - ($i:item) => { - $i - }; -} - -runners! 
{ - #[cfg(all())] PaymentsSyncWorkflow, - #[cfg(all())] RefundWorkflowRouter, - #[cfg(all())] DeleteTokenizeDataWorkflow, - #[cfg(feature = "email")] ApiKeyExpiryWorkflow -} - -pub type WorkflowSelectorFn = - fn( - &storage::ProcessTracker, - ) -> Result>, errors::ProcessTrackerError>; - -#[async_trait] -pub trait ProcessTrackerWorkflow: Send + Sync { - // The core execution of the workflow - async fn execute_workflow<'a>( - &'a self, - _state: &'a AppState, - _process: storage::ProcessTracker, - ) -> Result<(), errors::ProcessTrackerError> { - Err(errors::ProcessTrackerError::NotImplemented)? - } - // Callback function after successful execution of the `execute_workflow` - async fn success_handler<'a>( - &'a self, - _state: &'a AppState, - _process: storage::ProcessTracker, - ) { - } - // Callback function after error received from `execute_workflow` - async fn error_handler<'a>( - &'a self, - _state: &'a AppState, - _process: storage::ProcessTracker, - _error: errors::ProcessTrackerError, - ) -> errors::CustomResult<(), errors::ProcessTrackerError> { - Err(errors::ProcessTrackerError::NotImplemented)? - } -} - -#[cfg(test)] -mod workflow_tests { - #![allow(clippy::unwrap_used)] - use super::PTRunner; - use crate::utils::StringExt; - - #[test] - fn test_enum_to_string() { - let string_format = "PAYMENTS_SYNC_WORKFLOW".to_string(); - let enum_format: PTRunner = string_format.parse_enum("PTRunner").unwrap(); - assert_eq!(enum_format, PTRunner::PaymentsSyncWorkflow) - } -} diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 3688dfc42d..d216d61642 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -21,9 +21,9 @@ pub use self::{api::*, encryption::*}; use crate::{configs::settings, consts, core::errors}; #[cfg(not(feature = "olap"))] -type StoreType = storage_impl::database::store::Store; +pub type StoreType = storage_impl::database::store::Store; #[cfg(feature = "olap")] -type StoreType = storage_impl::database::store::ReplicaStore; +pub type StoreType = storage_impl::database::store::ReplicaStore; #[cfg(not(feature = "kv_store"))] pub type Store = RouterStore; diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index d9a5b26f12..37e1c14ae4 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -20,12 +20,14 @@ pub mod merchant_connector_account; pub mod merchant_key_store; pub mod payment_attempt; pub mod payment_method; +pub use diesel_models::{ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate}; +pub use scheduler::db::process_tracker; +pub mod reverse_lookup; + pub mod payout_attempt; pub mod payouts; -pub mod process_tracker; mod query; pub mod refund; -pub mod reverse_lookup; pub use data_models::payments::payment_intent::{ PaymentIntent, PaymentIntentNew, PaymentIntentUpdate, diff --git a/crates/router/src/types/storage/process_tracker.rs b/crates/router/src/types/storage/process_tracker.rs deleted file mode 100644 index 3c06757551..0000000000 --- a/crates/router/src/types/storage/process_tracker.rs +++ /dev/null @@ -1,133 +0,0 @@ -pub use diesel_models::process_tracker::{ - ProcessData, ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate, - ProcessTrackerUpdateInternal, SchedulerOptions, -}; -use error_stack::ResultExt; -use serde::Serialize; -use time::PrimitiveDateTime; - -use crate::{ - core::errors, db::StorageInterface, scheduler::metrics, types::storage::enums as storage_enums, -}; - -#[async_trait::async_trait] -pub trait ProcessTrackerExt { 
- fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool; - - fn make_process_tracker_new<'a, T>( - process_tracker_id: String, - task: &'a str, - runner: &'a str, - tracking_data: T, - schedule_time: PrimitiveDateTime, - ) -> Result - where - T: Serialize; - - async fn reset( - self, - db: &dyn StorageInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), errors::ProcessTrackerError>; - - async fn retry( - self, - db: &dyn StorageInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), errors::ProcessTrackerError>; - - async fn finish_with_status( - self, - db: &dyn StorageInterface, - status: String, - ) -> Result<(), errors::ProcessTrackerError>; -} - -#[async_trait::async_trait] -impl ProcessTrackerExt for ProcessTracker { - fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool { - valid_statuses.iter().any(|x| x == &self.business_status) - } - - fn make_process_tracker_new<'a, T>( - process_tracker_id: String, - task: &'a str, - runner: &'a str, - tracking_data: T, - schedule_time: PrimitiveDateTime, - ) -> Result - where - T: Serialize, - { - let current_time = common_utils::date_time::now(); - Ok(ProcessTrackerNew { - id: process_tracker_id, - name: Some(String::from(task)), - tag: vec![String::from("SYNC"), String::from("PAYMENT")], - runner: Some(String::from(runner)), - retry_count: 0, - schedule_time: Some(schedule_time), - rule: String::new(), - tracking_data: serde_json::to_value(tracking_data) - .map_err(|_| errors::ProcessTrackerError::SerializationFailed)?, - business_status: String::from("Pending"), - status: storage_enums::ProcessTrackerStatus::New, - event: vec![], - created_at: current_time, - updated_at: current_time, - }) - } - - async fn reset( - self, - db: &dyn StorageInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), errors::ProcessTrackerError> { - db.update_process_tracker( - self.clone(), - ProcessTrackerUpdate::StatusRetryUpdate { - status: storage_enums::ProcessTrackerStatus::New, - retry_count: 0, - schedule_time, - }, - ) - .await?; - Ok(()) - } - - async fn retry( - self, - db: &dyn StorageInterface, - schedule_time: PrimitiveDateTime, - ) -> Result<(), errors::ProcessTrackerError> { - metrics::TASK_RETRIED.add(&metrics::CONTEXT, 1, &[]); - db.update_process_tracker( - self.clone(), - ProcessTrackerUpdate::StatusRetryUpdate { - status: storage_enums::ProcessTrackerStatus::Pending, - retry_count: self.retry_count + 1, - schedule_time, - }, - ) - .await?; - Ok(()) - } - - async fn finish_with_status( - self, - db: &dyn StorageInterface, - status: String, - ) -> Result<(), errors::ProcessTrackerError> { - db.update_process( - self, - ProcessTrackerUpdate::StatusUpdate { - status: storage_enums::ProcessTrackerStatus::Finish, - business_status: Some(status), - }, - ) - .await - .attach_printable("Failed while updating status of the process")?; - metrics::TASK_FINISHED.add(&metrics::CONTEXT, 1, &[]); - Ok(()) - } -} diff --git a/crates/router/src/utils.rs b/crates/router/src/utils.rs index 4be6764cea..6676f34b75 100644 --- a/crates/router/src/utils.rs +++ b/crates/router/src/utils.rs @@ -23,7 +23,10 @@ use serde::de::DeserializeOwned; use serde_json::Value; use uuid::Uuid; -pub use self::ext_traits::{OptionExt, ValidateCall}; +pub use self::{ + ext_traits::{OptionExt, ValidateCall}, + storage::*, +}; use crate::{ consts, core::errors::{self, CustomResult, RouterResult, StorageErrorExt}, @@ -188,7 +191,7 @@ pub async fn find_payment_intent_from_payment_id_type( db: &dyn StorageInterface, payment_id_type: 
payments::PaymentIdType, merchant_account: &domain::MerchantAccount, -) -> CustomResult { +) -> CustomResult { match payment_id_type { payments::PaymentIdType::PaymentIntentId(payment_id) => db .find_payment_intent_by_payment_id_merchant_id( @@ -243,7 +246,7 @@ pub async fn find_payment_intent_from_refund_id_type( refund_id_type: webhooks::RefundIdType, merchant_account: &domain::MerchantAccount, connector_name: &str, -) -> CustomResult { +) -> CustomResult { let refund = match refund_id_type { webhooks::RefundIdType::RefundId(id) => db .find_refund_by_merchant_id_refund_id( diff --git a/crates/router/src/workflows.rs b/crates/router/src/workflows.rs new file mode 100644 index 0000000000..b036193bb2 --- /dev/null +++ b/crates/router/src/workflows.rs @@ -0,0 +1,3 @@ +pub mod payment_sync; +pub mod refund_router; +pub mod tokenized_data; diff --git a/crates/router/src/scheduler/workflows/api_key_expiry.rs b/crates/router/src/workflows/api_key_expiry.rs similarity index 100% rename from crates/router/src/scheduler/workflows/api_key_expiry.rs rename to crates/router/src/workflows/api_key_expiry.rs diff --git a/crates/router/src/scheduler/workflows/payment_sync.rs b/crates/router/src/workflows/payment_sync.rs similarity index 81% rename from crates/router/src/scheduler/workflows/payment_sync.rs rename to crates/router/src/workflows/payment_sync.rs index db82019a52..dca2df35ba 100644 --- a/crates/router/src/scheduler/workflows/payment_sync.rs +++ b/crates/router/src/workflows/payment_sync.rs @@ -1,29 +1,33 @@ -use common_utils::ext_traits::StringExt; +use common_utils::ext_traits::{OptionExt, StringExt, ValueExt}; use error_stack::ResultExt; use router_env::logger; +use scheduler::{ + consumer::{self, types::process_data, workflows::ProcessTrackerWorkflow}, + db::process_tracker::ProcessTrackerExt, + errors as sch_errors, utils, SchedulerAppState, +}; -use super::{PaymentsSyncWorkflow, ProcessTrackerWorkflow}; use crate::{ core::payments::{self as payment_flows, operations}, db::StorageInterface, errors, routes::AppState, - scheduler::{consumer, process_data, utils}, services, types::{ api, - storage::{self, enums, ProcessTrackerExt}, + storage::{self, enums}, }, - utils::{OptionExt, ValueExt}, }; +pub struct PaymentsSyncWorkflow; + #[async_trait::async_trait] -impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { +impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { async fn execute_workflow<'a>( &'a self, state: &'a AppState, process: storage::ProcessTracker, - ) -> Result<(), errors::ProcessTrackerError> { + ) -> Result<(), sch_errors::ProcessTrackerError> { let db: &dyn StorageInterface = &*state.store; let tracking_data: api::PaymentsRetrieveRequest = process .tracking_data @@ -75,14 +79,17 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { status if terminal_status.contains(status) => { let id = process.id.clone(); process - .finish_with_status(db, format!("COMPLETED_BY_PT_{id}")) + .finish_with_status( + state.get_db().as_scheduler(), + format!("COMPLETED_BY_PT_{id}"), + ) .await? 
} _ => { let connector = payment_data .payment_attempt .connector - .ok_or(errors::ProcessTrackerError::MissingRequiredField)?; + .ok_or(sch_errors::ProcessTrackerError::MissingRequiredField)?; retry_sync_task( db, @@ -100,9 +107,9 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow { &'a self, state: &'a AppState, process: storage::ProcessTracker, - error: errors::ProcessTrackerError, - ) -> errors::CustomResult<(), errors::ProcessTrackerError> { - consumer::consumer_error_handler(state, process, error).await + error: sch_errors::ProcessTrackerError, + ) -> errors::CustomResult<(), sch_errors::ProcessTrackerError> { + consumer::consumer_error_handler(state.store.as_scheduler(), process, error).await } } @@ -141,14 +148,14 @@ pub async fn retry_sync_task( connector: String, merchant_id: String, pt: storage::ProcessTracker, -) -> Result<(), errors::ProcessTrackerError> { +) -> Result<(), sch_errors::ProcessTrackerError> { let schedule_time = get_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?; match schedule_time { - Some(s_time) => pt.retry(db, s_time).await, + Some(s_time) => pt.retry(db.as_scheduler(), s_time).await, None => { - pt.finish_with_status(db, "RETRIES_EXCEEDED".to_string()) + pt.finish_with_status(db.as_scheduler(), "RETRIES_EXCEEDED".to_string()) .await } } diff --git a/crates/router/src/scheduler/workflows/refund_router.rs b/crates/router/src/workflows/refund_router.rs similarity index 81% rename from crates/router/src/scheduler/workflows/refund_router.rs rename to crates/router/src/workflows/refund_router.rs index 7eaa171427..8ca3551cfc 100644 --- a/crates/router/src/scheduler/workflows/refund_router.rs +++ b/crates/router/src/workflows/refund_router.rs @@ -1,10 +1,13 @@ -use super::{ProcessTrackerWorkflow, RefundWorkflowRouter}; +use scheduler::consumer::workflows::ProcessTrackerWorkflow; + use crate::{ core::refunds as refund_flow, errors, logger::error, routes::AppState, types::storage, }; +pub struct RefundWorkflowRouter; + #[async_trait::async_trait] -impl ProcessTrackerWorkflow for RefundWorkflowRouter { +impl ProcessTrackerWorkflow for RefundWorkflowRouter { async fn execute_workflow<'a>( &'a self, state: &'a AppState, diff --git a/crates/router/src/scheduler/workflows/tokenized_data.rs b/crates/router/src/workflows/tokenized_data.rs similarity index 85% rename from crates/router/src/scheduler/workflows/tokenized_data.rs rename to crates/router/src/workflows/tokenized_data.rs index 8fdd9b2911..2f5d331732 100644 --- a/crates/router/src/scheduler/workflows/tokenized_data.rs +++ b/crates/router/src/workflows/tokenized_data.rs @@ -1,10 +1,13 @@ -use super::{DeleteTokenizeDataWorkflow, ProcessTrackerWorkflow}; +use scheduler::consumer::workflows::ProcessTrackerWorkflow; + #[cfg(feature = "basilisk")] use crate::core::payment_methods::vault; use crate::{errors, logger::error, routes::AppState, types::storage}; +pub struct DeleteTokenizeDataWorkflow; + #[async_trait::async_trait] -impl ProcessTrackerWorkflow for DeleteTokenizeDataWorkflow { +impl ProcessTrackerWorkflow for DeleteTokenizeDataWorkflow { #[cfg(feature = "basilisk")] async fn execute_workflow<'a>( &'a self, diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml new file mode 100644 index 0000000000..3e74903473 --- /dev/null +++ b/crates/scheduler/Cargo.toml @@ -0,0 +1,59 @@ +[package] +name = "scheduler" +version = "0.1.0" +edition = "2021" + +[features] +default = ["kv_store", "olap"] +olap = [] +kv_store = [] + +[dependencies] +async-bb8-diesel = { git = 
"https://github.com/juspay/async-bb8-diesel", rev = "9a71d142726dbc33f41c1fd935ddaa79841c7be5" } +clap = { version = "4.2.2", default-features = false, features = ["std", "derive", "help", "usage"] } +diesel = { version = "2.0.3", features = ["postgres", "serde_json", "time"] } +error-stack = "0.3.1" +frunk = "0.4.1" +frunk_core = "0.4.1" +futures = "0.3.28" +once_cell = "1.17.1" +serde = "1.0.159" +serde_json = "1.0.91" +strum = { version = "0.24.1", features = ["derive"] } +time = { version = "0.3.20", features = ["serde", "serde-well-known", "std"] } +env_logger = "0.10.0" +rand = "0.8.5" +signal-hook = "0.3.15" +uuid = { version = "1.3.1", features = ["serde", "v4"] } + +# First party crates +api_models = { version = "0.1.0", path = "../api_models", features = ["errors"] } +common_utils = { version = "0.1.0", path = "../common_utils", features = ["signals", "async_ext"] } +cards = { version = "0.1.0", path = "../cards" } +external_services = { version = "0.1.0", path = "../external_services" } +masking = { version = "0.1.0", path = "../masking" } +redis_interface = { version = "0.1.0", path = "../redis_interface" } +router_derive = { version = "0.1.0", path = "../router_derive" } +storage_impl = { version = "0.1.0", path = "../storage_impl" , default-features = false } +router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } +diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] } +actix-multipart = "0.6.0" +aws-sdk-s3 = { version = "0.25.0", optional = true } +aws-config = {version = "0.55.1", optional = true } +infer = "0.13.0" + +[target.'cfg(not(target_os = "windows"))'.dependencies] +signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"]} + +# Third party crates +actix-rt = "2.8.0" +actix-web = "4.3.1" +thiserror = "1.0.39" +async-trait = "0.1.66" +dyn-clone = "1.0.11" +tokio = { version = "1.26.0", features = ["macros", "rt-multi-thread"] } + + +# [[bin]] +# name = "scheduler" +# path = "src/bin/scheduler.rs" \ No newline at end of file diff --git a/crates/scheduler/src/configs.rs b/crates/scheduler/src/configs.rs new file mode 100644 index 0000000000..a677f6b018 --- /dev/null +++ b/crates/scheduler/src/configs.rs @@ -0,0 +1,3 @@ +pub mod defaults; +pub mod settings; +pub mod validations; diff --git a/crates/scheduler/src/configs/defaults.rs b/crates/scheduler/src/configs/defaults.rs new file mode 100644 index 0000000000..25eb19e24f --- /dev/null +++ b/crates/scheduler/src/configs/defaults.rs @@ -0,0 +1,32 @@ +impl Default for super::settings::SchedulerSettings { + fn default() -> Self { + Self { + stream: "SCHEDULER_STREAM".into(), + producer: super::settings::ProducerSettings::default(), + consumer: super::settings::ConsumerSettings::default(), + graceful_shutdown_interval: 60000, + loop_interval: 5000, + } + } +} + +impl Default for super::settings::ProducerSettings { + fn default() -> Self { + Self { + upper_fetch_limit: 0, + lower_fetch_limit: 1800, + lock_key: "PRODUCER_LOCKING_KEY".into(), + lock_ttl: 160, + batch_size: 200, + } + } +} + +impl Default for super::settings::ConsumerSettings { + fn default() -> Self { + Self { + disabled: false, + consumer_group: "SCHEDULER_GROUP".into(), + } + } +} diff --git a/crates/scheduler/src/configs/settings.rs b/crates/scheduler/src/configs/settings.rs new file mode 100644 index 0000000000..56a9f4079a --- /dev/null +++ b/crates/scheduler/src/configs/settings.rs @@ -0,0 +1,36 @@ +#[cfg(feature = "kms")] +use 
external_services::kms; +pub use router_env::config::{Log, LogConsole, LogFile, LogTelemetry}; +use serde::Deserialize; +#[cfg(feature = "kms")] +pub type Password = kms::KmsValue; +#[cfg(not(feature = "kms"))] +pub type Password = masking::Secret; + +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct SchedulerSettings { + pub stream: String, + pub producer: ProducerSettings, + pub consumer: ConsumerSettings, + pub loop_interval: u64, + pub graceful_shutdown_interval: u64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct ProducerSettings { + pub upper_fetch_limit: i64, + pub lower_fetch_limit: i64, + + pub lock_key: String, + pub lock_ttl: i64, + pub batch_size: usize, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct ConsumerSettings { + pub disabled: bool, + pub consumer_group: String, +} diff --git a/crates/scheduler/src/configs/validations.rs b/crates/scheduler/src/configs/validations.rs new file mode 100644 index 0000000000..e9f6621b2a --- /dev/null +++ b/crates/scheduler/src/configs/validations.rs @@ -0,0 +1,34 @@ +use common_utils::ext_traits::ConfigExt; +use storage_impl::errors::ApplicationError; + +impl super::settings::SchedulerSettings { + pub fn validate(&self) -> Result<(), ApplicationError> { + use common_utils::fp_utils::when; + + when(self.stream.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "scheduler stream must not be empty".into(), + )) + })?; + + when(self.consumer.consumer_group.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "scheduler consumer group must not be empty".into(), + )) + })?; + + self.producer.validate()?; + + Ok(()) + } +} + +impl super::settings::ProducerSettings { + pub fn validate(&self) -> Result<(), ApplicationError> { + common_utils::fp_utils::when(self.lock_key.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "producer lock key must not be empty".into(), + )) + }) + } +} diff --git a/crates/router/src/scheduler/consumer.rs b/crates/scheduler/src/consumer.rs similarity index 70% rename from crates/router/src/scheduler/consumer.rs rename to crates/scheduler/src/consumer.rs index 21f10b46d5..2fbc77273e 100644 --- a/crates/router/src/scheduler/consumer.rs +++ b/crates/scheduler/src/consumer.rs @@ -1,11 +1,12 @@ // TODO: Figure out what to log -use std::{ - fmt, - sync::{self, atomic}, -}; +use std::sync::{self, atomic}; +pub mod types; +pub mod workflows; -use common_utils::signals::get_allowed_signals; +use common_utils::{errors::CustomResult, signals::get_allowed_signals}; +use diesel_models::enums; +pub use diesel_models::{self, process_tracker as storage}; use error_stack::{IntoReport, ResultExt}; use futures::future; use redis_interface::{RedisConnectionPool, RedisEntryId}; @@ -14,18 +15,12 @@ use time::PrimitiveDateTime; use tokio::sync::mpsc; use uuid::Uuid; -use super::{ - metrics, - workflows::{self, ProcessTrackerWorkflow}, -}; +use super::env::logger; +pub use super::workflows::ProcessTrackerWorkflow; use crate::{ - configs::settings, - core::errors::{self, CustomResult}, - db::StorageInterface, - logger, - routes::AppState, - scheduler::utils as pt_utils, - types::storage::{self, enums, ProcessTrackerExt}, + configs::settings::SchedulerSettings, + db::process_tracker::{ProcessTrackerExt, ProcessTrackerInterface}, + errors, metrics, utils as pt_utils, SchedulerAppState, SchedulerInterface, }; // Valid consumer business statuses @@ -34,10 +29,10 @@ pub fn 
valid_business_statuses() -> Vec<&'static str> { } #[instrument(skip_all)] -pub async fn start_consumer( - state: &AppState, - settings: sync::Arc, - workflow_selector: workflows::WorkflowSelectorFn, +pub async fn start_consumer( + state: &T, + settings: sync::Arc, + workflow_selector: impl workflows::ProcessTrackerWorkflows + 'static + Copy + std::fmt::Debug, (tx, mut rx): (mpsc::Sender<()>, mpsc::Receiver<()>), ) -> CustomResult<(), errors::ProcessTrackerError> { use std::time::Duration; @@ -108,17 +103,17 @@ pub async fn start_consumer( } #[instrument(skip_all)] -pub async fn consumer_operations( - state: &AppState, - settings: &settings::SchedulerSettings, - workflow_selector: workflows::WorkflowSelectorFn, +pub async fn consumer_operations( + state: &T, + settings: &SchedulerSettings, + workflow_selector: impl workflows::ProcessTrackerWorkflows + 'static + Copy + std::fmt::Debug, ) -> CustomResult<(), errors::ProcessTrackerError> { let stream_name = settings.stream.clone(); let group_name = settings.consumer.consumer_group.clone(); let consumer_name = format!("consumer_{}", Uuid::new_v4()); let group_created = &mut state - .store + .get_db() .consumer_group_create(&stream_name, &group_name, &RedisEntryId::AfterLastID) .await; if group_created.is_err() { @@ -126,7 +121,8 @@ pub async fn consumer_operations( } let mut tasks = state - .store + .get_db() + .as_scheduler() .fetch_consumer_tasks(&stream_name, &group_name, &consumer_name) .await?; @@ -139,12 +135,12 @@ pub async fn consumer_operations( pt_utils::add_histogram_metrics(&pickup_time, task, &stream_name); metrics::TASK_CONSUMED.add(&metrics::CONTEXT, 1, &[]); - let runner = workflow_selector(task)?.ok_or(errors::ProcessTrackerError::UnexpectedFlow)?; + // let runner = workflow_selector(task)?.ok_or(errors::ProcessTrackerError::UnexpectedFlow)?; handler.push(tokio::task::spawn(start_workflow( state.clone(), task.clone(), pickup_time, - runner, + workflow_selector, ))) } future::join_all(handler).await; @@ -154,7 +150,7 @@ pub async fn consumer_operations( #[instrument(skip(db, redis_conn))] pub async fn fetch_consumer_tasks( - db: &dyn StorageInterface, + db: &dyn ProcessTrackerInterface, redis_conn: &RedisConnectionPool, stream_name: &str, group_name: &str, @@ -194,58 +190,49 @@ pub async fn fetch_consumer_tasks( } // Accept flow_options if required -#[instrument(skip(state, runner), fields(workflow_id))] -pub async fn start_workflow( - state: AppState, +#[instrument(skip(state), fields(workflow_id))] +pub async fn start_workflow( + state: T, process: storage::ProcessTracker, _pickup_time: PrimitiveDateTime, - runner: Box, -) { + workflow_selector: impl workflows::ProcessTrackerWorkflows + 'static + std::fmt::Debug, +) -> Result<(), errors::ProcessTrackerError> +where + T: SchedulerAppState, +{ tracing::Span::current().record("workflow_id", Uuid::new_v4().to_string()); - run_executor(&state, process, runner).await -} - -pub async fn run_executor( - state: &AppState, - process: storage::ProcessTracker, - operation: Box, -) { - let output = operation.execute_workflow(state, process.clone()).await; - match output { - Ok(_) => operation.success_handler(state, process).await, - Err(error) => match operation.error_handler(state, process.clone(), error).await { - Ok(_) => (), - Err(error) => { - logger::error!(%error, "Failed while handling error"); - let status = process - .finish_with_status(&*state.store, "GLOBAL_FAILURE".to_string()) - .await; - if let Err(err) = status { - logger::error!(%err, "Failed while performing database 
operation: GLOBAL_FAILURE"); - } - } - }, - }; + let res = workflow_selector + .trigger_workflow(&state.clone(), process.clone()) + .await; metrics::TASK_PROCESSED.add(&metrics::CONTEXT, 1, &[]); + res } #[instrument(skip_all)] -pub async fn consumer_error_handler( - state: &AppState, +pub async fn consumer_error_handler( + state: &(dyn SchedulerInterface + 'static), process: storage::ProcessTracker, - error: E, + error: errors::ProcessTrackerError, ) -> CustomResult<(), errors::ProcessTrackerError> { logger::error!(pt.name = ?process.name, pt.id = %process.id, ?error, "ERROR: Failed while executing workflow"); - let db: &dyn StorageInterface = &*state.store; - db.process_tracker_update_process_status_by_ids( - vec![process.id], - storage::ProcessTrackerUpdate::StatusUpdate { - status: enums::ProcessTrackerStatus::Finish, - business_status: Some("GLOBAL_ERROR".to_string()), - }, - ) - .await - .change_context(errors::ProcessTrackerError::ProcessUpdateFailed)?; + state + .process_tracker_update_process_status_by_ids( + vec![process.id], + storage::ProcessTrackerUpdate::StatusUpdate { + status: enums::ProcessTrackerStatus::Finish, + business_status: Some("GLOBAL_ERROR".to_string()), + }, + ) + .await + .change_context(errors::ProcessTrackerError::ProcessUpdateFailed)?; + Ok(()) +} + +pub async fn create_task( + db: &dyn ProcessTrackerInterface, + process_tracker_entry: storage::ProcessTrackerNew, +) -> CustomResult<(), storage_impl::errors::StorageError> { + db.insert_process(process_tracker_entry).await?; Ok(()) } diff --git a/crates/scheduler/src/consumer/types.rs b/crates/scheduler/src/consumer/types.rs new file mode 100644 index 0000000000..b6a66019a9 --- /dev/null +++ b/crates/scheduler/src/consumer/types.rs @@ -0,0 +1,4 @@ +pub mod batch; +pub mod process_data; + +pub use self::batch::ProcessTrackerBatch; diff --git a/crates/router/src/scheduler/types/batch.rs b/crates/scheduler/src/consumer/types/batch.rs similarity index 97% rename from crates/router/src/scheduler/types/batch.rs rename to crates/scheduler/src/consumer/types/batch.rs index 052e3953b8..b9748e1eca 100644 --- a/crates/router/src/scheduler/types/batch.rs +++ b/crates/scheduler/src/consumer/types/batch.rs @@ -1,13 +1,11 @@ use std::collections::HashMap; +use common_utils::{errors::CustomResult, ext_traits::OptionExt}; +use diesel_models::process_tracker::ProcessTracker; use error_stack::{IntoReport, ResultExt}; use time::PrimitiveDateTime; -use crate::{ - core::errors::{self, CustomResult}, - types::storage::process_tracker::ProcessTracker, - utils::OptionExt, -}; +use crate::errors; #[derive(Debug, Clone)] pub struct ProcessTrackerBatch { diff --git a/crates/router/src/scheduler/types/process_data.rs b/crates/scheduler/src/consumer/types/process_data.rs similarity index 87% rename from crates/router/src/scheduler/types/process_data.rs rename to crates/scheduler/src/consumer/types/process_data.rs index 589d54ebfe..d5299493b1 100644 --- a/crates/router/src/scheduler/types/process_data.rs +++ b/crates/scheduler/src/consumer/types/process_data.rs @@ -1,15 +1,8 @@ use std::collections::HashMap; +use diesel_models::enums; use serde::{Deserialize, Serialize}; -use crate::types::storage::{enums, process_tracker::ProcessTracker}; -#[derive(Debug, Clone)] -pub struct ProcessData { - db_name: String, - cache_name: String, - process_tracker: ProcessTracker, -} - #[derive(Serialize, Deserialize, Clone)] pub struct RetryMapping { pub start_after: i32, diff --git a/crates/scheduler/src/consumer/workflows.rs 
b/crates/scheduler/src/consumer/workflows.rs new file mode 100644 index 0000000000..3b897347be --- /dev/null +++ b/crates/scheduler/src/consumer/workflows.rs @@ -0,0 +1,92 @@ +use async_trait::async_trait; +use common_utils::errors::CustomResult; +pub use diesel_models::process_tracker as storage; + +use crate::{db::process_tracker::ProcessTrackerExt, errors, SchedulerAppState}; + +pub type WorkflowSelectorFn = + fn(&storage::ProcessTracker) -> Result<(), errors::ProcessTrackerError>; + +#[async_trait] +pub trait ProcessTrackerWorkflows: Send + Sync { + // The core execution of the workflow + async fn trigger_workflow<'a>( + &'a self, + _state: &'a T, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + Err(errors::ProcessTrackerError::NotImplemented)? + } + async fn execute_workflow<'a>( + &'a self, + operation: Box>, + state: &'a T, + process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> + where + T: SchedulerAppState, + { + let app_state = &state.clone(); + let output = operation.execute_workflow(app_state, process.clone()).await; + match output { + Ok(_) => operation.success_handler(app_state, process).await, + Err(error) => match operation + .error_handler(app_state, process.clone(), error) + .await + { + Ok(_) => (), + Err(_error) => { + // logger::error!(%error, "Failed while handling error"); + let status = process + .finish_with_status( + state.get_db().as_scheduler(), + "GLOBAL_FAILURE".to_string(), + ) + .await; + if let Err(_err) = status { + // logger::error!(%err, "Failed while performing database operation: GLOBAL_FAILURE"); + } + } + }, + }; + Ok(()) + } +} + +#[async_trait] +pub trait ProcessTrackerWorkflow: Send + Sync { + // The core execution of the workflow + async fn execute_workflow<'a>( + &'a self, + _state: &'a T, + _process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + Err(errors::ProcessTrackerError::NotImplemented)? + } + // Callback function after successful execution of the `execute_workflow` + async fn success_handler<'a>(&'a self, _state: &'a T, _process: storage::ProcessTracker) {} + // Callback function after error received from `execute_workflow` + async fn error_handler<'a>( + &'a self, + _state: &'a T, + _process: storage::ProcessTracker, + _error: errors::ProcessTrackerError, + ) -> CustomResult<(), errors::ProcessTrackerError> { + Err(errors::ProcessTrackerError::NotImplemented)? 
+ } +} + +// #[cfg(test)] +// mod workflow_tests { +// #![allow(clippy::unwrap_used)] +// use common_utils::ext_traits::StringExt; + +// use super::PTRunner; + +// #[test] +// fn test_enum_to_string() { +// let string_format = "PAYMENTS_SYNC_WORKFLOW".to_string(); +// let enum_format: PTRunner = string_format.parse_enum("PTRunner").unwrap(); +// assert_eq!(enum_format, PTRunner::PaymentsSyncWorkflow) +// } +// } diff --git a/crates/scheduler/src/db/mod.rs b/crates/scheduler/src/db/mod.rs new file mode 100644 index 0000000000..6b6917e627 --- /dev/null +++ b/crates/scheduler/src/db/mod.rs @@ -0,0 +1,2 @@ +pub mod process_tracker; +pub mod queue; diff --git a/crates/router/src/db/process_tracker.rs b/crates/scheduler/src/db/process_tracker.rs similarity index 62% rename from crates/router/src/db/process_tracker.rs rename to crates/scheduler/src/db/process_tracker.rs index f142586185..9cabdf7797 100644 --- a/crates/router/src/db/process_tracker.rs +++ b/crates/scheduler/src/db/process_tracker.rs @@ -1,15 +1,15 @@ -use error_stack::IntoReport; +use common_utils::errors::CustomResult; +pub use diesel_models as storage; +use diesel_models::enums as storage_enums; +use error_stack::{IntoReport, ResultExt}; +use serde::Serialize; +use storage_impl::{connection, errors, MockDb}; use time::PrimitiveDateTime; -use super::{MockDb, Store}; -use crate::{ - connection, - core::errors::{self, CustomResult}, - types::storage::{self, enums}, -}; +use crate::{errors as sch_errors, metrics, scheduler::Store, SchedulerInterface}; #[async_trait::async_trait] -pub trait ProcessTrackerInterface { +pub trait ProcessTrackerInterface: Send + Sync + 'static { async fn reinitialize_limbo_processes( &self, ids: Vec, @@ -47,7 +47,7 @@ pub trait ProcessTrackerInterface { &self, time_lower_limit: PrimitiveDateTime, time_upper_limit: PrimitiveDateTime, - status: enums::ProcessTrackerStatus, + status: storage_enums::ProcessTrackerStatus, limit: Option, ) -> CustomResult, errors::StorageError>; } @@ -81,7 +81,7 @@ impl ProcessTrackerInterface for Store { &self, time_lower_limit: PrimitiveDateTime, time_upper_limit: PrimitiveDateTime, - status: enums::ProcessTrackerStatus, + status: storage_enums::ProcessTrackerStatus, limit: Option, ) -> CustomResult, errors::StorageError> { let conn = connection::pg_connection_read(self).await?; @@ -175,7 +175,7 @@ impl ProcessTrackerInterface for MockDb { &self, _time_lower_limit: PrimitiveDateTime, _time_upper_limit: PrimitiveDateTime, - _status: enums::ProcessTrackerStatus, + _status: storage_enums::ProcessTrackerStatus, _limit: Option, ) -> CustomResult, errors::StorageError> { // [#172]: Implement function for `MockDb` @@ -233,3 +233,125 @@ impl ProcessTrackerInterface for MockDb { Err(errors::StorageError::MockDbError)? 
} } + +#[async_trait::async_trait] +pub trait ProcessTrackerExt { + fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool; + + fn make_process_tracker_new<'a, T>( + process_tracker_id: String, + task: &'a str, + runner: &'a str, + tracking_data: T, + schedule_time: PrimitiveDateTime, + ) -> Result + where + T: Serialize; + + async fn reset( + self, + db: &dyn SchedulerInterface, + schedule_time: PrimitiveDateTime, + ) -> Result<(), sch_errors::ProcessTrackerError>; + + async fn retry( + self, + db: &dyn SchedulerInterface, + schedule_time: PrimitiveDateTime, + ) -> Result<(), sch_errors::ProcessTrackerError>; + + async fn finish_with_status( + self, + db: &dyn SchedulerInterface, + status: String, + ) -> Result<(), sch_errors::ProcessTrackerError>; +} + +#[async_trait::async_trait] +impl ProcessTrackerExt for storage::ProcessTracker { + fn is_valid_business_status(&self, valid_statuses: &[&str]) -> bool { + valid_statuses.iter().any(|x| x == &self.business_status) + } + + fn make_process_tracker_new<'a, T>( + process_tracker_id: String, + task: &'a str, + runner: &'a str, + tracking_data: T, + schedule_time: PrimitiveDateTime, + ) -> Result + where + T: Serialize, + { + let current_time = common_utils::date_time::now(); + Ok(storage::ProcessTrackerNew { + id: process_tracker_id, + name: Some(String::from(task)), + tag: vec![String::from("SYNC"), String::from("PAYMENT")], + runner: Some(String::from(runner)), + retry_count: 0, + schedule_time: Some(schedule_time), + rule: String::new(), + tracking_data: serde_json::to_value(tracking_data) + .map_err(|_| sch_errors::ProcessTrackerError::SerializationFailed)?, + business_status: String::from("Pending"), + status: storage_enums::ProcessTrackerStatus::New, + event: vec![], + created_at: current_time, + updated_at: current_time, + }) + } + + async fn reset( + self, + db: &dyn SchedulerInterface, + schedule_time: PrimitiveDateTime, + ) -> Result<(), sch_errors::ProcessTrackerError> { + db.update_process_tracker( + self.clone(), + storage::ProcessTrackerUpdate::StatusRetryUpdate { + status: storage_enums::ProcessTrackerStatus::New, + retry_count: 0, + schedule_time, + }, + ) + .await?; + Ok(()) + } + + async fn retry( + self, + db: &dyn SchedulerInterface, + schedule_time: PrimitiveDateTime, + ) -> Result<(), sch_errors::ProcessTrackerError> { + metrics::TASK_RETRIED.add(&metrics::CONTEXT, 1, &[]); + db.update_process_tracker( + self.clone(), + storage::ProcessTrackerUpdate::StatusRetryUpdate { + status: storage_enums::ProcessTrackerStatus::Pending, + retry_count: self.retry_count + 1, + schedule_time, + }, + ) + .await?; + Ok(()) + } + + async fn finish_with_status( + self, + db: &dyn SchedulerInterface, + status: String, + ) -> Result<(), sch_errors::ProcessTrackerError> { + db.update_process( + self, + storage::ProcessTrackerUpdate::StatusUpdate { + status: storage_enums::ProcessTrackerStatus::Finish, + business_status: Some(status), + }, + ) + .await + .attach_printable("Failed while updating status of the process")?; + metrics::TASK_FINISHED.add(&metrics::CONTEXT, 1, &[]); + Ok(()) + } +} diff --git a/crates/router/src/db/queue.rs b/crates/scheduler/src/db/queue.rs similarity index 93% rename from crates/router/src/db/queue.rs rename to crates/scheduler/src/db/queue.rs index 17dc81b928..870b2b8179 100644 --- a/crates/router/src/db/queue.rs +++ b/crates/scheduler/src/db/queue.rs @@ -1,12 +1,10 @@ +use common_utils::errors::CustomResult; +use diesel_models::process_tracker as storage; use redis_interface::{errors::RedisError, 
RedisEntryId, SetnxReply}; use router_env::logger; -use storage_impl::redis::kv_store::RedisConnInterface; +use storage_impl::{redis::kv_store::RedisConnInterface, MockDb}; -use super::{MockDb, Store}; -use crate::{ - core::errors::{CustomResult, ProcessTrackerError}, - types::storage, -}; +use crate::{errors::ProcessTrackerError, scheduler::Store}; #[async_trait::async_trait] pub trait QueueInterface { @@ -52,7 +50,7 @@ impl QueueInterface for Store { group_name: &str, consumer_name: &str, ) -> CustomResult, ProcessTrackerError> { - crate::scheduler::consumer::fetch_consumer_tasks( + crate::consumer::fetch_consumer_tasks( self, &self .get_redis_conn() @@ -186,7 +184,7 @@ impl QueueInterface for MockDb { Err(RedisError::StreamAppendFailed)? } - async fn get_key(&self, key: &str) -> CustomResult, RedisError> { - self.redis.get_key(key).await + async fn get_key(&self, _key: &str) -> CustomResult, RedisError> { + Err(RedisError::RedisConnectionError.into()) } } diff --git a/crates/scheduler/src/env.rs b/crates/scheduler/src/env.rs new file mode 100644 index 0000000000..b23b0ab267 --- /dev/null +++ b/crates/scheduler/src/env.rs @@ -0,0 +1,2 @@ +#[doc(inline)] +pub use router_env::*; diff --git a/crates/scheduler/src/errors.rs b/crates/scheduler/src/errors.rs new file mode 100644 index 0000000000..481fae0793 --- /dev/null +++ b/crates/scheduler/src/errors.rs @@ -0,0 +1,113 @@ +pub use common_utils::errors::{ParsingError, ValidationError}; +pub use redis_interface::errors::RedisError; +pub use storage_impl::errors::ApplicationError; +use storage_impl::errors::StorageError; + +use crate::env::logger::{self, error}; + +#[derive(Debug, thiserror::Error)] +pub enum ProcessTrackerError { + #[error("An unexpected flow was specified")] + UnexpectedFlow, + #[error("Failed to serialize object")] + SerializationFailed, + #[error("Failed to deserialize object")] + DeserializationFailed, + #[error("Missing required field")] + MissingRequiredField, + #[error("Failed to insert process batch into stream")] + BatchInsertionFailed, + #[error("Failed to insert process into stream")] + ProcessInsertionFailed, + #[error("The process batch with the specified details was not found")] + BatchNotFound, + #[error("Failed to update process batch in stream")] + BatchUpdateFailed, + #[error("Failed to delete process batch from stream")] + BatchDeleteFailed, + #[error("An error occurred when trying to read process tracker configuration")] + ConfigurationError, + #[error("Failed to update process in database")] + ProcessUpdateFailed, + #[error("Failed to fetch processes from database")] + ProcessFetchingFailed, + #[error("Failed while fetching: {resource_name}")] + ResourceFetchingFailed { resource_name: &'static str }, + #[error("Failed while executing: {flow}")] + FlowExecutionError { flow: &'static str }, + #[error("Not Implemented")] + NotImplemented, + #[error("Job not found")] + JobNotFound, + #[error("Received Error ApiResponseError")] + EApiErrorResponse, + #[error("Received Error ClientError")] + EClientError, + #[error("Received Error StorageError: {0}")] + EStorageError(error_stack::Report), + #[error("Received Error RedisError: {0}")] + ERedisError(error_stack::Report), + #[error("Received Error ParsingError: {0}")] + EParsingError(error_stack::Report), + #[error("Validation Error Received: {0}")] + EValidationError(error_stack::Report), + #[error("Type Conversion error")] + TypeConversionError, +} + +#[macro_export] +macro_rules! 
error_to_process_tracker_error { + ($($path: ident)::+ < $st: ident >, $($path2:ident)::* ($($inner_path2:ident)::+ <$st2:ident>) ) => { + impl From<$($path)::+ <$st>> for ProcessTrackerError { + fn from(err: $($path)::+ <$st> ) -> Self { + $($path2)::*(err) + } + } + }; + + ($($path: ident)::+ <$($inner_path:ident)::+>, $($path2:ident)::* ($($inner_path2:ident)::+ <$st2:ident>) ) => { + impl<'a> From< $($path)::+ <$($inner_path)::+> > for ProcessTrackerError { + fn from(err: $($path)::+ <$($inner_path)::+> ) -> Self { + $($path2)::*(err) + } + } + }; +} +pub trait PTError: Send + Sync + 'static { + fn to_pt_error(&self) -> ProcessTrackerError; +} + +impl From for ProcessTrackerError { + fn from(value: T) -> Self { + value.to_pt_error() + } +} + +impl From> + for ProcessTrackerError +{ + fn from(error: error_stack::Report) -> Self { + logger::error!(error=%error.current_context()); + error.current_context().to_pt_error() + } +} + +error_to_process_tracker_error!( + error_stack::Report, + ProcessTrackerError::EStorageError(error_stack::Report) +); + +error_to_process_tracker_error!( + error_stack::Report, + ProcessTrackerError::ERedisError(error_stack::Report) +); + +error_to_process_tracker_error!( + error_stack::Report, + ProcessTrackerError::EParsingError(error_stack::Report) +); + +error_to_process_tracker_error!( + error_stack::Report, + ProcessTrackerError::EValidationError(error_stack::Report) +); diff --git a/crates/router/src/scheduler/types/flow.rs b/crates/scheduler/src/flow.rs similarity index 100% rename from crates/router/src/scheduler/types/flow.rs rename to crates/scheduler/src/flow.rs diff --git a/crates/scheduler/src/lib.rs b/crates/scheduler/src/lib.rs new file mode 100644 index 0000000000..e9603bfdc4 --- /dev/null +++ b/crates/scheduler/src/lib.rs @@ -0,0 +1,13 @@ +pub mod configs; +pub mod consumer; +pub mod db; +pub mod env; +pub mod errors; +pub mod flow; +pub mod metrics; +pub mod producer; +pub mod scheduler; +pub mod settings; +pub mod utils; + +pub use self::{consumer::types, flow::*, scheduler::*}; diff --git a/crates/router/src/scheduler/metrics.rs b/crates/scheduler/src/metrics.rs similarity index 86% rename from crates/router/src/scheduler/metrics.rs rename to crates/scheduler/src/metrics.rs index 7337e3ab4f..134f5599b3 100644 --- a/crates/router/src/scheduler/metrics.rs +++ b/crates/scheduler/src/metrics.rs @@ -15,5 +15,3 @@ counter_metric!(TASK_CONSUMED, PT_METER); // Tasks consumed by consumer counter_metric!(TASK_PROCESSED, PT_METER); // Tasks completed processing counter_metric!(TASK_FINISHED, PT_METER); // Tasks finished counter_metric!(TASK_RETRIED, PT_METER); // Tasks added for retries -counter_metric!(TOKENIZED_DATA_COUNT, PT_METER); // Tokenized data added -counter_metric!(RETRIED_DELETE_DATA_COUNT, PT_METER); // Tokenized data retried diff --git a/crates/router/src/scheduler/producer.rs b/crates/scheduler/src/producer.rs similarity index 82% rename from crates/router/src/scheduler/producer.rs rename to crates/scheduler/src/producer.rs index 811156607d..13bdfe98ad 100644 --- a/crates/router/src/scheduler/producer.rs +++ b/crates/scheduler/src/producer.rs @@ -1,27 +1,30 @@ use std::sync::Arc; +use common_utils::errors::CustomResult; +use diesel_models::enums::ProcessTrackerStatus; use error_stack::{report, IntoReport, ResultExt}; use router_env::{instrument, tracing}; use time::Duration; use tokio::sync::mpsc; -use super::metrics; +use super::{ + env::logger::{self, debug, error, warn}, + metrics, +}; use crate::{ - 
configs::settings::SchedulerSettings, - core::errors::{self, CustomResult}, - db::StorageInterface, - logger::{self, debug, error, warn}, - routes::AppState, - scheduler::{utils::*, SchedulerFlow}, - types::storage::{self, enums::ProcessTrackerStatus}, + configs::settings::SchedulerSettings, errors, flow::SchedulerFlow, + scheduler::SchedulerInterface, utils::*, SchedulerAppState, }; #[instrument(skip_all)] -pub async fn start_producer( - state: &AppState, +pub async fn start_producer( + state: &T, scheduler_settings: Arc, (tx, mut rx): (mpsc::Sender<()>, mpsc::Receiver<()>), -) -> CustomResult<(), errors::ProcessTrackerError> { +) -> CustomResult<(), errors::ProcessTrackerError> +where + T: SchedulerAppState, +{ use rand::Rng; let timeout = rand::thread_rng().gen_range(0..=scheduler_settings.loop_interval); tokio::time::sleep(std::time::Duration::from_millis(timeout)).await; @@ -77,17 +80,26 @@ pub async fn start_producer( } #[instrument(skip_all)] -pub async fn run_producer_flow( - state: &AppState, +pub async fn run_producer_flow( + state: &T, settings: &SchedulerSettings, -) -> CustomResult<(), errors::ProcessTrackerError> { - lock_acquire_release::<_, _>(state, settings, move || async { - let tasks = fetch_producer_tasks(&*state.store, settings).await?; +) -> CustomResult<(), errors::ProcessTrackerError> +where + T: SchedulerAppState, +{ + lock_acquire_release::<_, _, _>(state.get_db().as_scheduler(), settings, move || async { + let tasks = fetch_producer_tasks(state.get_db().as_scheduler(), settings).await?; debug!("Producer count of tasks {}", tasks.len()); // [#268]: Allow task based segregation of tasks - divide_and_append_tasks(state, SchedulerFlow::Producer, tasks, settings).await?; + divide_and_append_tasks( + state.get_db().as_scheduler(), + SchedulerFlow::Producer, + tasks, + settings, + ) + .await?; Ok(()) }) @@ -98,7 +110,7 @@ pub async fn run_producer_flow( #[instrument(skip_all)] pub async fn fetch_producer_tasks( - db: &dyn StorageInterface, + db: &dyn SchedulerInterface, conf: &SchedulerSettings, ) -> CustomResult, errors::ProcessTrackerError> { let upper = conf.producer.upper_fetch_limit; diff --git a/crates/scheduler/src/scheduler.rs b/crates/scheduler/src/scheduler.rs new file mode 100644 index 0000000000..15757c6e77 --- /dev/null +++ b/crates/scheduler/src/scheduler.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use common_utils::errors::CustomResult; +#[cfg(feature = "kv_store")] +use storage_impl::KVRouterStore; +use storage_impl::MockDb; +#[cfg(not(feature = "kv_store"))] +use storage_impl::RouterStore; +use tokio::sync::mpsc; + +use super::env::logger::error; +pub use crate::{ + configs::settings::SchedulerSettings, + consumer::{self, workflows}, + db::{process_tracker::ProcessTrackerInterface, queue::QueueInterface}, + errors, + flow::SchedulerFlow, + producer, +}; + +#[cfg(not(feature = "olap"))] +type StoreType = storage_impl::database::store::Store; +#[cfg(feature = "olap")] +type StoreType = storage_impl::database::store::ReplicaStore; + +#[cfg(not(feature = "kv_store"))] +pub type Store = RouterStore; +#[cfg(feature = "kv_store")] +pub type Store = KVRouterStore; + +pub trait AsSchedulerInterface { + fn as_scheduler(&self) -> &dyn SchedulerInterface; +} + +impl AsSchedulerInterface for T { + fn as_scheduler(&self) -> &dyn SchedulerInterface { + self + } +} + +#[async_trait::async_trait] +pub trait SchedulerInterface: + ProcessTrackerInterface + QueueInterface + AsSchedulerInterface +{ +} + +#[async_trait::async_trait] +impl SchedulerInterface for Store {} + 
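
Aside: the new `crates/scheduler/src/scheduler.rs` above defines `SchedulerInterface` as a super-trait over the process-tracker and queue interfaces, and the blanket `AsSchedulerInterface` impl gives every implementor an `as_scheduler()` upcast; that is what the `db.as_scheduler()` and `state.store.as_scheduler()` calls earlier in this diff rely on. A self-contained sketch of the same blanket-impl pattern, using simplified stand-in names (`Scheduler`, `AsScheduler`, `Store` here are illustrative, not the real traits):

```rust
trait Scheduler {
    fn tick(&self);
}

trait AsScheduler {
    fn as_scheduler(&self) -> &dyn Scheduler;
}

// Blanket impl: every concrete `Scheduler` automatically provides the upcast.
impl<T: Scheduler> AsScheduler for T {
    fn as_scheduler(&self) -> &dyn Scheduler {
        self
    }
}

struct Store;

impl Scheduler for Store {
    fn tick(&self) {
        println!("tick");
    }
}

fn run_on_dyn(db: &dyn Scheduler) {
    db.tick();
}

fn main() {
    let store = Store;
    // A concrete store is handed to code that only knows the trait object,
    // mirroring `state.store.as_scheduler()` in the hunks above.
    run_on_dyn(store.as_scheduler());
}
```

The upcast keeps trait-object call sites (`&dyn SchedulerInterface`) decoupled from whichever concrete store (`Store`, `MockDb`, or the KV-backed store) the application wires in.
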
+#[async_trait::async_trait] +impl SchedulerInterface for MockDb {} + +#[async_trait::async_trait] +pub trait SchedulerAppState: Send + Sync + Clone { + fn get_db(&self) -> Box; +} + +pub async fn start_process_tracker( + state: &T, + scheduler_flow: SchedulerFlow, + scheduler_settings: Arc, + channel: (mpsc::Sender<()>, mpsc::Receiver<()>), + runner_from_task: impl workflows::ProcessTrackerWorkflows + 'static + Copy + std::fmt::Debug, +) -> CustomResult<(), errors::ProcessTrackerError> { + match scheduler_flow { + SchedulerFlow::Producer => { + producer::start_producer(state, scheduler_settings, channel).await? + } + SchedulerFlow::Consumer => { + consumer::start_consumer(state, scheduler_settings, runner_from_task, channel).await? + } + SchedulerFlow::Cleaner => { + error!("This flow has not been implemented yet!"); + } + } + Ok(()) +} diff --git a/crates/scheduler/src/settings.rs b/crates/scheduler/src/settings.rs new file mode 100644 index 0000000000..d7d3a9885c --- /dev/null +++ b/crates/scheduler/src/settings.rs @@ -0,0 +1,86 @@ +use common_utils::ext_traits::ConfigExt; +use serde::Deserialize; +use storage_impl::errors::ApplicationError; + +pub use crate::configs::settings::SchedulerSettings; + +#[derive(Debug, Clone, Deserialize)] +pub struct ProducerSettings { + pub upper_fetch_limit: i64, + pub lower_fetch_limit: i64, + + pub lock_key: String, + pub lock_ttl: i64, + pub batch_size: usize, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ConsumerSettings { + pub disabled: bool, + pub consumer_group: String, +} + +#[cfg(feature = "kv_store")] +#[derive(Debug, Clone, Deserialize)] +pub struct DrainerSettings { + pub stream_name: String, + pub num_partitions: u8, + pub max_read_count: u64, + pub shutdown_interval: u32, // in milliseconds + pub loop_interval: u32, // in milliseconds +} + +impl ProducerSettings { + pub fn validate(&self) -> Result<(), ApplicationError> { + common_utils::fp_utils::when(self.lock_key.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "producer lock key must not be empty".into(), + )) + }) + } +} + +#[cfg(feature = "kv_store")] +impl DrainerSettings { + pub fn validate(&self) -> Result<(), ApplicationError> { + common_utils::fp_utils::when(self.stream_name.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "drainer stream name must not be empty".into(), + )) + }) + } +} + +impl Default for ProducerSettings { + fn default() -> Self { + Self { + upper_fetch_limit: 0, + lower_fetch_limit: 1800, + lock_key: "PRODUCER_LOCKING_KEY".into(), + lock_ttl: 160, + batch_size: 200, + } + } +} + +impl Default for ConsumerSettings { + fn default() -> Self { + Self { + disabled: false, + consumer_group: "SCHEDULER_GROUP".into(), + } + } +} + +#[cfg(feature = "kv_store")] +impl Default for DrainerSettings { + fn default() -> Self { + Self { + stream_name: "DRAINER_STREAM".into(), + num_partitions: 64, + max_read_count: 100, + shutdown_interval: 1000, + loop_interval: 500, + } + } +} diff --git a/crates/router/src/scheduler/utils.rs b/crates/scheduler/src/utils.rs similarity index 87% rename from crates/router/src/scheduler/utils.rs rename to crates/scheduler/src/utils.rs index a58b02561a..d8b9b0fc46 100644 --- a/crates/router/src/scheduler/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -3,33 +3,32 @@ use std::{ time as std_time, }; +use common_utils::errors::CustomResult; +use diesel_models::enums::{self, ProcessTrackerStatus}; +pub use diesel_models::process_tracker as storage; use 
error_stack::{report, ResultExt}; -#[cfg(not(target_os = "windows"))] -use futures::StreamExt; use redis_interface::{RedisConnectionPool, RedisEntryId}; use router_env::opentelemetry; -use tokio::sync::oneshot; use uuid::Uuid; -use super::{consumer, metrics, process_data, workflows}; +use super::{ + consumer::{self, types::process_data, workflows}, + env::logger, +}; use crate::{ - configs::settings::SchedulerSettings, - core::errors::{self, CustomResult}, - logger, - routes::AppState, - scheduler::{ProcessTrackerBatch, SchedulerFlow}, - types::storage::{ - self, - enums::{self, ProcessTrackerStatus}, - }, + configs::settings::SchedulerSettings, consumer::types::ProcessTrackerBatch, errors, + flow::SchedulerFlow, metrics, SchedulerAppState, SchedulerInterface, }; -pub async fn divide_and_append_tasks( - state: &AppState, +pub async fn divide_and_append_tasks( + state: &T, flow: SchedulerFlow, tasks: Vec, settings: &SchedulerSettings, -) -> CustomResult<(), errors::ProcessTrackerError> { +) -> CustomResult<(), errors::ProcessTrackerError> +where + T: SchedulerInterface + Send + Sync + ?Sized, +{ let batches = divide(tasks, settings); // Safety: Assuming we won't deal with more than `u64::MAX` batches at once #[allow(clippy::as_conversions)] @@ -44,11 +43,14 @@ pub async fn divide_and_append_tasks( Ok(()) } -pub async fn update_status_and_append( - state: &AppState, +pub async fn update_status_and_append( + state: &T, flow: SchedulerFlow, pt_batch: ProcessTrackerBatch, -) -> CustomResult<(), errors::ProcessTrackerError> { +) -> CustomResult<(), errors::ProcessTrackerError> +where + T: SchedulerInterface + Send + Sync + ?Sized, +{ let process_ids: Vec = pt_batch .trackers .iter() @@ -57,7 +59,6 @@ pub async fn update_status_and_append( match flow { SchedulerFlow::Producer => { state - .store .process_tracker_update_process_status_by_ids( process_ids, storage::ProcessTrackerUpdate::StatusUpdate { @@ -75,7 +76,6 @@ pub async fn update_status_and_append( } SchedulerFlow::Cleaner => { let res = state - .store .reinitialize_limbo_processes(process_ids, common_utils::date_time::now()) .await; match res { @@ -99,7 +99,6 @@ pub async fn update_status_and_append( let field_value_pairs = pt_batch.to_redis_field_value_pairs()?; match state - .store .stream_append_entry( &pt_batch.stream_name, &RedisEntryId::AutoGeneratedID, @@ -111,7 +110,6 @@ pub async fn update_status_and_append( Ok(x) => Ok(x), Err(mut err) => { match state - .store .process_tracker_update_process_status_by_ids( pt_batch.trackers.iter().map(|process| process.id.clone()).collect(), storage::ProcessTrackerUpdate::StatusUpdate { @@ -243,15 +241,16 @@ pub fn get_time_from_delta(delta: Option) -> Option( - state: AppState, +pub async fn consumer_operation_handler( + state: T, settings: sync::Arc, error_handler_fun: E, consumer_operation_counter: sync::Arc, - workflow_selector: workflows::WorkflowSelectorFn, + workflow_selector: impl workflows::ProcessTrackerWorkflows + 'static + Copy + std::fmt::Debug, ) where // Error handler function E: FnOnce(error_stack::Report), + T: SchedulerAppState, { consumer_operation_counter.fetch_add(1, atomic::Ordering::Release); let start_time = std_time::Instant::now(); @@ -346,13 +345,14 @@ fn get_delay<'a>( } } -pub(crate) async fn lock_acquire_release( - state: &AppState, +pub(crate) async fn lock_acquire_release( + state: &T, settings: &SchedulerSettings, callback: F, ) -> CustomResult<(), errors::ProcessTrackerError> where F: Fn() -> Fut, + T: SchedulerInterface + Send + Sync + ?Sized, Fut: 
futures::Future>, { let tag = "PRODUCER_LOCK"; @@ -361,7 +361,6 @@ where let ttl = settings.producer.lock_ttl; if state - .store .acquire_pt_lock(tag, lock_key, lock_val, ttl) .await .change_context(errors::ProcessTrackerError::ERedisError( @@ -370,7 +369,6 @@ where { let result = callback().await; state - .store .release_pt_lock(tag, lock_key) .await .map_err(errors::ProcessTrackerError::ERedisError)?; @@ -380,32 +378,6 @@ where } } -#[cfg(not(target_os = "windows"))] -pub(crate) async fn signal_handler( - mut sig: signal_hook_tokio::Signals, - sender: oneshot::Sender<()>, -) { - if let Some(signal) = sig.next().await { - logger::info!( - "Received signal: {:?}", - signal_hook::low_level::signal_name(signal) - ); - match signal { - signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.send(()) { - Ok(_) => { - logger::info!("Request for force shutdown received") - } - Err(_) => { - logger::error!( - "The receiver is closed, a termination call might already be sent" - ) - } - }, - _ => {} - } - } -} - #[cfg(target_os = "windows")] pub(crate) async fn signal_handler( _sig: common_utils::signals::DummySignal, diff --git a/crates/storage_impl/Cargo.toml b/crates/storage_impl/Cargo.toml index 5daf1690a7..778277523a 100644 --- a/crates/storage_impl/Cargo.toml +++ b/crates/storage_impl/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +kms = ["external_services/kms"] default = ["olap", "oltp"] oltp = ["data_models/oltp"] olap = ["data_models/olap"] @@ -22,16 +23,25 @@ data_models = { version = "0.1.0", path = "../data_models", default-features = f masking = { version = "0.1.0", path = "../masking" } redis_interface = { version = "0.1.0", path = "../redis_interface" } router_env = { version = "0.1.0", path = "../router_env" } +external_services = { version = "0.1.0", path = "../external_services" } # Third party crates +actix-web = "4.3.1" async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "be3d9bce50051d8c0e0c06078e8066cc27db3001" } async-trait = "0.1.72" bb8 = "0.8.1" +bytes = "1.4.0" +config = { version = "0.13.3", features = ["toml"] } crc32fast = "1.3.2" futures = "0.3.28" diesel = { version = "2.1.0", default-features = false, features = ["postgres"] } dyn-clone = "1.0.12" error-stack = "0.3.1" +http = "0.2.9" +mime = "0.3.17" moka = { version = "0.11.3", features = ["future"] } once_cell = "1.18.0" -tokio = { version = "1.28.2", features = ["rt-multi-thread"] } \ No newline at end of file +ring = "0.16.20" +serde = { version = "1.0.163", features = ["derive"] } +thiserror = "1.0.40" +tokio = { version = "1.28.2", features = ["rt-multi-thread"] } diff --git a/crates/storage_impl/src/connection.rs b/crates/storage_impl/src/connection.rs new file mode 100644 index 0000000000..057e7762fd --- /dev/null +++ b/crates/storage_impl/src/connection.rs @@ -0,0 +1,59 @@ +use bb8::PooledConnection; +use common_utils::errors; +use diesel::PgConnection; +use error_stack::{IntoReport, ResultExt}; + +pub type PgPool = bb8::Pool>; + +pub type PgPooledConn = async_bb8_diesel::Connection; + +#[allow(clippy::expect_used)] +pub async fn redis_connection( + redis: &redis_interface::RedisSettings, +) -> redis_interface::RedisConnectionPool { + redis_interface::RedisConnectionPool::new(redis) + .await + .expect("Failed to create Redis Connection Pool") +} + +pub async fn pg_connection_read( + store: &T, +) -> errors::CustomResult< + 
PooledConnection<'_, async_bb8_diesel::ConnectionManager>, + crate::errors::StorageError, +> { + // If only OLAP is enabled get replica pool. + #[cfg(all(feature = "olap", not(feature = "oltp")))] + let pool = store.get_replica_pool(); + + // If either one of these are true we need to get master pool. + // 1. Only OLTP is enabled. + // 2. Both OLAP and OLTP is enabled. + // 3. Both OLAP and OLTP is disabled. + #[cfg(any( + all(not(feature = "olap"), feature = "oltp"), + all(feature = "olap", feature = "oltp"), + all(not(feature = "olap"), not(feature = "oltp")) + ))] + let pool = store.get_master_pool(); + + pool.get() + .await + .into_report() + .change_context(crate::errors::StorageError::DatabaseConnectionError) +} + +pub async fn pg_connection_write( + store: &T, +) -> errors::CustomResult< + PooledConnection<'_, async_bb8_diesel::ConnectionManager>, + crate::errors::StorageError, +> { + // Since all writes should happen to master DB only choose master DB. + let pool = store.get_master_pool(); + + pool.get() + .await + .into_report() + .change_context(crate::errors::StorageError::DatabaseConnectionError) +} diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs new file mode 100644 index 0000000000..bc68986cb8 --- /dev/null +++ b/crates/storage_impl/src/errors.rs @@ -0,0 +1,366 @@ +use std::fmt::Display; + +use actix_web::ResponseError; +use common_utils::errors::ErrorSwitch; +use config::ConfigError; +use data_models::errors::StorageError as DataStorageError; +use http::StatusCode; +pub use redis_interface::errors::RedisError; +use router_env::opentelemetry::metrics::MetricsError; + +use crate::{errors as storage_errors, store::errors::DatabaseError}; + +pub type ApplicationResult = Result; + +macro_rules! impl_error_display { + ($st: ident, $arg: tt) => { + impl Display for $st { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + fmt, + "{{ error_type: {:?}, error_description: {} }}", + self, $arg + ) + } + } + }; +} +macro_rules! 
impl_error_type { + ($name: ident, $arg: tt) => { + #[derive(Debug)] + pub struct $name; + + impl_error_display!($name, $arg); + + impl std::error::Error for $name {} + }; +} + +#[derive(Debug, thiserror::Error)] +pub enum StorageError { + #[error("DatabaseError: {0:?}")] + DatabaseError(error_stack::Report), + #[error("ValueNotFound: {0}")] + ValueNotFound(String), + #[error("DuplicateValue: {entity} already exists {key:?}")] + DuplicateValue { + entity: &'static str, + key: Option, + }, + #[error("Timed out while trying to connect to the database")] + DatabaseConnectionError, + #[error("KV error")] + KVError, + #[error("Serialization failure")] + SerializationFailed, + #[error("MockDb error")] + MockDbError, + #[error("Customer with this id is Redacted")] + CustomerRedacted, + #[error("Deserialization failure")] + DeserializationFailed, + #[error("Error while encrypting data")] + EncryptionError, + #[error("Error while decrypting data from database")] + DecryptionError, + #[error("RedisError: {0:?}")] + RedisError(error_stack::Report), +} + +impl ErrorSwitch for StorageError { + fn switch(&self) -> DataStorageError { + self.into() + } +} + +#[allow(clippy::from_over_into)] +impl Into for &StorageError { + fn into(self) -> DataStorageError { + match self { + StorageError::DatabaseError(i) => match i.current_context() { + storage_errors::DatabaseError::DatabaseConnectionError => { + DataStorageError::DatabaseConnectionError + } + // TODO: Update this error type to encompass & propagate the missing type (instead of generic `db value not found`) + storage_errors::DatabaseError::NotFound => { + DataStorageError::ValueNotFound(String::from("db value not found")) + } + // TODO: Update this error type to encompass & propagate the duplicate type (instead of generic `db value not found`) + storage_errors::DatabaseError::UniqueViolation => { + DataStorageError::DuplicateValue { + entity: "db entity", + key: None, + } + } + storage_errors::DatabaseError::NoFieldsToUpdate => { + DataStorageError::DatabaseError("No fields to update".to_string()) + } + storage_errors::DatabaseError::QueryGenerationFailed => { + DataStorageError::DatabaseError("Query generation failed".to_string()) + } + storage_errors::DatabaseError::Others => { + DataStorageError::DatabaseError("Unknown database error".to_string()) + } + }, + StorageError::ValueNotFound(i) => DataStorageError::ValueNotFound(i.clone()), + StorageError::DuplicateValue { entity, key } => DataStorageError::DuplicateValue { + entity, + key: key.clone(), + }, + StorageError::DatabaseConnectionError => DataStorageError::DatabaseConnectionError, + StorageError::KVError => DataStorageError::KVError, + StorageError::SerializationFailed => DataStorageError::SerializationFailed, + StorageError::MockDbError => DataStorageError::MockDbError, + StorageError::CustomerRedacted => DataStorageError::CustomerRedacted, + StorageError::DeserializationFailed => DataStorageError::DeserializationFailed, + StorageError::EncryptionError => DataStorageError::EncryptionError, + StorageError::DecryptionError => DataStorageError::DecryptionError, + StorageError::RedisError(i) => match i.current_context() { + // TODO: Update this error type to encompass & propagate the missing type (instead of generic `redis value not found`) + RedisError::NotFound => { + DataStorageError::ValueNotFound("redis value not found".to_string()) + } + RedisError::JsonSerializationFailed => DataStorageError::SerializationFailed, + RedisError::JsonDeserializationFailed => 
DataStorageError::DeserializationFailed, + i => DataStorageError::RedisError(format!("{:?}", i)), + }, + } + } +} + +impl From> for StorageError { + fn from(err: error_stack::Report) -> Self { + Self::RedisError(err) + } +} + +impl From> for StorageError { + fn from(err: error_stack::Report) -> Self { + Self::DatabaseError(err) + } +} + +impl StorageError { + pub fn is_db_not_found(&self) -> bool { + match self { + Self::DatabaseError(err) => matches!(err.current_context(), DatabaseError::NotFound), + _ => false, + } + } + + pub fn is_db_unique_violation(&self) -> bool { + match self { + Self::DatabaseError(err) => { + matches!(err.current_context(), DatabaseError::UniqueViolation,) + } + _ => false, + } + } +} + +impl_error_type!(EncryptionError, "Encryption error"); + +#[derive(Debug, thiserror::Error)] +pub enum ApplicationError { + // Display's impl can be overridden by the attribute error marco. + // Don't use Debug here, Debug gives error stack in response. + #[error("Application configuration error: {0}")] + ConfigurationError(ConfigError), + + #[error("Invalid configuration value provided: {0}")] + InvalidConfigurationValueError(String), + + #[error("Metrics error: {0}")] + MetricsError(MetricsError), + + #[error("I/O: {0}")] + IoError(std::io::Error), + + #[error("Error while constructing api client: {0}")] + ApiClientError(ApiClientError), +} + +impl From for ApplicationError { + fn from(err: MetricsError) -> Self { + Self::MetricsError(err) + } +} + +impl From for ApplicationError { + fn from(err: std::io::Error) -> Self { + Self::IoError(err) + } +} + +impl From for EncryptionError { + fn from(_: ring::error::Unspecified) -> Self { + Self + } +} + +impl From for ApplicationError { + fn from(err: ConfigError) -> Self { + Self::ConfigurationError(err) + } +} + +fn error_response(err: &T) -> actix_web::HttpResponse { + actix_web::HttpResponse::BadRequest() + .content_type(mime::APPLICATION_JSON) + .body(format!(r#"{{ "error": {{ "message": "{err}" }} }}"#)) +} + +impl ResponseError for ApplicationError { + fn status_code(&self) -> StatusCode { + match self { + Self::MetricsError(_) + | Self::IoError(_) + | Self::ConfigurationError(_) + | Self::InvalidConfigurationValueError(_) + | Self::ApiClientError(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + error_response(self) + } +} + +#[derive(Debug, thiserror::Error, PartialEq, Clone)] +pub enum ApiClientError { + #[error("Header map construction failed")] + HeaderMapConstructionFailed, + #[error("Invalid proxy configuration")] + InvalidProxyConfiguration, + #[error("Client construction failed")] + ClientConstructionFailed, + #[error("Certificate decode failed")] + CertificateDecodeFailed, + #[error("Request body serialization failed")] + BodySerializationFailed, + #[error("Unexpected state reached/Invariants conflicted")] + UnexpectedState, + + #[error("URL encoding of request payload failed")] + UrlEncodingFailed, + #[error("Failed to send request to connector {0}")] + RequestNotSent(String), + #[error("Failed to decode response")] + ResponseDecodingFailed, + + #[error("Server responded with Request Timeout")] + RequestTimeoutReceived, + + #[error("connection closed before a message could complete")] + ConnectionClosed, + + #[error("Server responded with Internal Server Error")] + InternalServerErrorReceived, + #[error("Server responded with Bad Gateway")] + BadGatewayReceived, + #[error("Server responded with Service Unavailable")] + ServiceUnavailableReceived, + 
#[error("Server responded with Gateway Timeout")] + GatewayTimeoutReceived, + #[error("Server responded with unexpected response")] + UnexpectedServerResponse, +} + +impl ApiClientError { + pub fn is_upstream_timeout(&self) -> bool { + self == &Self::RequestTimeoutReceived + } + pub fn is_connection_closed(&self) -> bool { + self == &Self::ConnectionClosed + } +} + +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ConnectorError { + #[error("Error while obtaining URL for the integration")] + FailedToObtainIntegrationUrl, + #[error("Failed to encode connector request")] + RequestEncodingFailed, + #[error("Request encoding failed : {0}")] + RequestEncodingFailedWithReason(String), + #[error("Parsing failed")] + ParsingFailed, + #[error("Failed to deserialize connector response")] + ResponseDeserializationFailed, + #[error("Failed to execute a processing step: {0:?}")] + ProcessingStepFailed(Option), + #[error("The connector returned an unexpected response: {0:?}")] + UnexpectedResponseError(bytes::Bytes), + #[error("Failed to parse custom routing rules from merchant account")] + RoutingRulesParsingError, + #[error("Failed to obtain preferred connector from merchant account")] + FailedToObtainPreferredConnector, + #[error("An invalid connector name was provided")] + InvalidConnectorName, + #[error("An invalid Wallet was used")] + InvalidWallet, + #[error("Failed to handle connector response")] + ResponseHandlingFailed, + #[error("Missing required field: {field_name}")] + MissingRequiredField { field_name: &'static str }, + #[error("Missing required fields: {field_names:?}")] + MissingRequiredFields { field_names: Vec<&'static str> }, + #[error("Failed to obtain authentication type")] + FailedToObtainAuthType, + #[error("Failed to obtain certificate")] + FailedToObtainCertificate, + #[error("Connector meta data not found")] + NoConnectorMetaData, + #[error("Failed to obtain certificate key")] + FailedToObtainCertificateKey, + #[error("This step has not been implemented for: {0}")] + NotImplemented(String), + #[error("{message} is not supported by {connector}")] + NotSupported { + message: String, + connector: &'static str, + payment_experience: String, + }, + #[error("{flow} flow not supported by {connector} connector")] + FlowNotSupported { flow: String, connector: String }, + #[error("Capture method not supported")] + CaptureMethodNotSupported, + #[error("Missing connector transaction ID")] + MissingConnectorTransactionID, + #[error("Missing connector refund ID")] + MissingConnectorRefundID, + #[error("Webhooks not implemented for this connector")] + WebhooksNotImplemented, + #[error("Failed to decode webhook event body")] + WebhookBodyDecodingFailed, + #[error("Signature not found for incoming webhook")] + WebhookSignatureNotFound, + #[error("Failed to verify webhook source")] + WebhookSourceVerificationFailed, + #[error("Could not find merchant secret in DB for incoming webhook source verification")] + WebhookVerificationSecretNotFound, + #[error("Incoming webhook object reference ID not found")] + WebhookReferenceIdNotFound, + #[error("Incoming webhook event type not found")] + WebhookEventTypeNotFound, + #[error("Incoming webhook event resource object not found")] + WebhookResourceObjectNotFound, + #[error("Could not respond to the incoming webhook event")] + WebhookResponseEncodingFailed, + #[error("Invalid Date/time format")] + InvalidDateFormat, + #[error("Date Formatting Failed")] + DateFormattingFailed, + #[error("Invalid Data format")] + InvalidDataFormat { field_name: 
&'static str }, + #[error("Payment Method data / Payment Method Type / Payment Experience Mismatch ")] + MismatchedPaymentData, + #[error("Failed to parse Wallet token")] + InvalidWalletToken, + #[error("Missing Connector Related Transaction ID")] + MissingConnectorRelatedTransactionID { id: String }, + #[error("File Validation failed")] + FileValidationFailed { reason: String }, + #[error("Missing 3DS redirection payload: {field_name}")] + MissingConnectorRedirectionPayload { field_name: &'static str }, +} diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 818c17e004..39ea480a78 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -1,11 +1,19 @@ use std::sync::Arc; -use data_models::errors::{StorageError, StorageResult}; +use common_utils::errors::CustomResult; +use data_models::{ + errors::{StorageError, StorageResult}, + payments::payment_intent::PaymentIntent, +}; +use diesel_models::{self as store}; use error_stack::ResultExt; +use futures::lock::Mutex; use masking::StrongSecret; use redis::{kv_store::RedisConnInterface, RedisStore}; pub mod config; +pub mod connection; pub mod database; +pub mod errors; pub mod metrics; pub mod payments; pub mod redis; @@ -206,6 +214,58 @@ impl KVRouterStore { } } +#[derive(Clone)] +pub struct MockDb { + pub addresses: Arc>>, + pub configs: Arc>>, + pub merchant_accounts: Arc>>, + pub merchant_connector_accounts: Arc>>, + pub payment_attempts: Arc>>, + pub payment_intents: Arc>>, + pub payment_methods: Arc>>, + pub customers: Arc>>, + pub refunds: Arc>>, + pub processes: Arc>>, + pub connector_response: Arc>>, + // pub redis: Arc, + pub api_keys: Arc>>, + pub ephemeral_keys: Arc>>, + pub cards_info: Arc>>, + pub events: Arc>>, + pub disputes: Arc>>, + pub lockers: Arc>>, + pub mandates: Arc>>, + pub captures: Arc>>, + pub merchant_key_store: Arc>>, +} + +impl MockDb { + pub async fn new() -> Self { + Self { + addresses: Default::default(), + configs: Default::default(), + merchant_accounts: Default::default(), + merchant_connector_accounts: Default::default(), + payment_attempts: Default::default(), + payment_intents: Default::default(), + payment_methods: Default::default(), + customers: Default::default(), + refunds: Default::default(), + processes: Default::default(), + connector_response: Default::default(), + // redis: Arc::new(crate::connection::redis_connection(&redis).await), + api_keys: Default::default(), + ephemeral_keys: Default::default(), + cards_info: Default::default(), + events: Default::default(), + disputes: Default::default(), + lockers: Default::default(), + mandates: Default::default(), + captures: Default::default(), + merchant_key_store: Default::default(), + } + } +} // TODO: This should not be used beyond this crate // Remove the pub modified once StorageScheme usage is completed pub trait DataModelExt { @@ -232,6 +292,14 @@ impl DataModelExt for data_models::MerchantStorageScheme { } } +impl RedisConnInterface for MockDb { + fn get_redis_conn( + &self, + ) -> Result, error_stack::Report> { + Err(RedisError::RedisConnectionError.into()) + } +} + pub(crate) fn diesel_error_to_data_error( diesel_error: &diesel_models::errors::DatabaseError, ) -> StorageError { diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index f30f74e6db..5b10d3f0a5 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -36,7 +36,7 @@ use router_env::logger; 
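The `MockDb` added to `crates/storage_impl/src/lib.rs` above keeps every table in an `Arc<Mutex<Vec<_>>>` and implements the same async storage traits as the real store (including the `PaymentIntentInterface` implementation that follows), so tests can swap it in without Postgres or Redis. A minimal sketch of that pattern, assuming a hypothetical `Record` type and `RecordInterface` trait rather than the actual hyperswitch traits, and the `async-trait`, `futures`, and `tokio` crates:

```rust
use std::sync::Arc;

use futures::lock::Mutex; // async-aware Mutex, as used by MockDb

#[derive(Clone, Debug, PartialEq)]
struct Record {
    id: String,
    amount: i64,
}

#[async_trait::async_trait]
trait RecordInterface {
    async fn insert_record(&self, record: Record) -> Result<Record, String>;
    async fn find_record(&self, id: &str) -> Result<Record, String>;
}

#[derive(Clone)]
struct InMemoryDb {
    records: Arc<Mutex<Vec<Record>>>,
}

impl InMemoryDb {
    fn new() -> Self {
        Self {
            records: Arc::new(Mutex::new(Vec::new())),
        }
    }
}

#[async_trait::async_trait]
impl RecordInterface for InMemoryDb {
    async fn insert_record(&self, record: Record) -> Result<Record, String> {
        // Push into the shared Vec, mirroring MockDb::insert_payment_intent.
        let mut records = self.records.lock().await;
        records.push(record.clone());
        Ok(record)
    }

    async fn find_record(&self, id: &str) -> Result<Record, String> {
        // Linear scan, mirroring find_payment_intent_by_payment_id_merchant_id.
        let records = self.records.lock().await;
        records
            .iter()
            .find(|record| record.id == id)
            .cloned()
            .ok_or_else(|| format!("no record with id {id}"))
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let db = InMemoryDb::new();
    db.insert_record(Record { id: "pay_123".into(), amount: 100 }).await?;
    assert_eq!(db.find_record("pay_123").await?.amount, 100);
    Ok(())
}
```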
use crate::{ redis::kv_store::{PartitionKey, RedisConnInterface}, utils::{pg_connection_read, pg_connection_write}, - DataModelExt, DatabaseStore, KVRouterStore, + CustomResult, DataModelExt, DatabaseStore, KVRouterStore, MockDb, }; #[async_trait::async_trait] @@ -662,6 +662,137 @@ impl PaymentIntentInterface for crate::RouterStore { } } +#[async_trait::async_trait] +impl PaymentIntentInterface for MockDb { + #[cfg(feature = "olap")] + async fn filter_payment_intent_by_constraints( + &self, + _merchant_id: &str, + _filters: &PaymentIntentFetchConstraints, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + #[cfg(feature = "olap")] + async fn filter_payment_intents_by_time_range_constraints( + &self, + _merchant_id: &str, + _time_range: &api_models::payments::TimeRange, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, StorageError> { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + #[cfg(feature = "olap")] + async fn get_filtered_active_attempt_ids_for_total_count( + &self, + _merchant_id: &str, + _constraints: &PaymentIntentFetchConstraints, + _storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + #[cfg(feature = "olap")] + async fn get_filtered_payment_intents_attempt( + &self, + _merchant_id: &str, + _constraints: &PaymentIntentFetchConstraints, + _storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + // [#172]: Implement function for `MockDb` + Err(StorageError::MockDbError)? + } + + #[allow(clippy::panic)] + async fn insert_payment_intent( + &self, + new: PaymentIntentNew, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let mut payment_intents = self.payment_intents.lock().await; + let time = common_utils::date_time::now(); + let payment_intent = PaymentIntent { + #[allow(clippy::as_conversions)] + id: payment_intents + .len() + .try_into() + .into_report() + .change_context(StorageError::MockDbError)?, + payment_id: new.payment_id, + merchant_id: new.merchant_id, + status: new.status, + amount: new.amount, + currency: new.currency, + amount_captured: new.amount_captured, + customer_id: new.customer_id, + description: new.description, + return_url: new.return_url, + metadata: new.metadata, + connector_id: new.connector_id, + shipping_address_id: new.shipping_address_id, + billing_address_id: new.billing_address_id, + statement_descriptor_name: new.statement_descriptor_name, + statement_descriptor_suffix: new.statement_descriptor_suffix, + created_at: new.created_at.unwrap_or(time), + modified_at: new.modified_at.unwrap_or(time), + last_synced: new.last_synced, + setup_future_usage: new.setup_future_usage, + off_session: new.off_session, + client_secret: new.client_secret, + business_country: new.business_country, + business_label: new.business_label, + active_attempt_id: new.active_attempt_id.to_owned(), + order_details: new.order_details, + allowed_payment_method_types: new.allowed_payment_method_types, + connector_metadata: new.connector_metadata, + feature_metadata: new.feature_metadata, + attempt_count: new.attempt_count, + profile_id: new.profile_id, + merchant_decision: new.merchant_decision, + }; + payment_intents.push(payment_intent.clone()); + Ok(payment_intent) + } + + // safety: only used for testing + #[allow(clippy::unwrap_used)] + async fn 
update_payment_intent(
+        &self,
+        this: PaymentIntent,
+        update: PaymentIntentUpdate,
+        _storage_scheme: MerchantStorageScheme,
+    ) -> CustomResult<PaymentIntent, StorageError> {
+        let mut payment_intents = self.payment_intents.lock().await;
+        let payment_intent = payment_intents
+            .iter_mut()
+            .find(|item| item.id == this.id)
+            .unwrap();
+        *payment_intent = update.apply_changeset(this);
+        Ok(payment_intent.clone())
+    }
+
+    // safety: only used for testing
+    #[allow(clippy::unwrap_used)]
+    async fn find_payment_intent_by_payment_id_merchant_id(
+        &self,
+        payment_id: &str,
+        merchant_id: &str,
+        _storage_scheme: MerchantStorageScheme,
+    ) -> CustomResult<PaymentIntent, StorageError> {
+        let payment_intents = self.payment_intents.lock().await;
+
+        Ok(payment_intents
+            .iter()
+            .find(|payment_intent| {
+                payment_intent.payment_id == payment_id && payment_intent.merchant_id == merchant_id
+            })
+            .cloned()
+            .unwrap())
+    }
+}
+
 impl DataModelExt for PaymentIntentNew {
     type StorageModel = DieselPaymentIntentNew;
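The `pg_connection_read` helper added in `crates/storage_impl/src/connection.rs` earlier in this diff picks the replica pool only when the `olap` feature is enabled without `oltp`; every other feature combination falls back to the master pool, and writes always use the master pool. Below is a minimal, self-contained sketch of that cfg-based selection, assuming hypothetical `Pool` and `Store` types in place of the bb8/diesel ones and `olap`/`oltp` features declared in the consuming crate's Cargo.toml:

```rust
// Hypothetical stand-ins for the connection pools held by the real store.
#[derive(Debug)]
struct Pool {
    name: &'static str,
}

struct Store {
    master: Pool,
    replica: Pool,
}

impl Store {
    fn get_master_pool(&self) -> &Pool {
        &self.master
    }

    #[allow(dead_code)]
    fn get_replica_pool(&self) -> &Pool {
        &self.replica
    }
}

/// Mirrors the feature matrix documented in `pg_connection_read`: reads go to
/// the replica only when `olap` is enabled and `oltp` is not.
fn read_pool(store: &Store) -> &Pool {
    #[cfg(all(feature = "olap", not(feature = "oltp")))]
    let pool = store.get_replica_pool();

    #[cfg(any(
        all(not(feature = "olap"), feature = "oltp"),
        all(feature = "olap", feature = "oltp"),
        all(not(feature = "olap"), not(feature = "oltp"))
    ))]
    let pool = store.get_master_pool();

    pool
}

fn main() {
    let store = Store {
        master: Pool { name: "master" },
        replica: Pool { name: "replica" },
    };
    // With neither feature enabled (or with `oltp` on), reads use the master pool.
    println!("reads go to the {} pool", read_pool(&store).name);
}
```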