fix: throw 500 error when redis goes down (#531)

This commit is contained in:
Kartikeya Hegde
2023-02-14 17:13:17 +05:30
committed by GitHub
parent eaf98e66bc
commit aafb115acb
12 changed files with 169 additions and 86 deletions

View File

@ -21,8 +21,12 @@ pub mod commands;
pub mod errors; pub mod errors;
pub mod types; pub mod types;
use std::sync::{atomic, Arc};
use common_utils::errors::CustomResult; use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use fred::interfaces::ClientLike;
use futures::StreamExt;
use router_env::logger; use router_env::logger;
pub use self::{commands::*, types::*}; pub use self::{commands::*, types::*};
@ -31,6 +35,7 @@ pub struct RedisConnectionPool {
pub pool: fred::pool::RedisPool, pub pool: fred::pool::RedisPool,
config: RedisConfig, config: RedisConfig,
join_handles: Vec<fred::types::ConnectHandle>, join_handles: Vec<fred::types::ConnectHandle>,
pub is_redis_available: Arc<atomic::AtomicBool>,
} }
impl RedisConnectionPool { impl RedisConnectionPool {
@ -62,6 +67,7 @@ impl RedisConnectionPool {
config.version = fred::types::RespVersion::RESP3; config.version = fred::types::RespVersion::RESP3;
} }
config.tracing = true; config.tracing = true;
config.blocking = fred::types::Blocking::Error;
let policy = fred::types::ReconnectPolicy::new_constant( let policy = fred::types::ReconnectPolicy::new_constant(
conf.reconnect_max_attempts, conf.reconnect_max_attempts,
conf.reconnect_delay, conf.reconnect_delay,
@ -82,6 +88,7 @@ impl RedisConnectionPool {
pool, pool,
config, config,
join_handles, join_handles,
is_redis_available: Arc::new(atomic::AtomicBool::new(true)),
}) })
} }
@ -95,6 +102,19 @@ impl RedisConnectionPool {
}; };
} }
} }
pub async fn on_error(&self) {
self.pool
.on_error()
.for_each(|err| {
logger::error!("{err:?}");
if self.pool.state() == fred::types::ClientState::Disconnected {
self.is_redis_available
.store(false, atomic::Ordering::SeqCst);
}
futures::future::ready(())
})
.await;
}
} }
struct RedisConfig { struct RedisConfig {

View File

@ -69,6 +69,14 @@ pub enum StorageError {
CustomerRedacted, CustomerRedacted,
#[error("Deserialization failure")] #[error("Deserialization failure")]
DeserializationFailed, DeserializationFailed,
#[error("Received Error RedisError: {0}")]
ERedisError(error_stack::Report<RedisError>),
}
impl From<error_stack::Report<RedisError>> for StorageError {
fn from(err: error_stack::Report<RedisError>) -> Self {
Self::ERedisError(err)
}
} }
impl From<error_stack::Report<storage_errors::DatabaseError>> for StorageError { impl From<error_stack::Report<storage_errors::DatabaseError>> for StorageError {

View File

@ -14,7 +14,9 @@ where
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send, Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
{ {
let type_name = std::any::type_name::<T>(); let type_name = std::any::type_name::<T>();
let redis = &store.redis_conn; let redis = &store
.redis_conn()
.map_err(Into::<errors::StorageError>::into)?;
let redis_val = redis.get_and_deserialize_key::<T>(key, type_name).await; let redis_val = redis.get_and_deserialize_key::<T>(key, type_name).await;
match redis_val { match redis_val {
Err(err) => match err.current_context() { Err(err) => match err.current_context() {
@ -46,7 +48,8 @@ where
{ {
let data = fun().await?; let data = fun().await?;
store store
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.delete_key(key) .delete_key(key)
.await .await
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;

View File

@ -56,7 +56,8 @@ mod storage {
}; };
match self match self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.serialize_and_set_multiple_hash_field_if_not_exist( .serialize_and_set_multiple_hash_field_if_not_exist(
&[(&secret_key, &created_ek), (&id_key, &created_ek)], &[(&secret_key, &created_ek), (&id_key, &created_ek)],
"ephkey", "ephkey",
@ -72,11 +73,13 @@ mod storage {
} }
Ok(_) => { Ok(_) => {
let expire_at = expires.assume_utc().unix_timestamp(); let expire_at = expires.assume_utc().unix_timestamp();
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.set_expire_at(&secret_key, expire_at) .set_expire_at(&secret_key, expire_at)
.await .await
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.set_expire_at(&id_key, expire_at) .set_expire_at(&id_key, expire_at)
.await .await
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
@ -90,7 +93,8 @@ mod storage {
key: &str, key: &str,
) -> CustomResult<EphemeralKey, errors::StorageError> { ) -> CustomResult<EphemeralKey, errors::StorageError> {
let key = format!("epkey_{key}"); let key = format!("epkey_{key}");
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.get_hash_field_and_deserialize(&key, "ephkey", "EphemeralKey") .get_hash_field_and_deserialize(&key, "ephkey", "EphemeralKey")
.await .await
.change_context(errors::StorageError::KVError) .change_context(errors::StorageError::KVError)
@ -101,12 +105,14 @@ mod storage {
) -> CustomResult<EphemeralKey, errors::StorageError> { ) -> CustomResult<EphemeralKey, errors::StorageError> {
let ek = self.get_ephemeral_key(id).await?; let ek = self.get_ephemeral_key(id).await?;
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.delete_key(&format!("epkey_{}", &ek.id)) .delete_key(&format!("epkey_{}", &ek.id))
.await .await
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.delete_key(&format!("epkey_{}", &ek.secret)) .delete_key(&format!("epkey_{}", &ek.secret))
.await .await
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;

View File

@ -38,7 +38,8 @@ impl ConnectorAccessToken for Store {
// being refreshed by other request then wait till it finishes and use the same access token // being refreshed by other request then wait till it finishes and use the same access token
let key = format!("access_token_{merchant_id}_{connector_name}"); let key = format!("access_token_{merchant_id}_{connector_name}");
let maybe_token = self let maybe_token = self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.get_key::<Option<Vec<u8>>>(&key) .get_key::<Option<Vec<u8>>>(&key)
.await .await
.change_context(errors::StorageError::KVError) .change_context(errors::StorageError::KVError)
@ -63,7 +64,8 @@ impl ConnectorAccessToken for Store {
let serialized_access_token = let serialized_access_token =
Encode::<types::AccessToken>::encode_to_string_of_json(&access_token) Encode::<types::AccessToken>::encode_to_string_of_json(&access_token)
.change_context(errors::StorageError::SerializationFailed)?; .change_context(errors::StorageError::SerializationFailed)?;
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.set_key_with_expiry(&key, serialized_access_token, access_token.expires) .set_key_with_expiry(&key, serialized_access_token, access_token.expires)
.await .await
.map_err(|error| { .map_err(|error| {

View File

@ -387,7 +387,8 @@ mod storage {
let field = format!("pa_{}", created_attempt.attempt_id); let field = format!("pa_{}", created_attempt.attempt_id);
match self match self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.serialize_and_set_hash_field_if_not_exist(&key, &field, &created_attempt) .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_attempt)
.await .await
{ {
@ -462,7 +463,8 @@ mod storage {
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
let field = format!("pa_{}", updated_attempt.attempt_id); let field = format!("pa_{}", updated_attempt.attempt_id);
let updated_attempt = self let updated_attempt = self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.set_hash_fields(&key, (&field, &redis_value)) .set_hash_fields(&key, (&field, &redis_value))
.await .await
.map(|_| updated_attempt) .map(|_| updated_attempt)
@ -538,11 +540,13 @@ mod storage {
.into_report()?; .into_report()?;
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize( self.redis_conn()
&lookup.pk_id, .map_err(Into::<errors::StorageError>::into)?
&lookup.sk_id, .get_hash_field_and_deserialize(
"PaymentAttempt", &lookup.pk_id,
), &lookup.sk_id,
"PaymentAttempt",
),
database_call, database_call,
) )
.await .await
@ -582,11 +586,9 @@ mod storage {
let key = &lookup.pk_id; let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize( self.redis_conn()
key, .map_err(Into::<errors::StorageError>::into)?
&lookup.sk_id, .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"),
"PaymentAttempt",
),
database_call, database_call,
) )
.await .await
@ -645,11 +647,9 @@ mod storage {
let key = &lookup.pk_id; let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize( self.redis_conn()
key, .map_err(Into::<errors::StorageError>::into)?
&lookup.sk_id, .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"),
"PaymentAttempt",
),
database_call, database_call,
) )
.await .await
@ -682,11 +682,9 @@ mod storage {
.into_report()?; .into_report()?;
let key = &lookup.pk_id; let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize( self.redis_conn()
key, .map_err(Into::<errors::StorageError>::into)?
&lookup.sk_id, .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"),
"PaymentAttempt",
),
database_call, database_call,
) )
.await .await

View File

@ -95,7 +95,8 @@ mod storage {
}; };
match self match self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent) .serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent)
.await .await
{ {
@ -152,7 +153,8 @@ mod storage {
.change_context(errors::StorageError::SerializationFailed)?; .change_context(errors::StorageError::SerializationFailed)?;
let updated_intent = self let updated_intent = self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.set_hash_fields(&key, ("pi", &redis_value)) .set_hash_fields(&key, ("pi", &redis_value))
.await .await
.map(|_| updated_intent) .map(|_| updated_intent)
@ -201,7 +203,8 @@ mod storage {
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
let key = format!("{merchant_id}_{payment_id}"); let key = format!("{merchant_id}_{payment_id}");
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.get_hash_field_and_deserialize(&key, "pi", "PaymentIntent"), .get_hash_field_and_deserialize(&key, "pi", "PaymentIntent"),
database_call, database_call,
) )

View File

@ -23,9 +23,15 @@ pub trait QueueInterface {
id: &RedisEntryId, id: &RedisEntryId,
) -> CustomResult<(), RedisError>; ) -> CustomResult<(), RedisError>;
async fn acquire_pt_lock(&self, tag: &str, lock_key: &str, lock_val: &str, ttl: i64) -> bool; async fn acquire_pt_lock(
&self,
tag: &str,
lock_key: &str,
lock_val: &str,
ttl: i64,
) -> CustomResult<bool, RedisError>;
async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> bool; async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> CustomResult<bool, RedisError>;
async fn stream_append_entry( async fn stream_append_entry(
&self, &self,
@ -47,7 +53,10 @@ impl QueueInterface for Store {
) -> CustomResult<Vec<storage::ProcessTracker>, ProcessTrackerError> { ) -> CustomResult<Vec<storage::ProcessTracker>, ProcessTrackerError> {
crate::scheduler::consumer::fetch_consumer_tasks( crate::scheduler::consumer::fetch_consumer_tasks(
self, self,
&self.redis_conn.clone(), &self
.redis_conn()
.map_err(ProcessTrackerError::ERedisError)?
.clone(),
stream_name, stream_name,
group_name, group_name,
consumer_name, consumer_name,
@ -61,15 +70,21 @@ impl QueueInterface for Store {
group: &str, group: &str,
id: &RedisEntryId, id: &RedisEntryId,
) -> CustomResult<(), RedisError> { ) -> CustomResult<(), RedisError> {
self.redis_conn self.redis_conn()?
.consumer_group_create(stream, group, id) .consumer_group_create(stream, group, id)
.await .await
} }
async fn acquire_pt_lock(&self, tag: &str, lock_key: &str, lock_val: &str, ttl: i64) -> bool { async fn acquire_pt_lock(
let conn = self.redis_conn.clone(); &self,
tag: &str,
lock_key: &str,
lock_val: &str,
ttl: i64,
) -> CustomResult<bool, RedisError> {
let conn = self.redis_conn()?.clone();
let is_lock_acquired = conn.set_key_if_not_exist(lock_key, lock_val).await; let is_lock_acquired = conn.set_key_if_not_exist(lock_key, lock_val).await;
match is_lock_acquired { Ok(match is_lock_acquired {
Ok(SetnxReply::KeySet) => match conn.set_expiry(lock_key, ttl).await { Ok(SetnxReply::KeySet) => match conn.set_expiry(lock_key, ttl).await {
Ok(()) => true, Ok(()) => true,
@ -88,18 +103,18 @@ impl QueueInterface for Store {
logger::error!(error=%error.current_context(), %tag, "Error while locking"); logger::error!(error=%error.current_context(), %tag, "Error while locking");
false false
} }
} })
} }
async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> bool { async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> CustomResult<bool, RedisError> {
let is_lock_released = self.redis_conn.delete_key(lock_key).await; let is_lock_released = self.redis_conn()?.delete_key(lock_key).await;
match is_lock_released { Ok(match is_lock_released {
Ok(()) => true, Ok(()) => true,
Err(error) => { Err(error) => {
logger::error!(error=%error.current_context(), %tag, "Error while releasing lock"); logger::error!(error=%error.current_context(), %tag, "Error while releasing lock");
false false
} }
} })
} }
async fn stream_append_entry( async fn stream_append_entry(
@ -108,13 +123,13 @@ impl QueueInterface for Store {
entry_id: &RedisEntryId, entry_id: &RedisEntryId,
fields: Vec<(&str, String)>, fields: Vec<(&str, String)>,
) -> CustomResult<(), RedisError> { ) -> CustomResult<(), RedisError> {
self.redis_conn self.redis_conn()?
.stream_append_entry(stream, entry_id, fields) .stream_append_entry(stream, entry_id, fields)
.await .await
} }
async fn get_key(&self, key: &str) -> CustomResult<Vec<u8>, RedisError> { async fn get_key(&self, key: &str) -> CustomResult<Vec<u8>, RedisError> {
self.redis_conn.get_key::<Vec<u8>>(key).await self.redis_conn()?.get_key::<Vec<u8>>(key).await
} }
} }
@ -148,14 +163,14 @@ impl QueueInterface for MockDb {
_lock_key: &str, _lock_key: &str,
_lock_val: &str, _lock_val: &str,
_ttl: i64, _ttl: i64,
) -> bool { ) -> CustomResult<bool, RedisError> {
// [#172]: Implement function for `MockDb` // [#172]: Implement function for `MockDb`
false Ok(false)
} }
async fn release_pt_lock(&self, _tag: &str, _lock_key: &str) -> bool { async fn release_pt_lock(&self, _tag: &str, _lock_key: &str) -> CustomResult<bool, RedisError> {
// [#172]: Implement function for `MockDb` // [#172]: Implement function for `MockDb`
false Ok(false)
} }
async fn stream_append_entry( async fn stream_append_entry(

View File

@ -242,11 +242,9 @@ mod storage {
let key = &lookup.pk_id; let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize( self.redis_conn()
key, .map_err(Into::<errors::StorageError>::into)?
&lookup.sk_id, .get_hash_field_and_deserialize(key, &lookup.sk_id, "Refund"),
"Refund",
),
database_call, database_call,
) )
.await .await
@ -300,7 +298,8 @@ mod storage {
&created_refund.attempt_id, &created_refund.refund_id &created_refund.attempt_id, &created_refund.refund_id
); );
match self match self
.redis_conn .redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.serialize_and_set_hash_field_if_not_exist(&key, &field, &created_refund) .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_refund)
.await .await
{ {
@ -391,7 +390,8 @@ mod storage {
let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id);
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.hscan_and_deserialize(key, &pattern, None) .hscan_and_deserialize(key, &pattern, None)
.await .await
.change_context(errors::StorageError::KVError) .change_context(errors::StorageError::KVError)
@ -433,7 +433,8 @@ mod storage {
) )
.change_context(errors::StorageError::SerializationFailed)?; .change_context(errors::StorageError::SerializationFailed)?;
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.set_hash_fields(&lookup.pk_id, (field, redis_value)) .set_hash_fields(&lookup.pk_id, (field, redis_value))
.await .await
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
@ -484,11 +485,9 @@ mod storage {
let key = &lookup.pk_id; let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize( self.redis_conn()
key, .map_err(Into::<errors::StorageError>::into)?
&lookup.sk_id, .get_hash_field_and_deserialize(key, &lookup.sk_id, "Refund"),
"Refund",
),
database_call, database_call,
) )
.await .await
@ -535,7 +534,8 @@ mod storage {
let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id);
self.redis_conn self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.hscan_and_deserialize(&key, &pattern, None) .hscan_and_deserialize(&key, &pattern, None)
.await .await
.change_context(errors::StorageError::KVError) .change_context(errors::StorageError::KVError)

View File

@ -62,20 +62,16 @@ pub async fn run_producer_flow(
op: &SchedulerOptions, op: &SchedulerOptions,
settings: &SchedulerSettings, settings: &SchedulerSettings,
) -> CustomResult<(), errors::ProcessTrackerError> { ) -> CustomResult<(), errors::ProcessTrackerError> {
lock_acquire_release::<_, _, error_stack::Report<errors::ProcessTrackerError>>( lock_acquire_release::<_, _>(state, settings, move || async {
state, let tasks = fetch_producer_tasks(&*state.store, op, settings).await?;
settings, debug!("Producer count of tasks {}", tasks.len());
move || async {
let tasks = fetch_producer_tasks(&*state.store, op, settings).await?;
debug!("Producer count of tasks {}", tasks.len());
// [#268]: Allow task based segregation of tasks // [#268]: Allow task based segregation of tasks
divide_and_append_tasks(state, SchedulerFlow::Producer, tasks, settings).await?; divide_and_append_tasks(state, SchedulerFlow::Producer, tasks, settings).await?;
Ok(()) Ok(())
}, })
)
.await?; .await?;
Ok(()) Ok(())

View File

@ -331,14 +331,14 @@ fn get_delay<'a>(
} }
} }
pub(crate) async fn lock_acquire_release<F, Fut, E>( pub(crate) async fn lock_acquire_release<F, Fut>(
state: &AppState, state: &AppState,
settings: &SchedulerSettings, settings: &SchedulerSettings,
callback: F, callback: F,
) -> Result<(), E> ) -> CustomResult<(), errors::ProcessTrackerError>
where where
F: Fn() -> Fut, F: Fn() -> Fut,
Fut: futures::Future<Output = Result<(), E>>, Fut: futures::Future<Output = CustomResult<(), errors::ProcessTrackerError>>,
{ {
let tag = "PRODUCER_LOCK"; let tag = "PRODUCER_LOCK";
let lock_key = &settings.producer.lock_key; let lock_key = &settings.producer.lock_key;
@ -349,9 +349,15 @@ where
.store .store
.acquire_pt_lock(tag, lock_key, lock_val, ttl) .acquire_pt_lock(tag, lock_key, lock_val, ttl)
.await .await
{ .change_context(errors::ProcessTrackerError::ERedisError(
errors::RedisError::RedisConnectionError.into(),
))? {
let result = callback().await; let result = callback().await;
state.store.release_pt_lock(tag, lock_key).await; state
.store
.release_pt_lock(tag, lock_key)
.await
.map_err(errors::ProcessTrackerError::ERedisError)?;
result result
} else { } else {
Ok(()) Ok(())

View File

@ -4,12 +4,17 @@ pub mod authentication;
pub mod encryption; pub mod encryption;
pub mod logger; pub mod logger;
use std::sync::Arc; use std::sync::{atomic, Arc};
use redis_interface::errors::RedisError;
pub use self::api::*; pub use self::api::*;
#[cfg(feature = "basilisk")] #[cfg(feature = "basilisk")]
pub use self::encryption::*; pub use self::encryption::*;
use crate::connection::{diesel_make_pg_pool, PgPool}; use crate::{
connection::{diesel_make_pg_pool, PgPool},
core::errors,
};
#[derive(Clone)] #[derive(Clone)]
pub struct Store { pub struct Store {
@ -30,11 +35,18 @@ pub(crate) struct StoreConfig {
impl Store { impl Store {
pub async fn new(config: &crate::configs::settings::Settings, test_transaction: bool) -> Self { pub async fn new(config: &crate::configs::settings::Settings, test_transaction: bool) -> Self {
let redis_conn = Arc::new(crate::connection::redis_connection(config).await);
let redis_clone = redis_conn.clone();
tokio::spawn(async move {
redis_clone.on_error().await;
});
Self { Self {
master_pool: diesel_make_pg_pool(&config.master_database, test_transaction).await, master_pool: diesel_make_pg_pool(&config.master_database, test_transaction).await,
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
replica_pool: diesel_make_pg_pool(&config.replica_database, test_transaction).await, replica_pool: diesel_make_pg_pool(&config.replica_database, test_transaction).await,
redis_conn: Arc::new(crate::connection::redis_connection(config).await), redis_conn,
#[cfg(feature = "kv_store")] #[cfg(feature = "kv_store")]
config: StoreConfig { config: StoreConfig {
drainer_stream_name: config.drainer.stream_name.clone(), drainer_stream_name: config.drainer.stream_name.clone(),
@ -49,6 +61,20 @@ impl Store {
format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,)
} }
pub fn redis_conn(
&self,
) -> errors::CustomResult<Arc<redis_interface::RedisConnectionPool>, RedisError> {
if self
.redis_conn
.is_redis_available
.load(atomic::Ordering::SeqCst)
{
Ok(self.redis_conn.clone())
} else {
Err(RedisError::RedisConnectionError.into())
}
}
#[cfg(feature = "kv_store")] #[cfg(feature = "kv_store")]
pub(crate) async fn push_to_drainer_stream<T>( pub(crate) async fn push_to_drainer_stream<T>(
&self, &self,