diff --git a/Cargo.lock b/Cargo.lock index 2c9293c170..b466a16109 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2610,9 +2610,9 @@ dependencies = [ [[package]] name = "fred" -version = "7.1.0" +version = "7.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9282e65613822eea90c99872c51afa1de61542215cb11f91456a93f50a5a131a" +checksum = "b99c2b48934cd02a81032dd7428b7ae831a27794275bc94eba367418db8a9e55" dependencies = [ "arc-swap", "async-trait", diff --git a/config/config.example.toml b/config/config.example.toml index abe3f6b4e0..028f689d79 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -54,9 +54,10 @@ use_legacy_version = false # Resp protocol for fred crate (set this to tr stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible. disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled. -max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. -default_command_timeout = 0 # An optional timeout to apply to all commands. -max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing. +max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. +default_command_timeout = 30 # An optional timeout to apply to all commands. In seconds +unresponsive_timeout = 10 # An optional timeout for Unresponsive commands in seconds. This should be less than default_command_timeout. +max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing. # This section provides configs for currency conversion api [forex_api] diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index c8c118d982..990796c79b 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -169,9 +169,10 @@ use_legacy_version = false # RESP p stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options auto_pipeline = true # Whether or not the client should automatically pipeline commands across tasks when possible. disable_auto_backpressure = false # Whether or not to disable the automatic backpressure features when pipelining is enabled. -max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. -default_command_timeout = 0 # An optional timeout to apply to all commands. -max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing. +max_in_flight_commands = 5000 # The maximum number of in-flight commands (per connection) before backpressure will be applied. +default_command_timeout = 30 # An optional timeout to apply to all commands. In seconds +unresponsive_timeout = 10 # An optional timeout for Unresponsive commands in seconds. This should be less than default_command_timeout. +max_feed_count = 200 # The maximum number of frames that will be fed to a socket before flushing. cluster_enabled = true # boolean cluster_urls = ["redis.cluster.uri-1:8080", "redis.cluster.uri-2:4115"] # List of redis cluster urls diff --git a/config/development.toml b/config/development.toml index 59d73a4b8b..39ab8c1eca 100644 --- a/config/development.toml +++ b/config/development.toml @@ -44,7 +44,8 @@ stream_read_count = 1 auto_pipeline = true disable_auto_backpressure = false max_in_flight_commands = 5000 -default_command_timeout = 0 +default_command_timeout = 30 +unresponsive_timeout = 10 max_feed_count = 200 diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 8170132bb8..e0de31dfbb 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -78,7 +78,8 @@ stream_read_count = 1 auto_pipeline = true disable_auto_backpressure = false max_in_flight_commands = 5000 -default_command_timeout = 0 +default_command_timeout = 30 +unresponsive_timeout = 10 max_feed_count = 200 [cors] diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml index 32e850a073..85cfef3df5 100644 --- a/crates/redis_interface/Cargo.toml +++ b/crates/redis_interface/Cargo.toml @@ -9,7 +9,7 @@ license.workspace = true [dependencies] error-stack = "0.3.1" -fred = { version = "7.0.0", features = ["metrics", "partial-tracing", "subscriber-client"] } +fred = { version = "7.1.2", features = ["metrics", "partial-tracing", "subscriber-client", "check-unresponsive"] } futures = "0.3" serde = { version = "1.0.193", features = ["derive"] } thiserror = "1.0.40" diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index 33d40ebe15..b46e4aec19 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -132,6 +132,11 @@ impl RedisConnectionPool { }, }; + let connection_config = fred::types::ConnectionConfig { + unresponsive_timeout: std::time::Duration::from_secs(conf.unresponsive_timeout), + ..fred::types::ConnectionConfig::default() + }; + if !conf.use_legacy_version { config.version = fred::types::RespVersion::RESP3; } @@ -151,7 +156,7 @@ impl RedisConnectionPool { let pool = fred::prelude::RedisPool::new( config, Some(perf), - None, + Some(connection_config), Some(reconnect_policy), conf.pool_size, ) @@ -201,6 +206,15 @@ impl RedisConnectionPool { } } } + + pub async fn on_unresponsive(&self) { + let _ = self.pool.clients().iter().map(|client| { + client.on_unresponsive(|server| { + logger::warn!(redis_server =?server.host, "Redis server is unresponsive"); + Ok(()) + }) + }); + } } struct RedisConfig { diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 4ebb620c36..cb883fc169 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -57,6 +57,7 @@ pub struct RedisSettings { pub max_in_flight_commands: u64, pub default_command_timeout: u64, pub max_feed_count: u64, + pub unresponsive_timeout: u64, } impl RedisSettings { @@ -76,7 +77,17 @@ impl RedisSettings { "Redis `cluster_urls` must be specified if `cluster_enabled` is `true`".into(), )) .into_report() - }) + })?; + + when( + self.default_command_timeout < self.unresponsive_timeout, + || { + Err(errors::RedisError::InvalidConfiguration( + "Unresponsive timeout cannot be greater than the command timeout".into(), + )) + .into_report() + }, + ) } } @@ -97,8 +108,9 @@ impl Default for RedisSettings { auto_pipeline: true, disable_auto_backpressure: false, max_in_flight_commands: 5000, - default_command_timeout: 0, + default_command_timeout: 30, max_feed_count: 200, + unresponsive_timeout: 10, } } }