From 5fda7de6d87ec2d3611412edd2d34fa17c250def Mon Sep 17 00:00:00 2001 From: Anurag Thakur Date: Thu, 19 Feb 2026 20:59:55 +0530 Subject: [PATCH] chore(redis_interfaces): Upgrade fred crate to version 8.0.6 (#11331) --- Cargo.lock | 6 +++--- config/config.example.toml | 2 ++ config/deployments/env_specific.toml | 2 ++ config/development.toml | 2 ++ config/docker_compose.toml | 2 ++ crates/redis_interface/Cargo.toml | 2 +- crates/redis_interface/src/lib.rs | 14 +++++++------- crates/redis_interface/src/types.rs | 4 ++++ crates/storage_impl/src/redis/pub_sub.rs | 6 +++--- 9 files changed, 26 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c4e46c403..b976e9a365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3505,17 +3505,17 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" [[package]] name = "fred" -version = "7.1.2" +version = "8.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b99c2b48934cd02a81032dd7428b7ae831a27794275bc94eba367418db8a9e55" +checksum = "4b8e3a1339ed45ad8fde94530c4bdcbd5f371a3c6bd3bf57682923792830aa37" dependencies = [ "arc-swap", "async-trait", "bytes 1.10.1", "bytes-utils", + "crossbeam-queue 0.3.12", "float-cmp", "futures 0.3.31", - "lazy_static", "log", "parking_lot 0.12.3", "rand 0.8.5", diff --git a/config/config.example.toml b/config/config.example.toml index 3441625fd2..a54b06c47a 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -79,10 +79,12 @@ default_hash_ttl = 900 # Default TTL for hashes entries, in seconds use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6) stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible. +broadcast_channel_capacity = 32 # The default capacity used when creating tokio broadcast channels disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled. max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. default_command_timeout = 30 # An optional timeout to apply to all commands. In seconds unresponsive_timeout = 10 # An optional timeout for Unresponsive commands in seconds. This should be less than default_command_timeout. +unresponsive_check_interval = 2 # The frequency at which the client checks for unresponsive connections. In seconds. This value should usually be less than half of `unresponsive_timeout` and always more than 1 ms. max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing. # This section provides configs for currency conversion api diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index e2cee8ffdb..fdec562d29 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -226,6 +226,8 @@ default_hash_ttl = 900 # Default TTL for hashes entries, in seconds use_legacy_version = false # RESP protocol for fred crate (set this to true if using RESPv2 or redis version < 6) stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible. +broadcast_channel_capacity = 32 # The default capacity used when creating tokio broadcast channels +unresponsive_check_interval = 2 # The frequency at which the client checks for unresponsive connections. In seconds. This value should usually be less than half of `unresponsive_timeout` and always more than 1 ms. disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled. max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. default_command_timeout = 30 # An optional timeout to apply to all commands. In seconds diff --git a/config/development.toml b/config/development.toml index c184817438..ad93619a6e 100644 --- a/config/development.toml +++ b/config/development.toml @@ -49,10 +49,12 @@ default_hash_ttl = 900 use_legacy_version = false stream_read_count = 1 auto_pipeline = true +broadcast_channel_capacity = 32 disable_auto_backpressure = false max_in_flight_commands = 5000 default_command_timeout = 30 unresponsive_timeout = 10 +unresponsive_check_interval = 2 max_feed_count = 200 [trace_header] diff --git a/config/docker_compose.toml b/config/docker_compose.toml index de81a4a530..1b0a04f9d4 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -98,10 +98,12 @@ default_hash_ttl = 900 use_legacy_version = false stream_read_count = 1 auto_pipeline = true +broadcast_channel_capacity = 32 # The default capacity used when creating tokio broadcast channels disable_auto_backpressure = false max_in_flight_commands = 5000 default_command_timeout = 30 unresponsive_timeout = 10 +unresponsive_check_interval = 2 # The frequency at which the client checks for unresponsive connections. In seconds. This value should usually be less than half of `unresponsive_timeout` and always more than 1 ms. max_feed_count = 200 [key_manager] diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml index 67a86c5b4c..13ff26589e 100644 --- a/crates/redis_interface/Cargo.toml +++ b/crates/redis_interface/Cargo.toml @@ -12,7 +12,7 @@ multitenancy_fallback = [] [dependencies] error-stack = "0.4.1" -fred = { version = "7.1.2", features = ["metrics", "partial-tracing", "subscriber-client", "check-unresponsive"] } +fred = { version = "8.0.6", features = ["metrics", "partial-tracing", "subscriber-client"] } futures = "0.3" serde = { version = "1.0.219", features = ["derive"] } thiserror = "1.0.69" diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index 1a6c191c18..f9d3a6861e 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -23,12 +23,8 @@ use std::sync::{atomic, Arc}; use common_utils::errors::CustomResult; use error_stack::ResultExt; -pub use fred::interfaces::PubsubInterface; -use fred::{ - clients::Transaction, - interfaces::ClientLike, - prelude::{EventInterface, TransactionInterface}, -}; +pub use fred::interfaces::{EventInterface, PubsubInterface}; +use fred::{clients::Transaction, interfaces::ClientLike, prelude::TransactionInterface}; pub use self::types::*; @@ -134,10 +130,14 @@ impl RedisConnectionPool { max_in_flight_commands: conf.max_in_flight_commands, policy: fred::types::BackpressurePolicy::Drain, }, + broadcast_channel_capacity: conf.broadcast_channel_capacity, }; let connection_config = fred::types::ConnectionConfig { - unresponsive_timeout: std::time::Duration::from_secs(conf.unresponsive_timeout), + unresponsive: fred::types::UnresponsiveConfig { + max_timeout: Some(std::time::Duration::from_secs(conf.unresponsive_timeout)), + interval: std::time::Duration::from_secs(conf.unresponsive_check_interval), + }, ..fred::types::ConnectionConfig::default() }; diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 4f966dc90a..9ba19c210b 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -67,6 +67,8 @@ pub struct RedisSettings { pub default_command_timeout: u64, pub max_feed_count: u64, pub unresponsive_timeout: u64, + pub unresponsive_check_interval: u64, + pub broadcast_channel_capacity: usize, } impl RedisSettings { @@ -118,6 +120,8 @@ impl Default for RedisSettings { default_command_timeout: 30, max_feed_count: 200, unresponsive_timeout: 10, + unresponsive_check_interval: 2, + broadcast_channel_capacity: 32, } } } diff --git a/crates/storage_impl/src/redis/pub_sub.rs b/crates/storage_impl/src/redis/pub_sub.rs index 28b7614876..5e7c20a4ad 100644 --- a/crates/storage_impl/src/redis/pub_sub.rs +++ b/crates/storage_impl/src/redis/pub_sub.rs @@ -1,7 +1,7 @@ use std::sync::atomic; use error_stack::ResultExt; -use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue}; +use redis_interface::{errors as redis_errors, EventInterface, PubsubInterface, RedisValue}; use router_env::{logger, tracing::Instrument}; use crate::redis::cache::{ @@ -32,7 +32,7 @@ impl PubSubInterface for std::sync::Arc { self.subscriber.manage_subscriptions(); self.subscriber - .subscribe::<(), &str>(channel) + .subscribe::<_>(channel) .await .change_context(redis_errors::RedisError::SubscribeError)?; @@ -85,7 +85,7 @@ impl PubSubInterface for std::sync::Arc { #[inline] async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> { logger::debug!("Started on message"); - let mut rx = self.subscriber.on_message(); + let mut rx = self.subscriber.message_rx(); while let Ok(message) = rx.recv().await { let channel_name = message.channel.to_string(); logger::debug!("Received message on channel: {channel_name}");