chore(deps): update fred and moka (#3088)

This commit is contained in:
Kartikeya Hegde
2023-12-11 15:21:23 +05:30
committed by GitHub
parent 2c4599a1cd
commit 129b1e55bd
14 changed files with 130 additions and 129 deletions

View File

@ -9,11 +9,12 @@ license.workspace = true
[dependencies]
error-stack = "0.3.1"
fred = { version = "6.3.0", features = ["metrics", "partial-tracing", "subscriber-client"] }
fred = { version = "7.0.0", features = ["metrics", "partial-tracing", "subscriber-client"] }
futures = "0.3"
serde = { version = "1.0.193", features = ["derive"] }
thiserror = "1.0.40"
tokio = "1.28.2"
tokio-stream = {version = "0.1.14", features = ["sync"]}
# First party crates
common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext"] }

View File

@ -383,6 +383,7 @@ impl super::RedisConnectionPool {
) -> CustomResult<Vec<String>, errors::RedisError> {
Ok(self
.pool
.next()
.hscan::<&str, &str>(key, pattern, count)
.filter_map(|value| async move {
match value {
@ -562,7 +563,7 @@ impl super::RedisConnectionPool {
.await
.into_report()
.map_err(|err| match err.current_context().kind() {
RedisErrorKind::NotFound => {
RedisErrorKind::NotFound | RedisErrorKind::Parse => {
err.change_context(errors::RedisError::StreamEmptyOrNotAvailable)
}
_ => err.change_context(errors::RedisError::StreamReadFailed),

View File

@ -24,14 +24,14 @@ use std::sync::{atomic, Arc};
use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt};
use fred::interfaces::ClientLike;
pub use fred::interfaces::PubsubInterface;
use fred::{interfaces::ClientLike, prelude::EventInterface};
use router_env::logger;
pub use self::{commands::*, types::*};
pub struct RedisConnectionPool {
pub pool: fred::pool::RedisPool,
pub pool: fred::prelude::RedisPool,
config: RedisConfig,
pub subscriber: SubscriberClient,
pub publisher: RedisClient,
@ -53,8 +53,10 @@ impl RedisClient {
pub async fn new(
config: fred::types::RedisConfig,
reconnect_policy: fred::types::ReconnectPolicy,
perf: fred::types::PerformanceConfig,
) -> CustomResult<Self, errors::RedisError> {
let client = fred::prelude::RedisClient::new(config, None, Some(reconnect_policy));
let client =
fred::prelude::RedisClient::new(config, Some(perf), None, Some(reconnect_policy));
client.connect();
client
.wait_for_connect()
@ -73,8 +75,10 @@ impl SubscriberClient {
pub async fn new(
config: fred::types::RedisConfig,
reconnect_policy: fred::types::ReconnectPolicy,
perf: fred::types::PerformanceConfig,
) -> CustomResult<Self, errors::RedisError> {
let client = fred::clients::SubscriberClient::new(config, None, Some(reconnect_policy));
let client =
fred::clients::SubscriberClient::new(config, Some(perf), None, Some(reconnect_policy));
client.connect();
client
.wait_for_connect()
@ -117,6 +121,17 @@ impl RedisConnectionPool {
.into_report()
.change_context(errors::RedisError::RedisConnectionError)?;
let perf = fred::types::PerformanceConfig {
auto_pipeline: conf.auto_pipeline,
default_command_timeout: std::time::Duration::from_secs(conf.default_command_timeout),
max_feed_count: conf.max_feed_count,
backpressure: fred::types::BackpressureConfig {
disable_auto_backpressure: conf.disable_auto_backpressure,
max_in_flight_commands: conf.max_in_flight_commands,
policy: fred::types::BackpressurePolicy::Drain,
},
};
if !conf.use_legacy_version {
config.version = fred::types::RespVersion::RESP3;
}
@ -127,13 +142,21 @@ impl RedisConnectionPool {
conf.reconnect_delay,
);
let subscriber = SubscriberClient::new(config.clone(), reconnect_policy.clone()).await?;
let subscriber =
SubscriberClient::new(config.clone(), reconnect_policy.clone(), perf.clone()).await?;
let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
let publisher =
RedisClient::new(config.clone(), reconnect_policy.clone(), perf.clone()).await?;
let pool = fred::pool::RedisPool::new(config, None, Some(reconnect_policy), conf.pool_size)
.into_report()
.change_context(errors::RedisError::RedisConnectionError)?;
let pool = fred::prelude::RedisPool::new(
config,
Some(perf),
None,
Some(reconnect_policy),
conf.pool_size,
)
.into_report()
.change_context(errors::RedisError::RedisConnectionError)?;
pool.connect();
pool.wait_for_connect()
@ -153,16 +176,28 @@ impl RedisConnectionPool {
}
pub async fn on_error(&self, tx: tokio::sync::oneshot::Sender<()>) {
while let Ok(redis_error) = self.pool.on_error().recv().await {
logger::error!(?redis_error, "Redis protocol or connection error");
logger::error!("current state: {:#?}", self.pool.state());
if self.pool.state() == fred::types::ClientState::Disconnected {
if tx.send(()).is_err() {
logger::error!("The redis shutdown signal sender failed to signal");
use futures::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
let error_rxs: Vec<BroadcastStream<fred::error::RedisError>> = self
.pool
.clients()
.iter()
.map(|client| BroadcastStream::new(client.error_rx()))
.collect();
let mut error_rx = futures::stream::select_all(error_rxs);
loop {
if let Some(Ok(error)) = error_rx.next().await {
logger::error!(?error, "Redis protocol or connection error");
if self.pool.state() == fred::types::ClientState::Disconnected {
if tx.send(()).is_err() {
logger::error!("The redis shutdown signal sender failed to signal");
}
self.is_redis_available
.store(false, atomic::Ordering::SeqCst);
break;
}
self.is_redis_available
.store(false, atomic::Ordering::SeqCst);
break;
}
}
}

View File

@ -52,6 +52,11 @@ pub struct RedisSettings {
/// TTL for hash-tables in seconds
pub default_hash_ttl: u32,
pub stream_read_count: u64,
pub auto_pipeline: bool,
pub disable_auto_backpressure: bool,
pub max_in_flight_commands: u64,
pub default_command_timeout: u64,
pub max_feed_count: u64,
}
impl RedisSettings {
@ -89,6 +94,11 @@ impl Default for RedisSettings {
default_ttl: 300,
stream_read_count: 1,
default_hash_ttl: 900,
auto_pipeline: true,
disable_auto_backpressure: false,
max_in_flight_commands: 5000,
default_command_timeout: 0,
max_feed_count: 200,
}
}
}

View File

@ -535,6 +535,7 @@ mod tests {
merchant_id,
hashed_api_key.into_inner()
),)
.await
.is_none()
)
}

View File

@ -63,7 +63,7 @@ where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
{
let cache_val = cache.get_val::<T>(key);
let cache_val = cache.get_val::<T>(key).await;
if let Some(val) = cache_val {
Ok(val)
} else {

View File

@ -890,6 +890,7 @@ mod merchant_connector_account_cache_tests {
"{}_{}",
merchant_id, connector_label
),)
.await
.is_none())
}
}

View File

@ -50,8 +50,8 @@ async fn invalidate_existing_cache_success() {
let response_body = response.body().await;
println!("invalidate Cache: {response:?} : {response_body:?}");
assert_eq!(response.status(), awc::http::StatusCode::OK);
assert!(cache::CONFIG_CACHE.get(&cache_key).is_none());
assert!(cache::ACCOUNTS_CACHE.get(&cache_key).is_none());
assert!(cache::CONFIG_CACHE.get(&cache_key).await.is_none());
assert!(cache::ACCOUNTS_CACHE.get(&cache_key).await.is_none());
}
#[actix_web::test]

View File

@ -38,7 +38,7 @@ error-stack = "0.3.1"
futures = "0.3.28"
http = "0.2.9"
mime = "0.3.17"
moka = { version = "0.11.3", features = ["future"] }
moka = { version = "0.12", features = ["future"] }
once_cell = "1.18.0"
ring = "0.16.20"
serde = { version = "1.0.193", features = ["derive"] }

View File

@ -109,7 +109,6 @@ impl Cache {
/// `max_capacity`: Max size in MB's that the cache can hold
pub fn new(time_to_live: u64, time_to_idle: u64, max_capacity: Option<u64>) -> Self {
let mut cache_builder = MokaCache::builder()
.eviction_listener_with_queued_delivery_mode(|_, _, _| {})
.time_to_live(std::time::Duration::from_secs(time_to_live))
.time_to_idle(std::time::Duration::from_secs(time_to_idle));
@ -126,8 +125,8 @@ impl Cache {
self.insert(key, Arc::new(val)).await;
}
pub fn get_val<T: Clone + Cacheable>(&self, key: &str) -> Option<T> {
let val = self.get(key)?;
pub async fn get_val<T: Clone + Cacheable>(&self, key: &str) -> Option<T> {
let val = self.get(key).await?;
(*val).as_any().downcast_ref::<T>().cloned()
}
@ -188,7 +187,7 @@ where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let cache_val = cache.get_val::<T>(key);
let cache_val = cache.get_val::<T>(key).await;
if let Some(val) = cache_val {
Ok(val)
} else {
@ -266,14 +265,17 @@ mod cache_tests {
async fn construct_and_get_cache() {
let cache = Cache::new(1800, 1800, None);
cache.push("key".to_string(), "val".to_string()).await;
assert_eq!(cache.get_val::<String>("key"), Some(String::from("val")));
assert_eq!(
cache.get_val::<String>("key").await,
Some(String::from("val"))
);
}
#[tokio::test]
async fn eviction_on_size_test() {
let cache = Cache::new(2, 2, Some(0));
cache.push("key".to_string(), "val".to_string()).await;
assert_eq!(cache.get_val::<String>("key"), None);
assert_eq!(cache.get_val::<String>("key").await, None);
}
#[tokio::test]
@ -283,7 +285,7 @@ mod cache_tests {
cache.remove("key").await;
assert_eq!(cache.get_val::<String>("key"), None);
assert_eq!(cache.get_val::<String>("key").await, None);
}
#[tokio::test]
@ -291,6 +293,6 @@ mod cache_tests {
let cache = Cache::new(2, 2, None);
cache.push("key".to_string(), "val".to_string()).await;
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
assert_eq!(cache.get_val::<String>("key"), None);
assert_eq!(cache.get_val::<String>("key").await, None);
}
}