From 10003cd6fe0b4979d08385d0043b50a5bdac2738 Mon Sep 17 00:00:00 2001 From: Nishant Joshi Date: Mon, 28 Nov 2022 11:40:13 +0530 Subject: [PATCH] refactor(redis_interface): separating redis functionality and dependent functionalities outside router crate (#15) Co-authored-by: Sanchith Hegde --- Cargo.lock | 34 ++- crates/common_utils/Cargo.toml | 16 + crates/common_utils/README.md | 11 + crates/common_utils/src/errors.rs | 40 +++ crates/common_utils/src/ext_traits.rs | 279 ++++++++++++++++++ crates/common_utils/src/lib.rs | 17 ++ crates/redis_interface/Cargo.toml | 18 ++ .../redis => redis_interface/src}/commands.rs | 13 +- crates/redis_interface/src/errors.rs | 39 +++ .../redis.rs => redis_interface/src/lib.rs} | 24 +- .../redis => redis_interface/src}/types.rs | 20 ++ crates/router/Cargo.toml | 6 +- crates/router/src/configs/settings.rs | 19 +- crates/router/src/connection.rs | 6 +- crates/router/src/core/errors.rs | 41 +-- crates/router/src/db/payment_attempt.rs | 3 +- crates/router/src/db/payment_intent.rs | 3 +- crates/router/src/scheduler/consumer.rs | 9 +- crates/router/src/scheduler/utils.rs | 17 +- .../src/scheduler/workflows/payment_sync.rs | 14 +- crates/router/src/services.rs | 3 +- crates/router/src/utils/ext_traits.rs | 214 +------------- 22 files changed, 544 insertions(+), 302 deletions(-) create mode 100644 crates/common_utils/Cargo.toml create mode 100644 crates/common_utils/README.md create mode 100644 crates/common_utils/src/errors.rs create mode 100644 crates/common_utils/src/ext_traits.rs create mode 100644 crates/common_utils/src/lib.rs create mode 100644 crates/redis_interface/Cargo.toml rename crates/{router/src/services/redis => redis_interface/src}/commands.rs (97%) create mode 100644 crates/redis_interface/src/errors.rs rename crates/{router/src/services/redis.rs => redis_interface/src/lib.rs} (83%) rename crates/{router/src/services/redis => redis_interface/src}/types.rs (79%) diff --git a/Cargo.lock b/Cargo.lock index 3afc67e00e..8472b79d44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -830,6 +830,19 @@ dependencies = [ "vec_map", ] +[[package]] +name = "common_utils" +version = "0.1.0" +dependencies = [ + "bytes", + "error-stack", + "masking", + "router_env", + "serde", + "serde_json", + "serde_urlencoded", +] + [[package]] name = "config" version = "0.13.2" @@ -1080,9 +1093,9 @@ dependencies = [ [[package]] name = "error-stack" -version = "0.2.1" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c04879c877b85178ad32202703102bcdc1e7c4cb0065b270e55a2c1baff65f2" +checksum = "859d224e04b2d93d974c08e375dac9b8d1a513846e44c6666450a57b1ed963f9" dependencies = [ "anyhow", "owo-colors", @@ -2308,6 +2321,21 @@ dependencies = [ "nom", ] +[[package]] +name = "redis_interface" +version = "0.1.0" +dependencies = [ + "bytes", + "common_utils", + "error-stack", + "fred", + "router_env", + "serde", + "serde_json", + "serde_urlencoded", + "thiserror", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -2431,6 +2459,7 @@ dependencies = [ "base64", "bb8", "bytes", + "common_utils", "config", "crc32fast", "diesel", @@ -2448,6 +2477,7 @@ dependencies = [ "nanoid", "once_cell", "rand", + "redis_interface", "regex", "reqwest", "ring", diff --git a/crates/common_utils/Cargo.toml b/crates/common_utils/Cargo.toml new file mode 100644 index 0000000000..245bba3545 --- /dev/null +++ b/crates/common_utils/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "common_utils" +version = "0.1.0" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "1.2.1" +error-stack = "0.2.1" +serde = { version = "1.0.145", features = ["derive"] } +serde_json = "1.0.85" +serde_urlencoded = "0.7.1" + +# First party crates +masking = { version = "0.1.0", path = "../masking" } +router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } \ No newline at end of file diff --git a/crates/common_utils/README.md b/crates/common_utils/README.md new file mode 100644 index 0000000000..db32154723 --- /dev/null +++ b/crates/common_utils/README.md @@ -0,0 +1,11 @@ +# Common Utils + +Common functionality required by internal crates + +## Files Tree Layout + +```text +└── src : source code + └── errors : common error specific types + └── ext_traits : traits for extending type functionalities +``` \ No newline at end of file diff --git a/crates/common_utils/src/errors.rs b/crates/common_utils/src/errors.rs new file mode 100644 index 0000000000..97ef47f89d --- /dev/null +++ b/crates/common_utils/src/errors.rs @@ -0,0 +1,40 @@ +//! +//! errors and error specific types for universal use + +/// Custom Result +/// A custom datatype that wraps the error variant into a report, allowing +/// error_stack::Report specific extendability +/// +/// Effectively, equivalent to `Result>` +/// +pub type CustomResult = error_stack::Result; + +macro_rules! impl_error_display { + ($st: ident, $arg: tt) => { + impl std::fmt::Display for $st { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fmt.write_str(&format!( + "{{ error_type: {:?}, error_description: {} }}", + self, $arg + )) + } + } + }; +} + +macro_rules! impl_error_type { + ($name: ident, $arg: tt) => { + #[doc = ""] + #[doc = stringify!(Error variant $name)] + #[doc = stringify!(Custom error variant for $arg)] + #[doc = ""] + #[derive(Debug)] + pub struct $name; + + impl_error_display!($name, $arg); + + impl std::error::Error for $name {} + }; +} + +impl_error_type!(ParsingError, "Parsing error"); diff --git a/crates/common_utils/src/ext_traits.rs b/crates/common_utils/src/ext_traits.rs new file mode 100644 index 0000000000..83770e6ca8 --- /dev/null +++ b/crates/common_utils/src/ext_traits.rs @@ -0,0 +1,279 @@ +//! +//! This module holds traits for extending functionalities for existing datatypes +//! & inbuilt datatypes. +//! + +use error_stack::{IntoReport, ResultExt}; +use masking::{ExposeInterface, Secret, Strategy}; +use serde::{Deserialize, Serialize}; + +use crate::errors::{self, CustomResult}; + +/// +/// Encode interface +/// An interface for performing type conversions and serialization +/// +pub trait Encode<'e, P> +where + Self: 'e + std::fmt::Debug, +{ + // If needed get type information/custom error implementation. + /// + /// Converting `Self` into an intermediate representation `

` + /// and then performing encoding operation using the `Serialize` trait from `serde` + /// Specifically to convert into json, by using `serde_json` + /// + fn convert_and_encode(&'e self) -> CustomResult + where + P: TryFrom<&'e Self> + Serialize, + Result>::Error>: error_stack::ResultExt, + >::Error> as ResultExt>::Ok: Serialize; + + /// + /// Converting `Self` into an intermediate representation `

` + /// and then performing encoding operation using the `Serialize` trait from `serde` + /// Specifically, to convert into urlencoded, by using `serde_urlencoded` + /// + fn convert_and_url_encode(&'e self) -> CustomResult + where + P: TryFrom<&'e Self> + Serialize, + Result>::Error>: error_stack::ResultExt, + >::Error> as ResultExt>::Ok: Serialize; + + /// + /// Functionality, for specifically encoding `Self` into `String` + /// after serialization by using `serde::Serialize` + /// + fn encode(&'e self) -> CustomResult + where + Self: Serialize; + + /// + /// Functionality, for specifically encoding `Self` into `String` + /// after serialization by using `serde::Serialize` + /// specifically, to convert into JSON `String`. + /// + fn encode_to_string_of_json(&'e self) -> CustomResult + where + Self: Serialize; + + /// + /// Functionality, for specifically encoding `Self` into `serde_json::Value` + /// after serialization by using `serde::Serialize` + /// + fn encode_to_value(&'e self) -> CustomResult + where + Self: Serialize; + + /// + /// Functionality, for specifically encoding `Self` into `Vec` + /// after serialization by using `serde::Serialize` + /// + fn encode_to_vec(&'e self) -> CustomResult, errors::ParsingError> + where + Self: Serialize; +} + +impl<'e, P, A> Encode<'e, P> for A +where + Self: 'e + std::fmt::Debug, +{ + fn convert_and_encode(&'e self) -> CustomResult + where + P: TryFrom<&'e Self> + Serialize, + Result>::Error>: error_stack::ResultExt, + >::Error> as ResultExt>::Ok: Serialize, + { + serde_json::to_string(&P::try_from(self).change_context(errors::ParsingError)?) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) + } + + fn convert_and_url_encode(&'e self) -> CustomResult + where + P: TryFrom<&'e Self> + Serialize, + Result>::Error>: error_stack::ResultExt, + >::Error> as ResultExt>::Ok: Serialize, + { + serde_urlencoded::to_string(&P::try_from(self).change_context(errors::ParsingError)?) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) + } + + // Check without two functions can we combine this + fn encode(&'e self) -> CustomResult + where + Self: Serialize, + { + serde_urlencoded::to_string(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) + } + + fn encode_to_string_of_json(&'e self) -> CustomResult + where + Self: Serialize, + { + serde_json::to_string(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) + } + + fn encode_to_value(&'e self) -> CustomResult + where + Self: Serialize, + { + serde_json::to_value(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to convert {:?} to a value", self)) + } + + fn encode_to_vec(&'e self) -> CustomResult, errors::ParsingError> + where + Self: Serialize, + { + serde_json::to_vec(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to convert {:?} to a value", self)) + } +} + +/// +/// Extending functionalities of `bytes::Bytes` +/// +pub trait BytesExt { + /// + /// Convert `bytes::Bytes` into type `` using `serde::Deserialize` + /// + fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult + where + T: Deserialize<'de>; +} + +impl BytesExt for bytes::Bytes { + fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult + where + T: Deserialize<'de>, + { + use bytes::Buf; + + serde_json::from_slice::(self.chunk()) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to parse {type_name} from bytes")) + } +} + +/// +/// Extending functionalities of `[u8]` for performing parsing +/// +pub trait ByteSliceExt { + /// + /// Convert `[u8]` into type `` by using `serde::Deserialize` + /// + fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult + where + T: Deserialize<'de>; +} + +impl ByteSliceExt for [u8] { + fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult + where + T: Deserialize<'de>, + { + serde_json::from_slice(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to parse {type_name} from &[u8]")) + } +} + +/// +/// Extending functionalities of `serde_json::Value` for performing parsing +/// +pub trait ValueExt { + /// + /// Convert `serde_json::Value` into type `` by using `serde::Deserialize` + /// + fn parse_value(self, type_name: &str) -> CustomResult + where + T: serde::de::DeserializeOwned; +} + +impl ValueExt for serde_json::Value { + fn parse_value(self, type_name: &str) -> CustomResult + where + T: serde::de::DeserializeOwned, + { + let debug = format!( + "Unable to parse {type_name} from serde_json::Value: {:?}", + &self + ); + serde_json::from_value::(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| debug) + } +} + +impl ValueExt for Secret +where + MaskingStrategy: Strategy, +{ + fn parse_value(self, type_name: &str) -> CustomResult + where + T: serde::de::DeserializeOwned, + { + self.expose().parse_value(type_name) + } +} + +/// +/// Extending functionalities of `String` for performing parsing +/// +pub trait StringExt { + /// + /// Convert `String` into type `` (which being an `enum`) + /// + fn parse_enum(self, enum_name: &str) -> CustomResult + where + T: std::str::FromStr, + // Requirement for converting the `Err` variant of `FromStr` to `Report` + ::Err: std::error::Error + Send + Sync + 'static; + + /// + /// Convert `serde_json::Value` into type `` by using `serde::Deserialize` + /// + fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult + where + T: Deserialize<'de>; +} + +impl StringExt for String { + fn parse_enum(self, enum_name: &str) -> CustomResult + where + T: std::str::FromStr, + ::Err: std::error::Error + Send + Sync + 'static, + { + T::from_str(&self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Invalid enum variant {self:?} for enum {enum_name}")) + } + + fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult + where + T: Deserialize<'de>, + { + serde_json::from_str::(self) + .into_report() + .change_context(errors::ParsingError) + .attach_printable_lazy(|| format!("Unable to parse {type_name} from string")) + } +} diff --git a/crates/common_utils/src/lib.rs b/crates/common_utils/src/lib.rs new file mode 100644 index 0000000000..24569a8c1c --- /dev/null +++ b/crates/common_utils/src/lib.rs @@ -0,0 +1,17 @@ +#![warn( + missing_docs, + rust_2018_idioms, + missing_debug_implementations, + clippy::expect_used, + clippy::missing_panics_doc, + clippy::panic, + clippy::panic_in_result_fn, + clippy::panicking_unwrap, + clippy::unreachable, + clippy::unwrap_in_result, + clippy::unwrap_used +)] +#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR" ), "/", "README.md"))] + +pub mod errors; +pub mod ext_traits; diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml new file mode 100644 index 0000000000..cc48a6f63f --- /dev/null +++ b/crates/redis_interface/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "redis_interface" +version = "0.1.0" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "1.2.1" +error-stack = "0.2.1" +fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] } +serde = { version = "1.0.145", features = ["derive"] } +serde_json = "1.0.85" +serde_urlencoded = "0.7.1" +thiserror = "1.0.37" + +# First party crates +common_utils = { version = "0.1.0", path = "../common_utils" } +router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } \ No newline at end of file diff --git a/crates/router/src/services/redis/commands.rs b/crates/redis_interface/src/commands.rs similarity index 97% rename from crates/router/src/services/redis/commands.rs rename to crates/redis_interface/src/commands.rs index 220cac6959..98027c1844 100644 --- a/crates/router/src/services/redis/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -1,5 +1,13 @@ +//! +//! An interface to abstract the `fred` commands +//! + use std::fmt::Debug; +use common_utils::{ + errors::CustomResult, + ext_traits::{ByteSliceExt, Encode}, +}; use error_stack::{IntoReport, ResultExt}; use fred::{ interfaces::{KeysInterface, StreamsInterface}, @@ -11,9 +19,8 @@ use fred::{ use router_env::{tracing, tracing::instrument}; use crate::{ - core::errors::{self, CustomResult}, - services::redis::types::{RedisEntryId, SetNXReply}, - utils::{ByteSliceExt, Encode}, + errors, + types::{RedisEntryId, SetNXReply}, }; impl super::RedisConnectionPool { diff --git a/crates/redis_interface/src/errors.rs b/crates/redis_interface/src/errors.rs new file mode 100644 index 0000000000..9402b72577 --- /dev/null +++ b/crates/redis_interface/src/errors.rs @@ -0,0 +1,39 @@ +//! +//! Errors specific to this custom redis interface +//! + +#[derive(Debug, thiserror::Error)] +pub enum RedisError { + #[error("Failed to set key value in Redis")] + SetFailed, + #[error("Failed to set key value with expiry in Redis")] + SetExFailed, + #[error("Failed to set expiry for key value in Redis")] + SetExpiryFailed, + #[error("Failed to get key value in Redis")] + GetFailed, + #[error("Failed to delete key value in Redis")] + DeleteFailed, + #[error("Failed to append entry to Redis stream")] + StreamAppendFailed, + #[error("Failed to read entries from Redis stream")] + StreamReadFailed, + #[error("Failed to delete entries from Redis stream")] + StreamDeleteFailed, + #[error("Failed to acknowledge Redis stream entry")] + StreamAcknowledgeFailed, + #[error("Failed to create Redis consumer group")] + ConsumerGroupCreateFailed, + #[error("Failed to destroy Redis consumer group")] + ConsumerGroupDestroyFailed, + #[error("Failed to delete consumer from consumer group")] + ConsumerGroupRemoveConsumerFailed, + #[error("Failed to set last ID on consumer group")] + ConsumerGroupSetIdFailed, + #[error("Failed to set Redis stream message owner")] + ConsumerGroupClaimFailed, + #[error("Failed to serialize application type to JSON")] + JsonSerializationFailed, + #[error("Failed to deserialize application type from JSON")] + JsonDeserializationFailed, +} diff --git a/crates/router/src/services/redis.rs b/crates/redis_interface/src/lib.rs similarity index 83% rename from crates/router/src/services/redis.rs rename to crates/redis_interface/src/lib.rs index 0980e31261..ae23b53b99 100644 --- a/crates/router/src/services/redis.rs +++ b/crates/redis_interface/src/lib.rs @@ -1,8 +1,12 @@ +// TODO: Add crate & modules documentation for this crate + pub mod commands; +pub mod errors; pub mod types; +use router_env::logger; + pub use self::{commands::*, types::*}; -use crate::logger; pub struct RedisConnectionPool { pub pool: fred::pool::RedisPool, @@ -17,7 +21,7 @@ impl RedisConnectionPool { /// /// Panics if a connection to Redis is not successful. #[allow(clippy::expect_used)] - pub(crate) async fn new(conf: &crate::configs::settings::Redis) -> Self { + pub async fn new(conf: &types::RedisSettings) -> Self { let redis_connection_url = match conf.cluster_enabled { // Fred relies on this format for specifying cluster where the host port is ignored & only query parameters are used for node addresses // redis-cluster://username:password@host:port?node=bar.com:30002&node=baz.com:30003 @@ -79,11 +83,23 @@ struct RedisConfig { default_stream_read_count: u64, } -impl From<&crate::configs::settings::Redis> for RedisConfig { - fn from(config: &crate::configs::settings::Redis) -> Self { +impl From<&types::RedisSettings> for RedisConfig { + fn from(config: &types::RedisSettings) -> Self { Self { default_ttl: config.default_ttl, default_stream_read_count: config.stream_read_count, } } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_redis_error() { + let x = errors::RedisError::ConsumerGroupClaimFailed.to_string(); + + assert_eq!(x, "Failed to set redis stream message owner".to_string()) + } +} diff --git a/crates/router/src/services/redis/types.rs b/crates/redis_interface/src/types.rs similarity index 79% rename from crates/router/src/services/redis/types.rs rename to crates/redis_interface/src/types.rs index f3b3460720..129c587298 100644 --- a/crates/router/src/services/redis/types.rs +++ b/crates/redis_interface/src/types.rs @@ -1,3 +1,23 @@ +//! +//! Data types and type conversions +//! from `fred`'s internal data-types to custom data-types +//! +#[derive(Debug, serde::Deserialize, Clone)] +pub struct RedisSettings { + pub host: String, + pub port: u16, + pub cluster_urls: Vec, + pub cluster_enabled: bool, + pub use_legacy_version: bool, + pub pool_size: usize, + pub reconnect_max_attempts: u32, + /// Reconnect delay in milliseconds + pub reconnect_delay: u32, + /// TTL in seconds + pub default_ttl: u32, + pub stream_read_count: u64, +} + #[derive(Debug)] pub enum RedisEntryId { UserSpecifiedID { diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 982c113020..0c30c95847 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -16,7 +16,7 @@ stripe = ["dep:serde_qs"] sandbox = ["kms", "stripe"] olap = [] production = [] -kv_store = [] +kv_store = ["dep:fred"] [dependencies] @@ -37,7 +37,7 @@ diesel = { git = "https://github.com/juspay/diesel", features = ["postgres", "se dyn-clone = "1.0.9" encoding_rs = "0.8.31" error-stack = "0.2.1" -fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] } +fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] , optional = true } hex = "0.4.3" http = "0.2.8" literally = "0.1.3" @@ -64,7 +64,9 @@ uuid = { version = "1.1.2", features = ["serde", "v4"] } futures = "0.3" # First party crates +common_utils = { version = "0.1.0", path = "../common_utils" } masking = { version = "0.1.0", path = "../masking" } +redis_interface = { version = "0.1.0", path = "../redis_interface" } router_derive = { version = "0.1.0", path = "../router_derive" } router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 6d1dd47e98..d6350b09ff 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use config::{Environment, File, FileFormat}; +use redis_interface::RedisSettings; pub use router_env::config::{Log, LogConsole, LogFile, LogTelemetry}; use serde::Deserialize; use structopt::StructOpt; @@ -27,7 +28,7 @@ pub struct Settings { pub master_database: Database, #[cfg(feature = "olap")] pub replica_database: Database, - pub redis: Redis, + pub redis: RedisSettings, pub log: Log, pub keys: Keys, pub locker: Locker, @@ -76,22 +77,6 @@ pub struct Database { pub pool_size: u32, } -#[derive(Debug, Deserialize, Clone)] -pub struct Redis { - pub host: String, - pub port: u16, - pub cluster_urls: Vec, - pub cluster_enabled: bool, - pub use_legacy_version: bool, - pub pool_size: usize, - pub reconnect_max_attempts: u32, - /// Reconnect delay in milliseconds - pub reconnect_delay: u32, - /// TTL in seconds - pub default_ttl: u32, - pub stream_read_count: u64, -} - #[derive(Debug, Deserialize, Clone)] pub struct Connectors { pub aci: ConnectorParams, diff --git a/crates/router/src/connection.rs b/crates/router/src/connection.rs index 481b39a9a8..7e5dd4ca94 100644 --- a/crates/router/src/connection.rs +++ b/crates/router/src/connection.rs @@ -6,7 +6,7 @@ use crate::configs::settings::{Database, Settings}; pub type PgPool = bb8::Pool>; pub type PgPooledConn = async_bb8_diesel::Connection; -pub type RedisPool = std::sync::Arc; +pub type RedisPool = std::sync::Arc; #[derive(Debug)] struct TestTransaction; @@ -25,8 +25,8 @@ impl CustomizeConnection for TestTransaction { } } -pub async fn redis_connection(conf: &Settings) -> crate::services::redis::RedisConnectionPool { - crate::services::redis::RedisConnectionPool::new(&conf.redis).await +pub async fn redis_connection(conf: &Settings) -> redis_interface::RedisConnectionPool { + redis_interface::RedisConnectionPool::new(&conf.redis).await } #[allow(clippy::expect_used)] diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 4b3567cab8..81cdee3b24 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -5,15 +5,15 @@ pub(crate) mod utils; use std::fmt::Display; use actix_web::{body::BoxBody, http::StatusCode, HttpResponse, ResponseError}; +pub use common_utils::errors::{CustomResult, ParsingError}; use config::ConfigError; use error_stack; +pub use redis_interface::errors::RedisError; use router_env::opentelemetry::metrics::MetricsError; pub use self::api_error_response::ApiErrorResponse; pub(crate) use self::utils::{ApiClientErrorExt, ConnectorErrorExt, StorageErrorExt}; use crate::services; - -pub type CustomResult = error_stack::Result; pub type RouterResult = CustomResult; pub type RouterResponse = CustomResult, ApiErrorResponse>; @@ -97,7 +97,6 @@ pub enum DatabaseError { impl_error_type!(AuthenticationError, "Authentication error"); impl_error_type!(AuthorisationError, "Authorisation error"); impl_error_type!(EncryptionError, "Encryption error"); -impl_error_type!(ParsingError, "Parsing error"); impl_error_type!(UnexpectedError, "Unexpected error"); impl_error_type!(ValidateError, "validation failed"); @@ -418,42 +417,6 @@ error_to_process_tracker_error!( ProcessTrackerError::EValidationError(error_stack::Report) ); -#[derive(Debug, thiserror::Error)] -pub enum RedisError { - #[error("Failed to set key value in Redis")] - SetFailed, - #[error("Failed to set key value with expiry in Redis")] - SetExFailed, - #[error("Failed to set expiry for key value in Redis")] - SetExpiryFailed, - #[error("Failed to get key value in Redis")] - GetFailed, - #[error("Failed to delete key value in Redis")] - DeleteFailed, - #[error("Failed to append entry to redis stream")] - StreamAppendFailed, - #[error("Failed to read entries from redis stream")] - StreamReadFailed, - #[error("Failed to delete entries from redis stream")] - StreamDeleteFailed, - #[error("Failed to acknowledge redis stream entry")] - StreamAcknowledgeFailed, - #[error("Failed to create redis consumer group")] - ConsumerGroupCreateFailed, - #[error("Failed to destroy redis consumer group")] - ConsumerGroupDestroyFailed, - #[error("Failed to delete consumer from consumer group")] - ConsumerGroupRemoveConsumerFailed, - #[error("Failed to set last ID on consumer group")] - ConsumerGroupSetIdFailed, - #[error("Failed to set redis stream message owner")] - ConsumerGroupClaimFailed, - #[error("Failed to serialize application type to json")] - JsonSerializationFailed, - #[error("Failed to deserialize application type from json")] - JsonDeserializationFailed, -} - #[derive(Debug, thiserror::Error)] pub enum ValidationError { #[error("Missing required field: {field_name}")] diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index 4040bb9923..e7682cebfe 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -148,12 +148,13 @@ mod storage { mod storage { use error_stack::{IntoReport, ResultExt}; use fred::prelude::*; + use redis_interface::RedisEntryId; use super::IPaymentAttempt; use crate::{ connection::pg_connection, core::errors::{self, CustomResult}, - services::{redis::RedisEntryId, Store}, + services::Store, types::storage::{enums, payment_attempt::*}, utils::{date_time, storage_partitioning::KvStorePartition}, }; diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index 229789d9a2..72dd3025cd 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -36,12 +36,13 @@ pub trait IPaymentIntent { mod storage { use error_stack::{IntoReport, ResultExt}; use fred::prelude::{RedisErrorKind, *}; + use redis_interface::RedisEntryId; use super::IPaymentIntent; use crate::{ connection::pg_connection, core::errors::{self, CustomResult}, - services::{redis::RedisEntryId, Store}, + services::Store, types::{api, storage::payment_intent::*}, utils::{date_time, storage_partitioning::KvStorePartition}, }; diff --git a/crates/router/src/scheduler/consumer.rs b/crates/router/src/scheduler/consumer.rs index c837f4c97c..eb4398bf67 100644 --- a/crates/router/src/scheduler/consumer.rs +++ b/crates/router/src/scheduler/consumer.rs @@ -22,7 +22,6 @@ use crate::{ logger::{error, info}, routes::AppState, scheduler::utils as pt_utils, - services::redis::*, types::storage::{self, enums}, utils::date_time, }; @@ -93,7 +92,11 @@ pub async fn consumer_operations( .store .redis_conn .clone() - .consumer_group_create(&stream_name, &group_name, &RedisEntryId::AfterLastID) + .consumer_group_create( + &stream_name, + &group_name, + &redis_interface::RedisEntryId::AfterLastID, + ) .await; if group_created.is_err() { info!("Consumer group already exists"); @@ -132,7 +135,7 @@ pub async fn consumer_operations( #[instrument(skip(db, redis_conn))] pub async fn fetch_consumer_tasks( db: &dyn Db, - redis_conn: &RedisConnectionPool, + redis_conn: &redis_interface::RedisConnectionPool, stream_name: &str, group_name: &str, consumer_name: &str, diff --git a/crates/router/src/scheduler/utils.rs b/crates/router/src/scheduler/utils.rs index 26fd0a4308..d64955ab4a 100644 --- a/crates/router/src/scheduler/utils.rs +++ b/crates/router/src/scheduler/utils.rs @@ -15,7 +15,6 @@ use crate::{ logger, routes::AppState, scheduler::{ProcessTrackerBatch, SchedulerFlow}, - services::redis::*, types::storage::{self, enums::ProcessTrackerStatus}, utils::{self, date_time, OptionExt, StringExt}, }; @@ -30,7 +29,7 @@ pub async fn acquire_pt_lock( let conn = state.store.redis_conn.clone(); let is_lock_acquired = conn.set_key_if_not_exist(lock_key, lock_val).await; match is_lock_acquired { - Ok(SetNXReply::KeySet) => match conn.set_expiry(lock_key, ttl).await { + Ok(redis_interface::SetNXReply::KeySet) => match conn.set_expiry(lock_key, ttl).await { Ok(()) => true, #[allow(unused_must_use)] @@ -40,7 +39,7 @@ pub async fn acquire_pt_lock( false } }, - Ok(SetNXReply::KeyNotSet) => { + Ok(redis_interface::SetNXReply::KeyNotSet) => { logger::error!(%tag, "Lock not acquired, previous fetch still in progress"); false } @@ -51,7 +50,11 @@ pub async fn acquire_pt_lock( } } -pub async fn release_pt_lock(redis_conn: &RedisConnectionPool, tag: &str, lock_key: &str) -> bool { +pub async fn release_pt_lock( + redis_conn: &redis_interface::RedisConnectionPool, + tag: &str, + lock_key: &str, +) -> bool { let is_lock_released = redis_conn.delete_key(lock_key).await; match is_lock_released { Ok(()) => true, @@ -143,7 +146,7 @@ pub async fn update_status_and_append( redis_conn .stream_append_entry( &pt_batch.stream_name, - &RedisEntryId::AutoGeneratedID, + &redis_interface::RedisEntryId::AutoGeneratedID, field_value_pairs, ) .await @@ -186,7 +189,7 @@ pub fn divide_into_batches( } pub async fn get_batches( - conn: &RedisConnectionPool, + conn: &redis_interface::RedisConnectionPool, stream_name: &str, group_name: &str, consumer_name: &str, @@ -194,7 +197,7 @@ pub async fn get_batches( let response = conn .stream_read_with_options( stream_name, - RedisEntryId::UndeliveredEntryID, + redis_interface::RedisEntryId::UndeliveredEntryID, // Update logic for collecting to Vec and flattening, if count > 1 is provided Some(1), None, diff --git a/crates/router/src/scheduler/workflows/payment_sync.rs b/crates/router/src/scheduler/workflows/payment_sync.rs index 2b826100d3..0a9d27bc4e 100644 --- a/crates/router/src/scheduler/workflows/payment_sync.rs +++ b/crates/router/src/scheduler/workflows/payment_sync.rs @@ -1,5 +1,8 @@ use std::sync; +use redis_interface as redis; +use redis_interface::errors as redis_errors; + use super::{PaymentsSyncWorkflow, ProcessTrackerWorkflow}; use crate::{ core::payments::{self as payment_flows, operations}, @@ -7,7 +10,6 @@ use crate::{ errors, routes::AppState, scheduler::{consumer, process_data, utils as pt_utils}, - services::redis, types::{ api, storage::{self, enums}, @@ -93,10 +95,12 @@ pub async fn get_sync_process_schedule_time( redis: sync::Arc, retry_count: i32, ) -> Result, errors::ProcessTrackerError> { - let redis_mapping: errors::CustomResult = - redis - .get_and_deserialize_key(&format!("pt_mapping_{}", connector), "ConnectorPTMapping") - .await; + let redis_mapping: errors::CustomResult< + process_data::ConnectorPTMapping, + redis_errors::RedisError, + > = redis + .get_and_deserialize_key(&format!("pt_mapping_{}", connector), "ConnectorPTMapping") + .await; let mapping = match redis_mapping { Ok(x) => x, Err(_) => process_data::ConnectorPTMapping::default(), diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index defec62bc1..a60dbe08ef 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -1,7 +1,6 @@ pub mod api; pub mod encryption; pub mod logger; -pub mod redis; use std::sync::Arc; @@ -12,7 +11,7 @@ pub struct Store { pub master_pool: crate::db::SqlDb, #[cfg(feature = "olap")] pub replica_pool: crate::db::SqlDb, - pub redis_conn: Arc, + pub redis_conn: Arc, #[cfg(feature = "kv_store")] pub(crate) config: StoreConfig, } diff --git a/crates/router/src/utils/ext_traits.rs b/crates/router/src/utils/ext_traits.rs index de55938191..26dc3dace1 100644 --- a/crates/router/src/utils/ext_traits.rs +++ b/crates/router/src/utils/ext_traits.rs @@ -1,12 +1,11 @@ +pub use common_utils::ext_traits::{ByteSliceExt, BytesExt, Encode, StringExt, ValueExt}; use error_stack::{report, IntoReport, Report, ResultExt}; use once_cell::sync::Lazy; use regex::Regex; -use serde::{Deserialize, Serialize}; use crate::{ core::errors::{self, ApiErrorResponse, CustomResult, RouterResult, ValidateError}, logger, - pii::{ExposeInterface, Secret, Strategy}, types::api::AddressDetails, utils::when, }; @@ -89,217 +88,6 @@ where } } -pub(crate) trait StringExt { - fn parse_enum(self, enum_name: &str) -> CustomResult - where - T: std::str::FromStr, - // Requirement for converting the `Err` variant of `FromStr` to `Report` - ::Err: std::error::Error + Send + Sync + 'static; - - fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult - where - T: Deserialize<'de>; -} - -impl StringExt for String { - fn parse_enum(self, enum_name: &str) -> CustomResult - where - T: std::str::FromStr, - ::Err: std::error::Error + Send + Sync + 'static, - { - T::from_str(&self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Invalid enum variant {self:?} for enum {enum_name}")) - } - - fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult - where - T: Deserialize<'de>, - { - serde_json::from_str::(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to parse {type_name} from string")) - } -} - -pub(crate) trait BytesExt { - fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult - where - T: Deserialize<'de>; -} - -impl BytesExt for bytes::Bytes { - fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult - where - T: Deserialize<'de>, - { - use bytes::Buf; - - serde_json::from_slice::(self.chunk()) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to parse {type_name} from bytes")) - } -} - -pub(crate) trait ByteSliceExt { - fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult - where - T: Deserialize<'de>; -} - -impl ByteSliceExt for [u8] { - fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult - where - T: Deserialize<'de>, - { - serde_json::from_slice(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to parse {type_name} from &[u8]")) - } -} - -pub(crate) trait ValueExt { - fn parse_value(self, type_name: &str) -> CustomResult - where - T: serde::de::DeserializeOwned; -} - -impl ValueExt for serde_json::Value { - fn parse_value(self, type_name: &str) -> CustomResult - where - T: serde::de::DeserializeOwned, - { - let debug = format!( - "Unable to parse {type_name} from serde_json::Value: {:?}", - &self - ); - serde_json::from_value::(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| debug) - } -} - -impl ValueExt for Secret -where - MaskingStrategy: Strategy, -{ - fn parse_value(self, type_name: &str) -> CustomResult - where - T: serde::de::DeserializeOwned, - { - self.expose().parse_value(type_name) - } -} - -pub trait Encode<'e, P> -where - Self: 'e + std::fmt::Debug, -{ - // If needed get type information/custom error implementation. - fn convert_and_encode(&'e self) -> CustomResult - where - P: TryFrom<&'e Self> + Serialize, - Result>::Error>: error_stack::ResultExt, - >::Error> as ResultExt>::Ok: Serialize; - - fn convert_and_url_encode(&'e self) -> CustomResult - where - P: TryFrom<&'e Self> + Serialize, - Result>::Error>: error_stack::ResultExt, - >::Error> as ResultExt>::Ok: Serialize; - - fn encode(&'e self) -> CustomResult - where - Self: Serialize; - - fn encode_to_string_of_json(&'e self) -> CustomResult - where - Self: Serialize; - - fn encode_to_value(&'e self) -> CustomResult - where - Self: Serialize; - - fn encode_to_vec(&'e self) -> CustomResult, errors::ParsingError> - where - Self: Serialize; -} - -impl<'e, P, A> Encode<'e, P> for A -where - Self: 'e + std::fmt::Debug, -{ - fn convert_and_encode(&'e self) -> CustomResult - where - P: TryFrom<&'e Self> + Serialize, - Result>::Error>: error_stack::ResultExt, - >::Error> as ResultExt>::Ok: Serialize, - { - serde_json::to_string(&P::try_from(self).change_context(errors::ParsingError)?) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) - } - - fn convert_and_url_encode(&'e self) -> CustomResult - where - P: TryFrom<&'e Self> + Serialize, - Result>::Error>: error_stack::ResultExt, - >::Error> as ResultExt>::Ok: Serialize, - { - serde_urlencoded::to_string(&P::try_from(self).change_context(errors::ParsingError)?) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) - } - - // Check without two functions can we combine this - fn encode(&'e self) -> CustomResult - where - Self: Serialize, - { - serde_urlencoded::to_string(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) - } - - fn encode_to_string_of_json(&'e self) -> CustomResult - where - Self: Serialize, - { - serde_json::to_string(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self)) - } - - fn encode_to_value(&'e self) -> CustomResult - where - Self: Serialize, - { - serde_json::to_value(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to convert {:?} to a value", self)) - } - - fn encode_to_vec(&'e self) -> CustomResult, errors::ParsingError> - where - Self: Serialize, - { - serde_json::to_vec(self) - .into_report() - .change_context(errors::ParsingError) - .attach_printable_lazy(|| format!("Unable to convert {:?} to a value", self)) - } -} - #[allow(dead_code)] /// Merge two `serde_json::Value` instances. Will need to be updated to handle merging arrays. pub(crate) fn merge_json_values(a: &mut serde_json::Value, b: &serde_json::Value) {