chore(redis_interfaces): Upgrade fred crate to version 8.0.6 (#11331)

This commit is contained in:
Anurag Thakur
2026-02-19 20:59:55 +05:30
committed by GitHub
parent 2994c6d92c
commit 5fda7de6d8
9 changed files with 26 additions and 14 deletions

6
Cargo.lock generated
View File

@@ -3505,17 +3505,17 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "fred"
version = "7.1.2"
version = "8.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b99c2b48934cd02a81032dd7428b7ae831a27794275bc94eba367418db8a9e55"
checksum = "4b8e3a1339ed45ad8fde94530c4bdcbd5f371a3c6bd3bf57682923792830aa37"
dependencies = [
"arc-swap",
"async-trait",
"bytes 1.10.1",
"bytes-utils",
"crossbeam-queue 0.3.12",
"float-cmp",
"futures 0.3.31",
"lazy_static",
"log",
"parking_lot 0.12.3",
"rand 0.8.5",

View File

@@ -79,10 +79,12 @@ default_hash_ttl = 900 # Default TTL for hashes entries, in seconds
use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6)
stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options
auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible.
broadcast_channel_capacity = 32 # The default capacity used when creating tokio broadcast channels
disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled.
max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied.
default_command_timeout = 30 # An optional timeout to apply to all commands. In seconds
unresponsive_timeout = 10 # An optional timeout for Unresponsive commands in seconds. This should be less than default_command_timeout.
unresponsive_check_interval = 2 # The frequency at which the client checks for unresponsive connections. In seconds. This value should usually be less than half of `unresponsive_timeout` and always more than 1 ms.
max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing.
# This section provides configs for currency conversion api

View File

@@ -226,6 +226,8 @@ default_hash_ttl = 900 # Default TTL for hashes entries, in seconds
use_legacy_version = false # RESP protocol for fred crate (set this to true if using RESPv2 or redis version < 6)
stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options
auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible.
broadcast_channel_capacity = 32 # The default capacity used when creating tokio broadcast channels
unresponsive_check_interval = 2 # The frequency at which the client checks for unresponsive connections. In seconds. This value should usually be less than half of `unresponsive_timeout` and always more than 1 ms.
disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled.
max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied.
default_command_timeout = 30 # An optional timeout to apply to all commands. In seconds

View File

@@ -49,10 +49,12 @@ default_hash_ttl = 900
use_legacy_version = false
stream_read_count = 1
auto_pipeline = true
broadcast_channel_capacity = 32
disable_auto_backpressure = false
max_in_flight_commands = 5000
default_command_timeout = 30
unresponsive_timeout = 10
unresponsive_check_interval = 2
max_feed_count = 200
[trace_header]

View File

@@ -98,10 +98,12 @@ default_hash_ttl = 900
use_legacy_version = false
stream_read_count = 1
auto_pipeline = true
broadcast_channel_capacity = 32 # The default capacity used when creating tokio broadcast channels
disable_auto_backpressure = false
max_in_flight_commands = 5000
default_command_timeout = 30
unresponsive_timeout = 10
unresponsive_check_interval = 2 # The frequency at which the client checks for unresponsive connections. In seconds. This value should usually be less than half of `unresponsive_timeout` and always more than 1 ms.
max_feed_count = 200
[key_manager]

View File

@@ -12,7 +12,7 @@ multitenancy_fallback = []
[dependencies]
error-stack = "0.4.1"
fred = { version = "7.1.2", features = ["metrics", "partial-tracing", "subscriber-client", "check-unresponsive"] }
fred = { version = "8.0.6", features = ["metrics", "partial-tracing", "subscriber-client"] }
futures = "0.3"
serde = { version = "1.0.219", features = ["derive"] }
thiserror = "1.0.69"

View File

@@ -23,12 +23,8 @@ use std::sync::{atomic, Arc};
use common_utils::errors::CustomResult;
use error_stack::ResultExt;
pub use fred::interfaces::PubsubInterface;
use fred::{
clients::Transaction,
interfaces::ClientLike,
prelude::{EventInterface, TransactionInterface},
};
pub use fred::interfaces::{EventInterface, PubsubInterface};
use fred::{clients::Transaction, interfaces::ClientLike, prelude::TransactionInterface};
pub use self::types::*;
@@ -134,10 +130,14 @@ impl RedisConnectionPool {
max_in_flight_commands: conf.max_in_flight_commands,
policy: fred::types::BackpressurePolicy::Drain,
},
broadcast_channel_capacity: conf.broadcast_channel_capacity,
};
let connection_config = fred::types::ConnectionConfig {
unresponsive_timeout: std::time::Duration::from_secs(conf.unresponsive_timeout),
unresponsive: fred::types::UnresponsiveConfig {
max_timeout: Some(std::time::Duration::from_secs(conf.unresponsive_timeout)),
interval: std::time::Duration::from_secs(conf.unresponsive_check_interval),
},
..fred::types::ConnectionConfig::default()
};

View File

@@ -67,6 +67,8 @@ pub struct RedisSettings {
pub default_command_timeout: u64,
pub max_feed_count: u64,
pub unresponsive_timeout: u64,
pub unresponsive_check_interval: u64,
pub broadcast_channel_capacity: usize,
}
impl RedisSettings {
@@ -118,6 +120,8 @@ impl Default for RedisSettings {
default_command_timeout: 30,
max_feed_count: 200,
unresponsive_timeout: 10,
unresponsive_check_interval: 2,
broadcast_channel_capacity: 32,
}
}
}

View File

@@ -1,7 +1,7 @@
use std::sync::atomic;
use error_stack::ResultExt;
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
use redis_interface::{errors as redis_errors, EventInterface, PubsubInterface, RedisValue};
use router_env::{logger, tracing::Instrument};
use crate::redis::cache::{
@@ -32,7 +32,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
self.subscriber.manage_subscriptions();
self.subscriber
.subscribe::<(), &str>(channel)
.subscribe::<_>(channel)
.await
.change_context(redis_errors::RedisError::SubscribeError)?;
@@ -85,7 +85,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
#[inline]
async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> {
logger::debug!("Started on message");
let mut rx = self.subscriber.on_message();
let mut rx = self.subscriber.message_rx();
while let Ok(message) = rx.recv().await {
let channel_name = message.channel.to_string();
logger::debug!("Received message on channel: {channel_name}");