mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-27 19:46:48 +08:00
build(deps): bump fred from 5.2.0 to 6.0.0 (#869)
This commit is contained in:
@ -18,7 +18,7 @@ use fred::{
|
||||
interfaces::{HashesInterface, KeysInterface, StreamsInterface},
|
||||
types::{
|
||||
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
|
||||
RedisKey, RedisMap, RedisValue, SetOptions, XCap, XReadResponse,
|
||||
RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse,
|
||||
},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
@ -33,8 +33,8 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key<V>(&self, key: &str, value: V) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
V: TryInto<RedisValue> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
@ -54,8 +54,8 @@ impl super::RedisConnectionPool {
|
||||
value: V,
|
||||
) -> CustomResult<MsetnxReply, errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisMap> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
V: TryInto<RedisMap> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.msetnx(value)
|
||||
@ -138,8 +138,8 @@ impl super::RedisConnectionPool {
|
||||
seconds: i64,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
V: TryInto<RedisValue> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.set(key, value, Some(Expiration::EX(seconds)), None, false)
|
||||
@ -155,8 +155,8 @@ impl super::RedisConnectionPool {
|
||||
value: V,
|
||||
) -> CustomResult<SetnxReply, errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
V: TryInto<RedisValue> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
@ -204,8 +204,8 @@ impl super::RedisConnectionPool {
|
||||
values: V,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisMap> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
V: TryInto<RedisMap> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
let output: Result<(), _> = self
|
||||
.pool
|
||||
@ -228,8 +228,8 @@ impl super::RedisConnectionPool {
|
||||
value: V,
|
||||
) -> CustomResult<HsetnxReply, errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
V: TryInto<RedisValue> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
let output: Result<HsetnxReply, _> = self
|
||||
.pool
|
||||
@ -378,8 +378,8 @@ impl super::RedisConnectionPool {
|
||||
fields: F,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
F: TryInto<MultipleOrderedPairs> + Debug,
|
||||
F::Error: Into<fred::error::RedisError>,
|
||||
F: TryInto<MultipleOrderedPairs> + Debug + Send + Sync,
|
||||
F::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xadd(stream, false, None, entry_id, fields)
|
||||
@ -395,7 +395,7 @@ impl super::RedisConnectionPool {
|
||||
ids: Ids,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleStrings> + Debug,
|
||||
Ids: Into<MultipleStrings> + Debug + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xdel(stream, ids)
|
||||
@ -411,8 +411,8 @@ impl super::RedisConnectionPool {
|
||||
xcap: C,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
C: TryInto<XCap> + Debug,
|
||||
C::Error: Into<fred::error::RedisError>,
|
||||
C: TryInto<XCap> + Debug + Send + Sync,
|
||||
C::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xtrim(stream, xcap)
|
||||
@ -429,7 +429,7 @@ impl super::RedisConnectionPool {
|
||||
ids: Ids,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
Ids: Into<MultipleIDs> + Debug + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xack(stream, group, ids)
|
||||
@ -441,7 +441,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_get_length<K>(&self, stream: K) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
K: Into<RedisKey> + Debug,
|
||||
K: Into<RedisKey> + Debug + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xlen(stream)
|
||||
@ -458,8 +458,8 @@ impl super::RedisConnectionPool {
|
||||
read_count: Option<u64>,
|
||||
) -> CustomResult<XReadResponse<String, String, String, String>, errors::RedisError>
|
||||
where
|
||||
K: Into<MultipleKeys> + Debug,
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
K: Into<MultipleKeys> + Debug + Send + Sync,
|
||||
Ids: Into<MultipleIDs> + Debug + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xread_map(
|
||||
@ -483,8 +483,8 @@ impl super::RedisConnectionPool {
|
||||
group: Option<(&str, &str)>, // (group_name, consumer_name)
|
||||
) -> CustomResult<XReadResponse<String, String, String, Option<String>>, errors::RedisError>
|
||||
where
|
||||
K: Into<MultipleKeys> + Debug,
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
K: Into<MultipleKeys> + Debug + Send + Sync,
|
||||
Ids: Into<MultipleIDs> + Debug + Send + Sync,
|
||||
{
|
||||
match group {
|
||||
Some((group_name, consumer_name)) => {
|
||||
@ -574,7 +574,7 @@ impl super::RedisConnectionPool {
|
||||
ids: Ids,
|
||||
) -> CustomResult<R, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
Ids: Into<MultipleIDs> + Debug + Send + Sync,
|
||||
R: FromRedis + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
|
||||
@ -27,7 +27,6 @@ use common_utils::errors::CustomResult;
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use fred::interfaces::ClientLike;
|
||||
pub use fred::interfaces::PubsubInterface;
|
||||
use futures::StreamExt;
|
||||
use router_env::logger;
|
||||
|
||||
pub use self::{commands::*, types::*};
|
||||
@ -55,10 +54,10 @@ impl std::ops::Deref for RedisClient {
|
||||
impl RedisClient {
|
||||
pub async fn new(
|
||||
config: fred::types::RedisConfig,
|
||||
policy: fred::types::ReconnectPolicy,
|
||||
reconnect_policy: fred::types::ReconnectPolicy,
|
||||
) -> CustomResult<Self, errors::RedisError> {
|
||||
let client = fred::prelude::RedisClient::new(config);
|
||||
client.connect(Some(policy));
|
||||
let client = fred::prelude::RedisClient::new(config, None, Some(reconnect_policy));
|
||||
client.connect();
|
||||
client
|
||||
.wait_for_connect()
|
||||
.await
|
||||
@ -96,22 +95,22 @@ impl RedisConnectionPool {
|
||||
if !conf.use_legacy_version {
|
||||
config.version = fred::types::RespVersion::RESP3;
|
||||
}
|
||||
config.tracing = true;
|
||||
config.tracing = fred::types::TracingConfig::new(true);
|
||||
config.blocking = fred::types::Blocking::Error;
|
||||
let policy = fred::types::ReconnectPolicy::new_constant(
|
||||
let reconnect_policy = fred::types::ReconnectPolicy::new_constant(
|
||||
conf.reconnect_max_attempts,
|
||||
conf.reconnect_delay,
|
||||
);
|
||||
|
||||
let subscriber = RedisClient::new(config.clone(), policy.clone()).await?;
|
||||
let subscriber = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
|
||||
|
||||
let publisher = RedisClient::new(config.clone(), policy.clone()).await?;
|
||||
let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
|
||||
|
||||
let pool = fred::pool::RedisPool::new(config, conf.pool_size)
|
||||
let pool = fred::pool::RedisPool::new(config, None, Some(reconnect_policy), conf.pool_size)
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::RedisConnectionError)?;
|
||||
|
||||
let join_handles = pool.connect(Some(policy));
|
||||
let join_handles = pool.connect();
|
||||
pool.wait_for_connect()
|
||||
.await
|
||||
.into_report()
|
||||
@ -140,17 +139,13 @@ impl RedisConnectionPool {
|
||||
}
|
||||
}
|
||||
pub async fn on_error(&self) {
|
||||
self.pool
|
||||
.on_error()
|
||||
.for_each(|err| {
|
||||
logger::error!("{err:?}");
|
||||
if self.pool.state() == fred::types::ClientState::Disconnected {
|
||||
self.is_redis_available
|
||||
.store(false, atomic::Ordering::SeqCst);
|
||||
}
|
||||
futures::future::ready(())
|
||||
})
|
||||
.await;
|
||||
while let Ok(redis_error) = self.pool.on_error().recv().await {
|
||||
logger::error!(?redis_error, "Redis protocol or connection error");
|
||||
if self.pool.state() == fred::types::ClientState::Disconnected {
|
||||
self.is_redis_available
|
||||
.store(false, atomic::Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user