diff --git a/Cargo.lock b/Cargo.lock index 54d37da3a8..5f1ab2cd34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,12 +426,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" -[[package]] -name = "arcstr" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f907281554a3d0312bb7aab855a8e0ef6cbf1614d06de54105039ca8b34460e" - [[package]] name = "argon2" version = "0.5.2" @@ -542,26 +536,6 @@ dependencies = [ "tokio 1.32.0", ] -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock", - "autocfg", - "cfg-if 1.0.0", - "concurrent-queue", - "futures-lite", - "log", - "parking", - "polling", - "rustix 0.37.25", - "slab", - "socket2 0.4.9", - "waker-fn", -] - [[package]] name = "async-lock" version = "2.8.0" @@ -2558,16 +2532,14 @@ dependencies = [ [[package]] name = "fred" -version = "6.3.2" +version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15cc18b56395b8b15ffcdcea7fe8586e3a3ccb3d9dc3b9408800d9814efb08e" +checksum = "f2e8094c30c33132e948eb7e1b740cfdaa5a6702610bd3a2744002ec3575cd68" dependencies = [ "arc-swap", - "arcstr", "async-trait", "bytes 1.5.0", "bytes-utils", - "cfg-if 1.0.0", "float-cmp", "futures 0.3.28", "lazy_static", @@ -2576,7 +2548,7 @@ dependencies = [ "rand 0.8.5", "redis-protocol", "semver 1.0.19", - "sha-1 0.10.1", + "socket2 0.5.4", "tokio 1.32.0", "tokio-stream", "tokio-util", @@ -3278,17 +3250,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys", -] - [[package]] name = "iovec" version = "0.1.4" @@ -3311,7 +3272,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "rustix 0.38.17", + "rustix", "windows-sys", ] @@ -3497,12 +3458,6 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.8" @@ -3790,12 +3745,12 @@ dependencies = [ [[package]] name = "moka" -version = "0.11.3" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa6e72583bf6830c956235bff0d5afec8cf2952f579ebad18ae7821a917d950f" +checksum = "d8017ec3548ffe7d4cef7ac0e12b044c01164a74c0f3119420faeaf13490ad8b" dependencies = [ - "async-io", "async-lock", + "async-trait", "crossbeam-channel", "crossbeam-epoch 0.9.15", "crossbeam-utils 0.8.16", @@ -3804,7 +3759,6 @@ dependencies = [ "parking_lot 0.12.1", "quanta", "rustc_version 0.4.0", - "scheduled-thread-pool", "skeptic", "smallvec 1.11.1", "tagptr", @@ -4484,22 +4438,6 @@ dependencies = [ "miniz_oxide 0.3.7", ] -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if 1.0.0", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys", -] - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -4874,6 +4812,7 @@ dependencies = [ "serde", "thiserror", "tokio 1.32.0", + "tokio-stream", ] [[package]] @@ -5168,7 +5107,7 @@ dependencies = [ "serde_urlencoded", "serde_with", "serial_test", - "sha-1 0.9.8", + "sha-1", "sqlx", "storage_impl", "strum 0.25.0", @@ -5348,20 +5287,6 @@ dependencies = [ "nom", ] -[[package]] -name = "rustix" -version = "0.37.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4eb579851244c2c03e7c24f501c3432bed80b8f720af1d6e5b0e0f01555a035" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys", -] - [[package]] name = "rustix" version = "0.38.17" @@ -5371,7 +5296,7 @@ dependencies = [ "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys 0.4.8", + "linux-raw-sys", "windows-sys", ] @@ -5774,17 +5699,6 @@ dependencies = [ "opaque-debug", ] -[[package]] -name = "sha-1" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" -dependencies = [ - "cfg-if 1.0.0", - "cpufeatures", - "digest 0.10.7", -] - [[package]] name = "sha1" version = "0.10.6" @@ -6263,7 +6177,7 @@ dependencies = [ "cfg-if 1.0.0", "fastrand 2.0.1", "redox_syscall 0.3.5", - "rustix 0.38.17", + "rustix", "windows-sys", ] @@ -6641,6 +6555,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio 1.32.0", + "tokio-util", ] [[package]] diff --git a/config/config.example.toml b/config/config.example.toml index eb2574c92e..335433077b 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -52,6 +52,11 @@ default_ttl = 300 # Default TTL for entries, in seconds default_hash_ttl = 900 # Default TTL for hashes entries, in seconds use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6) stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options +auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible. +disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled. +max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. +default_command_timeout = 0 # An optional timeout to apply to all commands. +max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing. # This section provides configs for currency conversion api [forex_api] diff --git a/config/development.toml b/config/development.toml index dc54233662..090751b2ea 100644 --- a/config/development.toml +++ b/config/development.toml @@ -31,6 +31,23 @@ dbname = "hyperswitch_db" pool_size = 5 connection_timeout = 10 +[redis] +host = "127.0.0.1" +port = 6379 +pool_size = 5 +reconnect_max_attempts = 5 +reconnect_delay = 5 +default_ttl = 300 +default_hash_ttl = 900 +use_legacy_version = false +stream_read_count = 1 +auto_pipeline = true +disable_auto_backpressure = false +max_in_flight_commands = 5000 +default_command_timeout = 0 +max_feed_count = 200 + + [server] # HTTP Request body limit. Defaults to 32kB request_body_limit = 32768 diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 437df22e30..dc42a5b44c 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -73,6 +73,19 @@ host = "redis-standalone" port = 6379 cluster_enabled = false cluster_urls = ["redis-cluster:6379"] +pool_size = 5 +reconnect_max_attempts = 5 +reconnect_delay = 5 +default_ttl = 300 +default_hash_ttl = 900 +use_legacy_version = false +stream_read_count = 1 +auto_pipeline = true +disable_auto_backpressure = false +max_in_flight_commands = 5000 +default_command_timeout = 0 +max_feed_count = 200 + [refund] max_attempts = 10 diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml index fb8976323f..1a6bc96a7f 100644 --- a/crates/redis_interface/Cargo.toml +++ b/crates/redis_interface/Cargo.toml @@ -9,11 +9,12 @@ license.workspace = true [dependencies] error-stack = "0.3.1" -fred = { version = "6.3.0", features = ["metrics", "partial-tracing", "subscriber-client"] } +fred = { version = "7.0.0", features = ["metrics", "partial-tracing", "subscriber-client"] } futures = "0.3" serde = { version = "1.0.193", features = ["derive"] } thiserror = "1.0.40" tokio = "1.28.2" +tokio-stream = {version = "0.1.14", features = ["sync"]} # First party crates common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext"] } diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index ca85d19d38..ce2b138d92 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -383,6 +383,7 @@ impl super::RedisConnectionPool { ) -> CustomResult, errors::RedisError> { Ok(self .pool + .next() .hscan::<&str, &str>(key, pattern, count) .filter_map(|value| async move { match value { @@ -562,7 +563,7 @@ impl super::RedisConnectionPool { .await .into_report() .map_err(|err| match err.current_context().kind() { - RedisErrorKind::NotFound => { + RedisErrorKind::NotFound | RedisErrorKind::Parse => { err.change_context(errors::RedisError::StreamEmptyOrNotAvailable) } _ => err.change_context(errors::RedisError::StreamReadFailed), diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index bdc7956011..7111869a5c 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -24,14 +24,14 @@ use std::sync::{atomic, Arc}; use common_utils::errors::CustomResult; use error_stack::{IntoReport, ResultExt}; -use fred::interfaces::ClientLike; pub use fred::interfaces::PubsubInterface; +use fred::{interfaces::ClientLike, prelude::EventInterface}; use router_env::logger; pub use self::{commands::*, types::*}; pub struct RedisConnectionPool { - pub pool: fred::pool::RedisPool, + pub pool: fred::prelude::RedisPool, config: RedisConfig, pub subscriber: SubscriberClient, pub publisher: RedisClient, @@ -53,8 +53,10 @@ impl RedisClient { pub async fn new( config: fred::types::RedisConfig, reconnect_policy: fred::types::ReconnectPolicy, + perf: fred::types::PerformanceConfig, ) -> CustomResult { - let client = fred::prelude::RedisClient::new(config, None, Some(reconnect_policy)); + let client = + fred::prelude::RedisClient::new(config, Some(perf), None, Some(reconnect_policy)); client.connect(); client .wait_for_connect() @@ -73,8 +75,10 @@ impl SubscriberClient { pub async fn new( config: fred::types::RedisConfig, reconnect_policy: fred::types::ReconnectPolicy, + perf: fred::types::PerformanceConfig, ) -> CustomResult { - let client = fred::clients::SubscriberClient::new(config, None, Some(reconnect_policy)); + let client = + fred::clients::SubscriberClient::new(config, Some(perf), None, Some(reconnect_policy)); client.connect(); client .wait_for_connect() @@ -117,6 +121,17 @@ impl RedisConnectionPool { .into_report() .change_context(errors::RedisError::RedisConnectionError)?; + let perf = fred::types::PerformanceConfig { + auto_pipeline: conf.auto_pipeline, + default_command_timeout: std::time::Duration::from_secs(conf.default_command_timeout), + max_feed_count: conf.max_feed_count, + backpressure: fred::types::BackpressureConfig { + disable_auto_backpressure: conf.disable_auto_backpressure, + max_in_flight_commands: conf.max_in_flight_commands, + policy: fred::types::BackpressurePolicy::Drain, + }, + }; + if !conf.use_legacy_version { config.version = fred::types::RespVersion::RESP3; } @@ -127,13 +142,21 @@ impl RedisConnectionPool { conf.reconnect_delay, ); - let subscriber = SubscriberClient::new(config.clone(), reconnect_policy.clone()).await?; + let subscriber = + SubscriberClient::new(config.clone(), reconnect_policy.clone(), perf.clone()).await?; - let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?; + let publisher = + RedisClient::new(config.clone(), reconnect_policy.clone(), perf.clone()).await?; - let pool = fred::pool::RedisPool::new(config, None, Some(reconnect_policy), conf.pool_size) - .into_report() - .change_context(errors::RedisError::RedisConnectionError)?; + let pool = fred::prelude::RedisPool::new( + config, + Some(perf), + None, + Some(reconnect_policy), + conf.pool_size, + ) + .into_report() + .change_context(errors::RedisError::RedisConnectionError)?; pool.connect(); pool.wait_for_connect() @@ -153,16 +176,28 @@ impl RedisConnectionPool { } pub async fn on_error(&self, tx: tokio::sync::oneshot::Sender<()>) { - while let Ok(redis_error) = self.pool.on_error().recv().await { - logger::error!(?redis_error, "Redis protocol or connection error"); - logger::error!("current state: {:#?}", self.pool.state()); - if self.pool.state() == fred::types::ClientState::Disconnected { - if tx.send(()).is_err() { - logger::error!("The redis shutdown signal sender failed to signal"); + use futures::StreamExt; + use tokio_stream::wrappers::BroadcastStream; + + let error_rxs: Vec> = self + .pool + .clients() + .iter() + .map(|client| BroadcastStream::new(client.error_rx())) + .collect(); + + let mut error_rx = futures::stream::select_all(error_rxs); + loop { + if let Some(Ok(error)) = error_rx.next().await { + logger::error!(?error, "Redis protocol or connection error"); + if self.pool.state() == fred::types::ClientState::Disconnected { + if tx.send(()).is_err() { + logger::error!("The redis shutdown signal sender failed to signal"); + } + self.is_redis_available + .store(false, atomic::Ordering::SeqCst); + break; } - self.is_redis_available - .store(false, atomic::Ordering::SeqCst); - break; } } } diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 5dc9307001..364fcfabc1 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -52,6 +52,11 @@ pub struct RedisSettings { /// TTL for hash-tables in seconds pub default_hash_ttl: u32, pub stream_read_count: u64, + pub auto_pipeline: bool, + pub disable_auto_backpressure: bool, + pub max_in_flight_commands: u64, + pub default_command_timeout: u64, + pub max_feed_count: u64, } impl RedisSettings { @@ -89,6 +94,11 @@ impl Default for RedisSettings { default_ttl: 300, stream_read_count: 1, default_hash_ttl: 900, + auto_pipeline: true, + disable_auto_backpressure: false, + max_in_flight_commands: 5000, + default_command_timeout: 0, + max_feed_count: 200, } } } diff --git a/crates/router/src/db/api_keys.rs b/crates/router/src/db/api_keys.rs index 4ba9e47e9a..94edac9690 100644 --- a/crates/router/src/db/api_keys.rs +++ b/crates/router/src/db/api_keys.rs @@ -535,6 +535,7 @@ mod tests { merchant_id, hashed_api_key.into_inner() ),) + .await .is_none() ) } diff --git a/crates/router/src/db/cache.rs b/crates/router/src/db/cache.rs index 0688665f0c..4bda08e22c 100644 --- a/crates/router/src/db/cache.rs +++ b/crates/router/src/db/cache.rs @@ -63,7 +63,7 @@ where F: FnOnce() -> Fut + Send, Fut: futures::Future> + Send, { - let cache_val = cache.get_val::(key); + let cache_val = cache.get_val::(key).await; if let Some(val) = cache_val { Ok(val) } else { diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index 4fbb8f19cc..3718b962b4 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -890,6 +890,7 @@ mod merchant_connector_account_cache_tests { "{}_{}", merchant_id, connector_label ),) + .await .is_none()) } } diff --git a/crates/router/tests/cache.rs b/crates/router/tests/cache.rs index 4de45c7132..040e0dddf9 100644 --- a/crates/router/tests/cache.rs +++ b/crates/router/tests/cache.rs @@ -50,8 +50,8 @@ async fn invalidate_existing_cache_success() { let response_body = response.body().await; println!("invalidate Cache: {response:?} : {response_body:?}"); assert_eq!(response.status(), awc::http::StatusCode::OK); - assert!(cache::CONFIG_CACHE.get(&cache_key).is_none()); - assert!(cache::ACCOUNTS_CACHE.get(&cache_key).is_none()); + assert!(cache::CONFIG_CACHE.get(&cache_key).await.is_none()); + assert!(cache::ACCOUNTS_CACHE.get(&cache_key).await.is_none()); } #[actix_web::test] diff --git a/crates/storage_impl/Cargo.toml b/crates/storage_impl/Cargo.toml index d7f45c91d6..0155980d9f 100644 --- a/crates/storage_impl/Cargo.toml +++ b/crates/storage_impl/Cargo.toml @@ -38,7 +38,7 @@ error-stack = "0.3.1" futures = "0.3.28" http = "0.2.9" mime = "0.3.17" -moka = { version = "0.11.3", features = ["future"] } +moka = { version = "0.12", features = ["future"] } once_cell = "1.18.0" ring = "0.16.20" serde = { version = "1.0.193", features = ["derive"] } diff --git a/crates/storage_impl/src/redis/cache.rs b/crates/storage_impl/src/redis/cache.rs index cd60666276..a960261f86 100644 --- a/crates/storage_impl/src/redis/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -109,7 +109,6 @@ impl Cache { /// `max_capacity`: Max size in MB's that the cache can hold pub fn new(time_to_live: u64, time_to_idle: u64, max_capacity: Option) -> Self { let mut cache_builder = MokaCache::builder() - .eviction_listener_with_queued_delivery_mode(|_, _, _| {}) .time_to_live(std::time::Duration::from_secs(time_to_live)) .time_to_idle(std::time::Duration::from_secs(time_to_idle)); @@ -126,8 +125,8 @@ impl Cache { self.insert(key, Arc::new(val)).await; } - pub fn get_val(&self, key: &str) -> Option { - let val = self.get(key)?; + pub async fn get_val(&self, key: &str) -> Option { + let val = self.get(key).await?; (*val).as_any().downcast_ref::().cloned() } @@ -188,7 +187,7 @@ where F: FnOnce() -> Fut + Send, Fut: futures::Future> + Send, { - let cache_val = cache.get_val::(key); + let cache_val = cache.get_val::(key).await; if let Some(val) = cache_val { Ok(val) } else { @@ -266,14 +265,17 @@ mod cache_tests { async fn construct_and_get_cache() { let cache = Cache::new(1800, 1800, None); cache.push("key".to_string(), "val".to_string()).await; - assert_eq!(cache.get_val::("key"), Some(String::from("val"))); + assert_eq!( + cache.get_val::("key").await, + Some(String::from("val")) + ); } #[tokio::test] async fn eviction_on_size_test() { let cache = Cache::new(2, 2, Some(0)); cache.push("key".to_string(), "val".to_string()).await; - assert_eq!(cache.get_val::("key"), None); + assert_eq!(cache.get_val::("key").await, None); } #[tokio::test] @@ -283,7 +285,7 @@ mod cache_tests { cache.remove("key").await; - assert_eq!(cache.get_val::("key"), None); + assert_eq!(cache.get_val::("key").await, None); } #[tokio::test] @@ -291,6 +293,6 @@ mod cache_tests { let cache = Cache::new(2, 2, None); cache.push("key".to_string(), "val".to_string()).await; tokio::time::sleep(std::time::Duration::from_secs(3)).await; - assert_eq!(cache.get_val::("key"), None); + assert_eq!(cache.get_val::("key").await, None); } }