diff --git a/Cargo.lock b/Cargo.lock index e1edaedfd0..b2225445c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,11 +19,11 @@ dependencies = [ "futures-util", "log", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "smallvec", "tokio", - "tokio-util 0.7.7", + "tokio-util", ] [[package]] @@ -40,7 +40,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "tokio-util 0.7.7", + "tokio-util", ] [[package]] @@ -92,7 +92,7 @@ dependencies = [ "sha1", "smallvec", "tokio", - "tokio-util 0.7.7", + "tokio-util", "tracing", "zstd", ] @@ -175,7 +175,7 @@ dependencies = [ "log", "pin-project-lite", "tokio-rustls", - "tokio-util 0.7.7", + "tokio-util", "webpki-roots", ] @@ -763,7 +763,7 @@ dependencies = [ "pin-project-lite", "pin-utils", "tokio", - "tokio-util 0.7.7", + "tokio-util", "tracing", ] @@ -916,7 +916,7 @@ dependencies = [ "async-trait", "futures-channel", "futures-util", - "parking_lot 0.12.1", + "parking_lot", "tokio", ] @@ -952,16 +952,7 @@ dependencies = [ "cc", "cfg-if", "constant_time_eq", - "digest 0.10.6", -] - -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", + "digest", ] [[package]] @@ -1423,7 +1414,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core 0.9.7", + "parking_lot_core", ] [[package]] @@ -1497,22 +1488,13 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array", -] - [[package]] name = "digest" version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ - "block-buffer 0.10.4", + "block-buffer", "crypto-common", "subtle", ] @@ -1692,9 +1674,9 @@ dependencies = [ [[package]] name = "float-cmp" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" dependencies = [ "num-traits", ] @@ -1731,9 +1713,9 @@ dependencies = [ [[package]] name = "fred" -version = "5.2.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6be0137d9045288f9c0a0659da3b74c196ad0263d2eafa0f5a73785a907bad14" +checksum = "c52d60e6d9b2a715da13ec520d7da6e095121e25da5027cdac56b8f96684654b" dependencies = [ "arc-swap", "arcstr", @@ -1745,17 +1727,15 @@ dependencies = [ "futures", "lazy_static", "log", - "native-tls", - "parking_lot 0.11.2", + "parking_lot", "pretty_env_logger", "rand 0.8.5", "redis-protocol", "semver", "sha-1", "tokio", - "tokio-native-tls", "tokio-stream", - "tokio-util 0.6.10", + "tokio-util", "tracing", "tracing-futures", "url", @@ -2011,7 +1991,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.7", + "tokio-util", "tracing", ] @@ -2060,7 +2040,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest 0.10.6", + "digest", ] [[package]] @@ -2624,7 +2604,7 @@ dependencies = [ "futures-util", "num_cpus", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "quanta", "rustc_version", "scheduled-thread-pool", @@ -2730,12 +2710,6 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "openssl" version = "0.10.45" @@ -2889,17 +2863,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -2907,21 +2870,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -3225,7 +3174,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" dependencies = [ "log", - "parking_lot 0.12.1", + "parking_lot", "scheduled-thread-pool", ] @@ -3423,7 +3372,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", - "tokio-util 0.7.7", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -3731,7 +3680,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" dependencies = [ - "parking_lot 0.12.1", + "parking_lot", ] [[package]] @@ -3910,7 +3859,7 @@ dependencies = [ "futures", "lazy_static", "log", - "parking_lot 0.12.1", + "parking_lot", "serial_test_derive", ] @@ -3927,15 +3876,13 @@ dependencies = [ [[package]] name = "sha-1" -version = "0.9.8" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ - "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest 0.9.0", - "opaque-debug", + "digest", ] [[package]] @@ -3946,7 +3893,7 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.6", + "digest", ] [[package]] @@ -3957,7 +3904,7 @@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.6", + "digest", ] [[package]] @@ -4264,7 +4211,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -4325,20 +4272,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.7" @@ -4420,7 +4353,7 @@ dependencies = [ "prost-derive", "tokio", "tokio-stream", - "tokio-util 0.7.7", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -4442,7 +4375,7 @@ dependencies = [ "rand 0.8.5", "slab", "tokio", - "tokio-util 0.7.7", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml index de248ff6b7..725b3beda9 100644 --- a/crates/redis_interface/Cargo.toml +++ b/crates/redis_interface/Cargo.toml @@ -10,7 +10,7 @@ license = "Apache-2.0" [dependencies] async-trait = "0.1.66" error-stack = "0.3.1" -fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] } +fred = { version = "6.0.0", features = ["metrics", "partial-tracing"] } futures = "0.3" serde = { version = "1.0.155", features = ["derive"] } thiserror = "1.0.39" diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index 68fa952e9b..e000326d23 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -18,7 +18,7 @@ use fred::{ interfaces::{HashesInterface, KeysInterface, StreamsInterface}, types::{ Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings, - RedisKey, RedisMap, RedisValue, SetOptions, XCap, XReadResponse, + RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse, }, }; use futures::StreamExt; @@ -33,8 +33,8 @@ impl super::RedisConnectionPool { #[instrument(level = "DEBUG", skip(self))] pub async fn set_key(&self, key: &str, value: V) -> CustomResult<(), errors::RedisError> where - V: TryInto + Debug, - V::Error: Into, + V: TryInto + Debug + Send + Sync, + V::Error: Into + Send + Sync, { self.pool .set( @@ -54,8 +54,8 @@ impl super::RedisConnectionPool { value: V, ) -> CustomResult where - V: TryInto + Debug, - V::Error: Into, + V: TryInto + Debug + Send + Sync, + V::Error: Into + Send + Sync, { self.pool .msetnx(value) @@ -138,8 +138,8 @@ impl super::RedisConnectionPool { seconds: i64, ) -> CustomResult<(), errors::RedisError> where - V: TryInto + Debug, - V::Error: Into, + V: TryInto + Debug + Send + Sync, + V::Error: Into + Send + Sync, { self.pool .set(key, value, Some(Expiration::EX(seconds)), None, false) @@ -155,8 +155,8 @@ impl super::RedisConnectionPool { value: V, ) -> CustomResult where - V: TryInto + Debug, - V::Error: Into, + V: TryInto + Debug + Send + Sync, + V::Error: Into + Send + Sync, { self.pool .set( @@ -204,8 +204,8 @@ impl super::RedisConnectionPool { values: V, ) -> CustomResult<(), errors::RedisError> where - V: TryInto + Debug, - V::Error: Into, + V: TryInto + Debug + Send + Sync, + V::Error: Into + Send + Sync, { let output: Result<(), _> = self .pool @@ -228,8 +228,8 @@ impl super::RedisConnectionPool { value: V, ) -> CustomResult where - V: TryInto + Debug, - V::Error: Into, + V: TryInto + Debug + Send + Sync, + V::Error: Into + Send + Sync, { let output: Result = self .pool @@ -378,8 +378,8 @@ impl super::RedisConnectionPool { fields: F, ) -> CustomResult<(), errors::RedisError> where - F: TryInto + Debug, - F::Error: Into, + F: TryInto + Debug + Send + Sync, + F::Error: Into + Send + Sync, { self.pool .xadd(stream, false, None, entry_id, fields) @@ -395,7 +395,7 @@ impl super::RedisConnectionPool { ids: Ids, ) -> CustomResult where - Ids: Into + Debug, + Ids: Into + Debug + Send + Sync, { self.pool .xdel(stream, ids) @@ -411,8 +411,8 @@ impl super::RedisConnectionPool { xcap: C, ) -> CustomResult where - C: TryInto + Debug, - C::Error: Into, + C: TryInto + Debug + Send + Sync, + C::Error: Into + Send + Sync, { self.pool .xtrim(stream, xcap) @@ -429,7 +429,7 @@ impl super::RedisConnectionPool { ids: Ids, ) -> CustomResult where - Ids: Into + Debug, + Ids: Into + Debug + Send + Sync, { self.pool .xack(stream, group, ids) @@ -441,7 +441,7 @@ impl super::RedisConnectionPool { #[instrument(level = "DEBUG", skip(self))] pub async fn stream_get_length(&self, stream: K) -> CustomResult where - K: Into + Debug, + K: Into + Debug + Send + Sync, { self.pool .xlen(stream) @@ -458,8 +458,8 @@ impl super::RedisConnectionPool { read_count: Option, ) -> CustomResult, errors::RedisError> where - K: Into + Debug, - Ids: Into + Debug, + K: Into + Debug + Send + Sync, + Ids: Into + Debug + Send + Sync, { self.pool .xread_map( @@ -483,8 +483,8 @@ impl super::RedisConnectionPool { group: Option<(&str, &str)>, // (group_name, consumer_name) ) -> CustomResult>, errors::RedisError> where - K: Into + Debug, - Ids: Into + Debug, + K: Into + Debug + Send + Sync, + Ids: Into + Debug + Send + Sync, { match group { Some((group_name, consumer_name)) => { @@ -574,7 +574,7 @@ impl super::RedisConnectionPool { ids: Ids, ) -> CustomResult where - Ids: Into + Debug, + Ids: Into + Debug + Send + Sync, R: FromRedis + Unpin + Send + 'static, { self.pool diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index f4fc517d12..dce353c8f5 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -27,7 +27,6 @@ use common_utils::errors::CustomResult; use error_stack::{IntoReport, ResultExt}; use fred::interfaces::ClientLike; pub use fred::interfaces::PubsubInterface; -use futures::StreamExt; use router_env::logger; pub use self::{commands::*, types::*}; @@ -55,10 +54,10 @@ impl std::ops::Deref for RedisClient { impl RedisClient { pub async fn new( config: fred::types::RedisConfig, - policy: fred::types::ReconnectPolicy, + reconnect_policy: fred::types::ReconnectPolicy, ) -> CustomResult { - let client = fred::prelude::RedisClient::new(config); - client.connect(Some(policy)); + let client = fred::prelude::RedisClient::new(config, None, Some(reconnect_policy)); + client.connect(); client .wait_for_connect() .await @@ -96,22 +95,22 @@ impl RedisConnectionPool { if !conf.use_legacy_version { config.version = fred::types::RespVersion::RESP3; } - config.tracing = true; + config.tracing = fred::types::TracingConfig::new(true); config.blocking = fred::types::Blocking::Error; - let policy = fred::types::ReconnectPolicy::new_constant( + let reconnect_policy = fred::types::ReconnectPolicy::new_constant( conf.reconnect_max_attempts, conf.reconnect_delay, ); - let subscriber = RedisClient::new(config.clone(), policy.clone()).await?; + let subscriber = RedisClient::new(config.clone(), reconnect_policy.clone()).await?; - let publisher = RedisClient::new(config.clone(), policy.clone()).await?; + let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?; - let pool = fred::pool::RedisPool::new(config, conf.pool_size) + let pool = fred::pool::RedisPool::new(config, None, Some(reconnect_policy), conf.pool_size) .into_report() .change_context(errors::RedisError::RedisConnectionError)?; - let join_handles = pool.connect(Some(policy)); + let join_handles = pool.connect(); pool.wait_for_connect() .await .into_report() @@ -140,17 +139,13 @@ impl RedisConnectionPool { } } pub async fn on_error(&self) { - self.pool - .on_error() - .for_each(|err| { - logger::error!("{err:?}"); - if self.pool.state() == fred::types::ClientState::Disconnected { - self.is_redis_available - .store(false, atomic::Ordering::SeqCst); - } - futures::future::ready(()) - }) - .await; + while let Ok(redis_error) = self.pool.on_error().recv().await { + logger::error!(?redis_error, "Redis protocol or connection error"); + if self.pool.state() == fred::types::ClientState::Disconnected { + self.is_redis_available + .store(false, atomic::Ordering::SeqCst); + } + } } } diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 74c4a71c6d..eba3269394 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -6,7 +6,6 @@ pub mod logger; use std::sync::{atomic, Arc}; use error_stack::{IntoReport, ResultExt}; -use futures::StreamExt; use redis_interface::{errors as redis_errors, PubsubInterface}; pub use self::{api::*, encryption::*}; @@ -24,11 +23,13 @@ pub trait PubSubInterface { &self, channel: &str, ) -> errors::CustomResult; + async fn publish( &self, channel: &str, key: &str, ) -> errors::CustomResult; + async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError>; } @@ -45,6 +46,7 @@ impl PubSubInterface for redis_interface::RedisConnectionPool { .into_report() .change_context(redis_errors::RedisError::SubscribeError) } + #[inline] async fn publish( &self, @@ -57,11 +59,13 @@ impl PubSubInterface for redis_interface::RedisConnectionPool { .into_report() .change_context(redis_errors::RedisError::SubscribeError) } + #[inline] async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> { - let mut message = self.subscriber.on_message(); - while let Some((_, key)) = message.next().await { - let key = key + let mut rx = self.subscriber.on_message(); + while let Ok(message) = rx.recv().await { + let key = message + .value .as_string() .ok_or::(redis_errors::RedisError::DeleteFailed)?;