Mirror of https://github.com/juspay/hyperswitch.git (synced 2025-10-28 04:04:55 +08:00)
refactor(storage_impl): Integrate the composite store from external crate (#1921)
crates/storage_impl/Cargo.toml
@@ -11,15 +11,20 @@ license.workspace = true
 [dependencies]
 # First Party dependencies
 common_utils = { version = "0.1.0", path = "../common_utils" }
+diesel_models = { version = "0.1.0", path = "../diesel_models" }
 masking = { version = "0.1.0", path = "../masking" }
 redis_interface = { version = "0.1.0", path = "../redis_interface" }
-diesel_models = { version = "0.1.0", path = "../diesel_models" }
 router_env = { version = "0.1.0", path = "../router_env" }
 
 # Third party crates
-bb8 = "0.8.1"
-diesel = { version = "2.1.0", default-features = false, features = ["postgres"] }
 async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "be3d9bce50051d8c0e0c06078e8066cc27db3001" }
-tokio = { version = "1.28.2", features = ["rt-multi-thread"] }
+async-trait = "0.1.72"
+bb8 = "0.8.1"
+crc32fast = "1.3.2"
+diesel = { version = "2.1.0", default-features = false, features = ["postgres"] }
+dyn-clone = "1.0.12"
+error-stack = "0.3.1"
+moka = { version = "0.11.3", features = ["future"] }
+once_cell = "1.18.0"
+tokio = { version = "1.28.2", features = ["rt-multi-thread"] }
 
crates/storage_impl/src/database/store.rs
@@ -9,14 +9,14 @@ pub type PgPool = bb8::Pool<async_bb8_diesel::ConnectionManager<PgConnection>>;
 pub type PgPooledConn = async_bb8_diesel::Connection<PgConnection>;
 
 #[async_trait::async_trait]
-pub trait DatabaseStore {
+pub trait DatabaseStore: Clone + Send {
     type Config;
     async fn new(config: Self::Config, test_transaction: bool) -> Self;
-    fn get_write_pool(&self) -> PgPool;
-    fn get_read_pool(&self) -> PgPool;
+    fn get_master_pool(&self) -> &PgPool;
+    fn get_replica_pool(&self) -> &PgPool;
 }
 
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct Store {
     pub master_pool: PgPool,
 }
@@ -30,16 +30,16 @@ impl DatabaseStore for Store {
         }
     }
 
-    fn get_write_pool(&self) -> PgPool {
-        self.master_pool.clone()
+    fn get_master_pool(&self) -> &PgPool {
+        &self.master_pool
     }
 
-    fn get_read_pool(&self) -> PgPool {
-        self.master_pool.clone()
+    fn get_replica_pool(&self) -> &PgPool {
+        &self.master_pool
     }
 }
 
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct ReplicaStore {
     pub master_pool: PgPool,
     pub replica_pool: PgPool,
@@ -58,12 +58,12 @@ impl DatabaseStore for ReplicaStore {
         }
     }
 
-    fn get_write_pool(&self) -> PgPool {
-        self.master_pool.clone()
+    fn get_master_pool(&self) -> &PgPool {
+        &self.master_pool
     }
 
-    fn get_read_pool(&self) -> PgPool {
-        self.replica_pool.clone()
+    fn get_replica_pool(&self) -> &PgPool {
+        &self.replica_pool
     }
 }
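
The trait change above replaces the owned `get_write_pool`/`get_read_pool` accessors with borrowed `get_master_pool`/`get_replica_pool` ones, so callers choose a pool without cloning it. A minimal sketch of how a caller might route on the new API (the `pick_pool` helper is illustrative, not part of the commit):

use storage_impl::database::store::{DatabaseStore, PgPool};

// Illustrative helper: route reads to the replica pool and writes to the
// master pool. For the single-database `Store`, both accessors return the
// master pool, so this degrades gracefully.
fn pick_pool<T: DatabaseStore>(store: &T, read_only: bool) -> &PgPool {
    if read_only {
        store.get_replica_pool()
    } else {
        store.get_master_pool()
    }
}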
crates/storage_impl/src/lib.rs
@@ -1,21 +1,72 @@
 use std::sync::Arc;
 
 use error_stack::ResultExt;
 use masking::StrongSecret;
-use redis::CacheStore;
+use redis::{kv_store::RedisConnInterface, RedisStore};
 
 pub mod config;
-pub mod diesel;
+pub mod database;
+pub mod payments;
 pub mod redis;
+pub mod refund;
 
-pub use crate::diesel::store::DatabaseStore;
+use database::store::PgPool;
+use redis_interface::errors::RedisError;
 
-#[allow(dead_code)]
+pub use crate::database::store::DatabaseStore;
 
 #[derive(Debug, Clone)]
 pub struct RouterStore<T: DatabaseStore> {
     db_store: T,
-    cache_store: CacheStore,
+    cache_store: RedisStore,
     master_encryption_key: StrongSecret<Vec<u8>>,
 }
 
+#[async_trait::async_trait]
+impl<T: DatabaseStore> DatabaseStore for RouterStore<T>
+where
+    T::Config: Send,
+{
+    type Config = (
+        T::Config,
+        redis_interface::RedisSettings,
+        StrongSecret<Vec<u8>>,
+        tokio::sync::oneshot::Sender<()>,
+        &'static str,
+    );
+    async fn new(config: Self::Config, test_transaction: bool) -> Self {
+        let (db_conf, cache_conf, encryption_key, cache_error_signal, inmemory_cache_stream) =
+            config;
+        if test_transaction {
+            Self::test_store(db_conf, &cache_conf, encryption_key).await
+        } else {
+            Self::from_config(
+                db_conf,
+                &cache_conf,
+                encryption_key,
+                cache_error_signal,
+                inmemory_cache_stream,
+            )
+            .await
+        }
+    }
+    fn get_master_pool(&self) -> &PgPool {
+        self.db_store.get_master_pool()
+    }
+    fn get_replica_pool(&self) -> &PgPool {
+        self.db_store.get_replica_pool()
+    }
+}
+
+impl<T: DatabaseStore> RedisConnInterface for RouterStore<T> {
+    fn get_redis_conn(
+        &self,
+    ) -> error_stack::Result<Arc<redis_interface::RedisConnectionPool>, RedisError> {
+        self.cache_store.get_redis_conn()
+    }
+}
+
 impl<T: DatabaseStore> RouterStore<T> {
-    pub async fn new(
+    pub async fn from_config(
         db_conf: T::Config,
         cache_conf: &redis_interface::RedisSettings,
         encryption_key: StrongSecret<Vec<u8>>,
@@ -25,7 +76,7 @@ impl<T: DatabaseStore> RouterStore<T> {
         // TODO: create an error enum and return proper error here
         let db_store = T::new(db_conf, false).await;
         #[allow(clippy::expect_used)]
-        let cache_store = CacheStore::new(cache_conf)
+        let cache_store = RedisStore::new(cache_conf)
             .await
             .expect("Failed to create cache store");
         cache_store.set_error_callback(cache_error_signal);
@@ -40,6 +91,11 @@ impl<T: DatabaseStore> RouterStore<T> {
             master_encryption_key: encryption_key,
         }
     }
 
+    pub fn master_key(&self) -> &StrongSecret<Vec<u8>> {
+        &self.master_encryption_key
+    }
+
     pub async fn test_store(
         db_conf: T::Config,
         cache_conf: &redis_interface::RedisSettings,
@@ -48,7 +104,7 @@ impl<T: DatabaseStore> RouterStore<T> {
         // TODO: create an error enum and return proper error here
         let db_store = T::new(db_conf, true).await;
         #[allow(clippy::expect_used)]
-        let cache_store = CacheStore::new(cache_conf)
+        let cache_store = RedisStore::new(cache_conf)
             .await
             .expect("Failed to create cache store");
         Self {
@@ -59,12 +115,39 @@ impl<T: DatabaseStore> RouterStore<T> {
         }
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct KVRouterStore<T: DatabaseStore> {
     router_store: RouterStore<T>,
     drainer_stream_name: String,
     drainer_num_partitions: u8,
 }
 
+#[async_trait::async_trait]
+impl<T> DatabaseStore for KVRouterStore<T>
+where
+    RouterStore<T>: DatabaseStore,
+    T: DatabaseStore,
+{
+    type Config = (RouterStore<T>, String, u8);
+    async fn new(config: Self::Config, _test_transaction: bool) -> Self {
+        let (router_store, drainer_stream_name, drainer_num_partitions) = config;
+        Self::from_store(router_store, drainer_stream_name, drainer_num_partitions)
+    }
+    fn get_master_pool(&self) -> &PgPool {
+        self.router_store.get_master_pool()
+    }
+    fn get_replica_pool(&self) -> &PgPool {
+        self.router_store.get_replica_pool()
+    }
+}
+
+impl<T: DatabaseStore> RedisConnInterface for KVRouterStore<T> {
+    fn get_redis_conn(
+        &self,
+    ) -> error_stack::Result<Arc<redis_interface::RedisConnectionPool>, RedisError> {
+        self.router_store.get_redis_conn()
+    }
+}
+
 impl<T: DatabaseStore> KVRouterStore<T> {
     pub fn from_store(
         store: RouterStore<T>,
@@ -78,16 +161,19 @@ impl<T: DatabaseStore> KVRouterStore<T> {
         }
     }
 
+    pub fn master_key(&self) -> &StrongSecret<Vec<u8>> {
+        self.router_store.master_key()
+    }
+
     pub fn get_drainer_stream_name(&self, shard_key: &str) -> String {
         format!("{{{}}}_{}", shard_key, self.drainer_stream_name)
     }
 
-    #[allow(dead_code)]
-    async fn push_to_drainer_stream<R>(
+    pub async fn push_to_drainer_stream<R>(
         &self,
         redis_entry: diesel_models::kv::TypedSql,
         partition_key: redis::kv_store::PartitionKey<'_>,
-    ) -> error_stack::Result<(), redis_interface::errors::RedisError>
+    ) -> error_stack::Result<(), RedisError>
     where
         R: crate::redis::kv_store::KvStorePartition,
     {
@@ -101,9 +187,9 @@ impl<T: DatabaseStore> KVRouterStore<T> {
                 &redis_interface::RedisEntryId::AutoGeneratedID,
                 redis_entry
                     .to_field_value_pairs()
-                    .change_context(redis_interface::errors::RedisError::JsonSerializationFailed)?,
+                    .change_context(RedisError::JsonSerializationFailed)?,
             )
             .await
-            .change_context(redis_interface::errors::RedisError::StreamAppendFailed)
+            .change_context(RedisError::StreamAppendFailed)
     }
 }
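
With both `DatabaseStore` impls in place, a `KVRouterStore` is built by nesting configs: the `RouterStore` tuple carries the database config, Redis settings, master encryption key, an error-signal channel, and the in-memory cache stream name; the result is then wrapped with drainer settings. A hedged wiring sketch (the stream and drainer names and the partition count are placeholder values, `Store`'s own `Config` type is not shown in this diff, and `Store::Config: Send` is assumed to hold):

use masking::StrongSecret;
use storage_impl::{database::store::Store, DatabaseStore, KVRouterStore, RouterStore};

async fn build_stores(
    db_conf: <Store as DatabaseStore>::Config,
    redis_conf: redis_interface::RedisSettings,
    master_key: Vec<u8>,
) -> KVRouterStore<Store> {
    // The receiver side of the error signal would be monitored by the caller;
    // it is dropped here only to keep the sketch short.
    let (err_tx, _err_rx) = tokio::sync::oneshot::channel::<()>();
    let router_store = RouterStore::<Store>::new(
        (
            db_conf,
            redis_conf,
            StrongSecret::new(master_key),
            err_tx,
            "hyperswitch_cache", // in-memory cache stream name; placeholder
        ),
        false, // not a test transaction
    )
    .await;
    // Wrap with KV drainer settings: (store, drainer stream name, partitions).
    KVRouterStore::new((router_store, "drainer-stream".to_string(), 8), false).await
}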
crates/storage_impl/src/payments.rs (new file, 6 lines)

use diesel_models::{payment_attempt::PaymentAttempt, payment_intent::PaymentIntent};

use crate::redis::kv_store::KvStorePartition;

impl KvStorePartition for PaymentIntent {}
impl KvStorePartition for PaymentAttempt {}
crates/storage_impl/src/redis/mod.rs
@@ -1,16 +1,30 @@
+pub mod cache;
 pub mod kv_store;
+pub mod pub_sub;
 
-use std::sync::Arc;
+use std::sync::{atomic, Arc};
 
 use error_stack::{IntoReport, ResultExt};
 use redis_interface::PubsubInterface;
 use router_env::logger;
 
-pub struct CacheStore {
+use self::{kv_store::RedisConnInterface, pub_sub::PubSubInterface};
+
+#[derive(Clone)]
+pub struct RedisStore {
     // Maybe expose the redis_conn via traits instead of making the field public
     pub(crate) redis_conn: Arc<redis_interface::RedisConnectionPool>,
 }
 
-impl CacheStore {
+impl std::fmt::Debug for RedisStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CacheStore")
+            .field("redis_conn", &"Redis conn doesn't implement debug")
+            .finish()
+    }
+}
+
+impl RedisStore {
     pub async fn new(
         conf: &redis_interface::RedisSettings,
     ) -> error_stack::Result<Self, redis_interface::errors::RedisError> {
@@ -39,13 +53,31 @@ impl CacheStore {
             .into_report()
             .change_context(redis_interface::errors::RedisError::SubscribeError)?;
 
-        // TODO: Handle on message failures
-        // let redis_clone = self.redis_conn.clone();
-        // tokio::spawn(async move {
-        //     if let Err(e) = redis_clone.on_message().await {
-        //         logger::error!(pubsub_err=?e);
-        //     }
-        // });
+        let redis_clone = self.redis_conn.clone();
+        tokio::spawn(async move {
+            if let Err(e) = redis_clone.on_message().await {
+                logger::error!(pubsub_err=?e);
+            }
+        });
         Ok(())
     }
 }
+
+impl RedisConnInterface for RedisStore {
+    fn get_redis_conn(
+        &self,
+    ) -> error_stack::Result<
+        Arc<redis_interface::RedisConnectionPool>,
+        redis_interface::errors::RedisError,
+    > {
+        if self
+            .redis_conn
+            .is_redis_available
+            .load(atomic::Ordering::SeqCst)
+        {
+            Ok(self.redis_conn.clone())
+        } else {
+            Err(redis_interface::errors::RedisError::RedisConnectionError.into())
+        }
+    }
+}
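
`RedisStore::get_redis_conn` is now fallible: it checks the pool's `is_redis_available` flag and returns `RedisError::RedisConnectionError` when Redis has been marked down, so callers must branch on the result rather than assume a live connection. A small consumer sketch (the function itself is illustrative, not part of the commit):

use storage_impl::redis::kv_store::RedisConnInterface;

// Illustrative: degrade gracefully when Redis is flagged unavailable.
fn cache_available<S: RedisConnInterface>(store: &S) -> bool {
    match store.get_redis_conn() {
        Ok(_conn) => true,
        Err(err) => {
            router_env::logger::error!(redis_conn_err=?err);
            false
        }
    }
}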
crates/storage_impl/src/redis/cache.rs (new file, 166 lines)

use std::{any::Any, borrow::Cow, sync::Arc};

use common_utils::errors;
use dyn_clone::DynClone;
use error_stack::Report;
use moka::future::Cache as MokaCache;
use once_cell::sync::Lazy;
use redis_interface::RedisValue;

/// Prefix for config cache key
const CONFIG_CACHE_PREFIX: &str = "config";

/// Prefix for accounts cache key
const ACCOUNTS_CACHE_PREFIX: &str = "accounts";

/// Prefix for all kinds of cache key
const ALL_CACHE_PREFIX: &str = "all_cache_kind";

/// Time to live 30 mins
const CACHE_TTL: u64 = 30 * 60;

/// Time to idle 10 mins
const CACHE_TTI: u64 = 10 * 60;

/// Max Capacity of Cache in MB
const MAX_CAPACITY: u64 = 30;

/// Config Cache with time_to_live as 30 mins and time_to_idle as 10 mins.
pub static CONFIG_CACHE: Lazy<Cache> = Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, None));

/// Accounts cache with time_to_live as 30 mins and size limit
pub static ACCOUNTS_CACHE: Lazy<Cache> =
    Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));

/// Trait which defines the behaviour of types that are going to be stored in Cache
pub trait Cacheable: Any + Send + Sync + DynClone {
    fn as_any(&self) -> &dyn Any;
}

pub enum CacheKind<'a> {
    Config(Cow<'a, str>),
    Accounts(Cow<'a, str>),
    All(Cow<'a, str>),
}

impl<'a> From<CacheKind<'a>> for RedisValue {
    fn from(kind: CacheKind<'a>) -> Self {
        let value = match kind {
            CacheKind::Config(s) => format!("{CONFIG_CACHE_PREFIX},{s}"),
            CacheKind::Accounts(s) => format!("{ACCOUNTS_CACHE_PREFIX},{s}"),
            CacheKind::All(s) => format!("{ALL_CACHE_PREFIX},{s}"),
        };
        Self::from_string(value)
    }
}

impl<'a> TryFrom<RedisValue> for CacheKind<'a> {
    type Error = Report<errors::ValidationError>;
    fn try_from(kind: RedisValue) -> Result<Self, Self::Error> {
        let validation_err = errors::ValidationError::InvalidValue {
            message: "Invalid publish key provided in pubsub".into(),
        };
        let kind = kind.as_string().ok_or(validation_err.clone())?;
        let split = kind.split_once(',').ok_or(validation_err.clone())?;
        match split.0 {
            ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(split.1.to_string()))),
            CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(split.1.to_string()))),
            ALL_CACHE_PREFIX => Ok(Self::All(Cow::Owned(split.1.to_string()))),
            _ => Err(validation_err.into()),
        }
    }
}

impl<T> Cacheable for T
where
    T: Any + Clone + Send + Sync,
{
    fn as_any(&self) -> &dyn Any {
        self
    }
}

dyn_clone::clone_trait_object!(Cacheable);

pub struct Cache {
    inner: MokaCache<String, Arc<dyn Cacheable>>,
}

impl std::ops::Deref for Cache {
    type Target = MokaCache<String, Arc<dyn Cacheable>>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl Cache {
    /// With given `time_to_live` and `time_to_idle` creates a moka cache.
    ///
    /// `time_to_live`: Time in seconds an entry stays in the cache before it is evicted
    /// `time_to_idle`: Time in seconds since the last `get` or `insert` after which an entry is evicted
    /// `max_capacity`: Max size in MB's that the cache can hold
    pub fn new(time_to_live: u64, time_to_idle: u64, max_capacity: Option<u64>) -> Self {
        let mut cache_builder = MokaCache::builder()
            .eviction_listener_with_queued_delivery_mode(|_, _, _| {})
            .time_to_live(std::time::Duration::from_secs(time_to_live))
            .time_to_idle(std::time::Duration::from_secs(time_to_idle));

        if let Some(capacity) = max_capacity {
            cache_builder = cache_builder.max_capacity(capacity * 1024 * 1024);
        }

        Self {
            inner: cache_builder.build(),
        }
    }

    pub async fn push<T: Cacheable>(&self, key: String, val: T) {
        self.insert(key, Arc::new(val)).await;
    }

    pub fn get_val<T: Clone + Cacheable>(&self, key: &str) -> Option<T> {
        let val = self.get(key)?;
        (*val).as_any().downcast_ref::<T>().cloned()
    }

    pub async fn remove(&self, key: &str) {
        self.invalidate(key).await;
    }
}

#[cfg(test)]
mod cache_tests {
    use super::*;

    #[tokio::test]
    async fn construct_and_get_cache() {
        let cache = Cache::new(1800, 1800, None);
        cache.push("key".to_string(), "val".to_string()).await;
        assert_eq!(cache.get_val::<String>("key"), Some(String::from("val")));
    }

    #[tokio::test]
    async fn eviction_on_size_test() {
        let cache = Cache::new(2, 2, Some(0));
        cache.push("key".to_string(), "val".to_string()).await;
        assert_eq!(cache.get_val::<String>("key"), None);
    }

    #[tokio::test]
    async fn invalidate_cache_for_key() {
        let cache = Cache::new(1800, 1800, None);
        cache.push("key".to_string(), "val".to_string()).await;

        cache.remove("key").await;

        assert_eq!(cache.get_val::<String>("key"), None);
    }

    #[tokio::test]
    async fn eviction_on_time_test() {
        let cache = Cache::new(2, 2, None);
        cache.push("key".to_string(), "val".to_string()).await;
        tokio::time::sleep(std::time::Duration::from_secs(3)).await;
        assert_eq!(cache.get_val::<String>("key"), None);
    }
}
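
The `CacheKind` conversions above define the wire format used on the invalidation channel: a prefix and the key joined by a comma. A round-trip sketch of that encoding (the key value is illustrative):

use std::borrow::Cow;

use redis_interface::RedisValue;
use storage_impl::redis::cache::CacheKind;

fn round_trip() {
    let kind = CacheKind::Config(Cow::Borrowed("mid_123"));
    // Encodes as the string "config,mid_123".
    let wire: RedisValue = kind.into();
    // Decoding fails with a ValidationError report on an unknown prefix.
    let decoded = CacheKind::try_from(wire).expect("known prefix");
    assert!(matches!(decoded, CacheKind::Config(key) if key == "mid_123"));
}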
crates/storage_impl/src/redis/kv_store.rs
@@ -1,4 +1,6 @@
-pub(crate) trait KvStorePartition {
+use std::sync::Arc;
+
+pub trait KvStorePartition {
     fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 {
         crc32fast::hash(key.to_string().as_bytes()) % u32::from(num_partitions)
     }
@@ -9,7 +11,7 @@ pub(crate) trait KvStorePartition {
     }
 }
 
-#[allow(unused)]
-pub(crate) enum PartitionKey<'a> {
+pub enum PartitionKey<'a> {
     MerchantIdPaymentId {
         merchant_id: &'a str,
         payment_id: &'a str,
@@ -26,3 +28,12 @@ impl<'a> std::fmt::Display for PartitionKey<'a> {
         }
     }
 }
+
+pub trait RedisConnInterface {
+    fn get_redis_conn(
+        &self,
+    ) -> error_stack::Result<
+        Arc<redis_interface::RedisConnectionPool>,
+        redis_interface::errors::RedisError,
+    >;
+}
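
The partitioning scheme made public here pairs with `get_drainer_stream_name` from lib.rs: the rendered key is CRC32-hashed modulo the partition count, and the shard key is wrapped in braces so a Redis Cluster hash tag keeps all of a shard's keys on one slot. A standalone sketch of the arithmetic (the key and names are illustrative; the exact `Display` output of `PartitionKey` is not shown in this diff):

fn main() {
    let key = "mid_merchant_a_pid_pay_123"; // a rendered PartitionKey
    let num_partitions: u8 = 8;
    // Same math as KvStorePartition::partition_number.
    let shard = crc32fast::hash(key.as_bytes()) % u32::from(num_partitions);
    // Braces form a Redis Cluster hash tag: every "{shard_N}..." key hashes
    // to the same slot, mirroring format!("{{{}}}_{}", shard_key, stream).
    let stream_name = format!("{{shard_{shard}}}_drainer_stream");
    println!("partition {shard} -> {stream_name}");
}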
|
||||
89
crates/storage_impl/src/redis/pub_sub.rs
Normal file
89
crates/storage_impl/src/redis/pub_sub.rs
Normal file
@ -0,0 +1,89 @@
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
|
||||
use router_env::logger;
|
||||
|
||||
use crate::redis::cache::{CacheKind, ACCOUNTS_CACHE, CONFIG_CACHE};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait PubSubInterface {
|
||||
async fn subscribe(&self, channel: &str) -> error_stack::Result<(), redis_errors::RedisError>;
|
||||
|
||||
async fn publish<'a>(
|
||||
&self,
|
||||
channel: &str,
|
||||
key: CacheKind<'a>,
|
||||
) -> error_stack::Result<usize, redis_errors::RedisError>;
|
||||
|
||||
async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl PubSubInterface for redis_interface::RedisConnectionPool {
|
||||
#[inline]
|
||||
async fn subscribe(&self, channel: &str) -> error_stack::Result<(), redis_errors::RedisError> {
|
||||
// Spawns a task that will automatically re-subscribe to any channels or channel patterns used by the client.
|
||||
self.subscriber.manage_subscriptions();
|
||||
|
||||
self.subscriber
|
||||
.subscribe(channel)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(redis_errors::RedisError::SubscribeError)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn publish<'a>(
|
||||
&self,
|
||||
channel: &str,
|
||||
key: CacheKind<'a>,
|
||||
) -> error_stack::Result<usize, redis_errors::RedisError> {
|
||||
self.publisher
|
||||
.publish(channel, RedisValue::from(key).into_inner())
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(redis_errors::RedisError::SubscribeError)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> {
|
||||
logger::debug!("Started on message");
|
||||
let mut rx = self.subscriber.on_message();
|
||||
while let Ok(message) = rx.recv().await {
|
||||
logger::debug!("Invalidating {message:?}");
|
||||
let key: CacheKind<'_> = match RedisValue::new(message.value)
|
||||
.try_into()
|
||||
.change_context(redis_errors::RedisError::OnMessageError)
|
||||
{
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
logger::error!(value_conversion_err=?err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let key = match key {
|
||||
CacheKind::Config(key) => {
|
||||
CONFIG_CACHE.invalidate(key.as_ref()).await;
|
||||
key
|
||||
}
|
||||
CacheKind::Accounts(key) => {
|
||||
ACCOUNTS_CACHE.invalidate(key.as_ref()).await;
|
||||
key
|
||||
}
|
||||
CacheKind::All(key) => {
|
||||
CONFIG_CACHE.invalidate(key.as_ref()).await;
|
||||
ACCOUNTS_CACHE.invalidate(key.as_ref()).await;
|
||||
key
|
||||
}
|
||||
};
|
||||
|
||||
self.delete_key(key.as_ref())
|
||||
.await
|
||||
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
|
||||
.ok();
|
||||
|
||||
logger::debug!("Done invalidating {key}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
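
End to end, one node publishes a `CacheKind` on a shared channel; every subscriber's `on_message` loop decodes it, drops the key from the matching in-process moka caches, and deletes the backing Redis key. A publisher-side sketch (the channel name is a placeholder; the real constant lives elsewhere in the codebase):

use std::borrow::Cow;

use storage_impl::redis::{cache::CacheKind, pub_sub::PubSubInterface};

const INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate"; // placeholder name

async fn invalidate_everywhere(
    conn: &redis_interface::RedisConnectionPool,
    key: &str,
) -> error_stack::Result<usize, redis_interface::errors::RedisError> {
    // Subscribers invalidate both CONFIG_CACHE and ACCOUNTS_CACHE for CacheKind::All.
    conn.publish(INVALIDATION_CHANNEL, CacheKind::All(Cow::Borrowed(key)))
        .await
}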
crates/storage_impl/src/refund.rs (new file, 5 lines)

use diesel_models::refund::Refund;

use crate::redis::kv_store::KvStorePartition;

impl KvStorePartition for Refund {}