refactor(storage): add redis structs to storage impls (#1910)

Sampras Lopes
2023-08-11 12:50:38 +05:30
committed by GitHub
parent 4311f2ce5e
commit 3e269663c3
9 changed files with 306 additions and 133 deletions

View File

@ -8,17 +8,35 @@ use crate::config::Database;
pub type PgPool = bb8::Pool<async_bb8_diesel::ConnectionManager<PgConnection>>;
pub type PgPooledConn = async_bb8_diesel::Connection<PgConnection>;
#[async_trait::async_trait]
pub trait DatabaseStore {
type Config;
async fn new(config: Self::Config, test_transaction: bool) -> Self;
fn get_write_pool(&self) -> PgPool;
fn get_read_pool(&self) -> PgPool;
}
#[derive(Clone)]
pub struct Store {
pub master_pool: PgPool,
}
-impl Store {
-pub async fn new(config: &Database, test_transaction: bool) -> Self {
+#[async_trait::async_trait]
+impl DatabaseStore for Store {
+type Config = Database;
+async fn new(config: Database, test_transaction: bool) -> Self {
Self {
-master_pool: diesel_make_pg_pool(config, test_transaction).await,
+master_pool: diesel_make_pg_pool(&config, test_transaction).await,
}
}
+fn get_write_pool(&self) -> PgPool {
+self.master_pool.clone()
+}
+fn get_read_pool(&self) -> PgPool {
+self.master_pool.clone()
+}
}
#[derive(Clone)]
@ -27,19 +45,26 @@ pub struct ReplicaStore {
pub replica_pool: PgPool,
}
-impl ReplicaStore {
-pub async fn new(
-master_config: &Database,
-replica_config: &Database,
-test_transaction: bool,
-) -> Self {
-let master_pool = diesel_make_pg_pool(master_config, test_transaction).await;
-let replica_pool = diesel_make_pg_pool(replica_config, test_transaction).await;
+#[async_trait::async_trait]
+impl DatabaseStore for ReplicaStore {
+type Config = (Database, Database);
+async fn new(config: (Database, Database), test_transaction: bool) -> Self {
+let (master_config, replica_config) = config;
+let master_pool = diesel_make_pg_pool(&master_config, test_transaction).await;
+let replica_pool = diesel_make_pg_pool(&replica_config, test_transaction).await;
Self {
master_pool,
replica_pool,
}
}
+fn get_write_pool(&self) -> PgPool {
+self.master_pool.clone()
+}
+fn get_read_pool(&self) -> PgPool {
+self.replica_pool.clone()
+}
}
#[allow(clippy::expect_used)]
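The net effect of this hunk is that both Postgres backends now construct through one `DatabaseStore` trait whose associated `Config` type varies per implementation: a single `Database` for `Store`, a `(master, replica)` tuple for `ReplicaStore`. A minimal sketch of a generic caller, mirroring how the trait is consumed in lib.rs below; `build_and_check` is an illustrative helper, not part of this commit:

async fn build_and_check<T: DatabaseStore>(config: T::Config) -> T {
    // `false` = no test transaction, matching the non-test construction path.
    let store = T::new(config, false).await;
    // Store hands back master_pool for both; ReplicaStore routes reads to the replica.
    let _write: PgPool = store.get_write_pool();
    let _read: PgPool = store.get_read_pool();
    store
}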

View File

@ -1,2 +1,109 @@
use error_stack::ResultExt;
use masking::StrongSecret;
use redis::CacheStore;
pub mod config;
pub mod diesel;
pub mod redis;
pub use crate::diesel::store::DatabaseStore;
#[allow(dead_code)]
pub struct RouterStore<T: DatabaseStore> {
db_store: T,
cache_store: CacheStore,
master_encryption_key: StrongSecret<Vec<u8>>,
}
impl<T: DatabaseStore> RouterStore<T> {
pub async fn new(
db_conf: T::Config,
cache_conf: &redis_interface::RedisSettings,
encryption_key: StrongSecret<Vec<u8>>,
cache_error_signal: tokio::sync::oneshot::Sender<()>,
inmemory_cache_stream: &str,
) -> Self {
// TODO: create an error enum and return proper error here
let db_store = T::new(db_conf, false).await;
#[allow(clippy::expect_used)]
let cache_store = CacheStore::new(cache_conf)
.await
.expect("Failed to create cache store");
cache_store.set_error_callback(cache_error_signal);
#[allow(clippy::expect_used)]
cache_store
.subscribe_to_channel(inmemory_cache_stream)
.await
.expect("Failed to subscribe to inmemory cache stream");
Self {
db_store,
cache_store,
master_encryption_key: encryption_key,
}
}
pub async fn test_store(
db_conf: T::Config,
cache_conf: &redis_interface::RedisSettings,
encryption_key: StrongSecret<Vec<u8>>,
) -> Self {
// TODO: create an error enum and return proper error here
let db_store = T::new(db_conf, true).await;
#[allow(clippy::expect_used)]
let cache_store = CacheStore::new(cache_conf)
.await
.expect("Failed to create cache store");
Self {
db_store,
cache_store,
master_encryption_key: encryption_key,
}
}
}
pub struct KVRouterStore<T: DatabaseStore> {
router_store: RouterStore<T>,
drainer_stream_name: String,
drainer_num_partitions: u8,
}
impl<T: DatabaseStore> KVRouterStore<T> {
pub fn from_store(
store: RouterStore<T>,
drainer_stream_name: String,
drainer_num_partitions: u8,
) -> Self {
Self {
router_store: store,
drainer_stream_name,
drainer_num_partitions,
}
}
pub fn get_drainer_stream_name(&self, shard_key: &str) -> String {
format!("{{{}}}_{}", shard_key, self.drainer_stream_name)
}
#[allow(dead_code)]
async fn push_to_drainer_stream<R>(
&self,
redis_entry: diesel_models::kv::TypedSql,
partition_key: redis::kv_store::PartitionKey<'_>,
) -> error_stack::Result<(), redis_interface::errors::RedisError>
where
R: crate::redis::kv_store::KvStorePartition,
{
let shard_key = R::shard_key(partition_key, self.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.router_store
.cache_store
.redis_conn
.stream_append_entry(
&stream_name,
&redis_interface::RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(redis_interface::errors::RedisError::JsonSerializationFailed)?,
)
.await
.change_context(redis_interface::errors::RedisError::StreamAppendFailed)
}
}
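`RouterStore` bundles the generic database backend, the Redis-backed `CacheStore`, and the master encryption key; `KVRouterStore` wraps it with the drainer stream name and partition count used for KV flows. A rough wiring sketch, assuming the Diesel-backed `Store` from the first file is in scope; the channel name, stream name, and partition count are placeholders rather than values from this commit:

async fn wire_up(
    db_conf: <Store as DatabaseStore>::Config,
    redis_conf: redis_interface::RedisSettings,
    key: StrongSecret<Vec<u8>>,
) -> KVRouterStore<Store> {
    // The receiver half would be watched by a shutdown/health task elsewhere.
    let (cache_err_tx, _cache_err_rx) = tokio::sync::oneshot::channel();
    let router_store = RouterStore::<Store>::new(
        db_conf,
        &redis_conf,
        key,
        cache_err_tx,
        "invalidate_cache", // in-memory cache pub/sub channel (placeholder name)
    )
    .await;
    KVRouterStore::from_store(router_store, "DRAINER_STREAM".to_string(), 8)
}

Note that `get_drainer_stream_name` wraps the shard key in curly braces (`{shard}_stream`), which Redis Cluster treats as a hash tag, so all entries for one shard hash to the same slot.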

View File

@ -0,0 +1,51 @@
pub mod kv_store;
use std::sync::Arc;
use error_stack::{IntoReport, ResultExt};
use redis_interface::PubsubInterface;
pub struct CacheStore {
// Maybe expose the redis_conn via traits instead of making the field public
pub(crate) redis_conn: Arc<redis_interface::RedisConnectionPool>,
}
impl CacheStore {
pub async fn new(
conf: &redis_interface::RedisSettings,
) -> error_stack::Result<Self, redis_interface::errors::RedisError> {
Ok(Self {
redis_conn: Arc::new(redis_interface::RedisConnectionPool::new(conf).await?),
})
}
pub fn set_error_callback(&self, callback: tokio::sync::oneshot::Sender<()>) {
let redis_clone = self.redis_conn.clone();
tokio::spawn(async move {
redis_clone.on_error(callback).await;
});
}
pub async fn subscribe_to_channel(
&self,
channel: &str,
) -> error_stack::Result<(), redis_interface::errors::RedisError> {
self.redis_conn.subscriber.manage_subscriptions();
self.redis_conn
.subscriber
.subscribe::<(), _>(channel)
.await
.into_report()
.change_context(redis_interface::errors::RedisError::SubscribeError)?;
// TODO: Handle on message failures
// let redis_clone = self.redis_conn.clone();
// tokio::spawn(async move {
// if let Err(e) = redis_clone.on_message().await {
// logger::error!(pubsub_err=?e);
// }
// });
Ok(())
}
}
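`set_error_callback` only forwards the oneshot sender to the connection pool's `on_error` watcher; deciding what to do when the signal fires is left to the caller. A small sketch of that caller side (the reaction shown is a placeholder):

fn watch_cache_errors(cache: &CacheStore) {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    cache.set_error_callback(tx);
    tokio::spawn(async move {
        // Fires at most once, when the Redis connection reports an error.
        if rx.await.is_ok() {
            eprintln!("redis error callback fired; trigger failover or shutdown here");
        }
    });
}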

View File

@ -0,0 +1,28 @@
pub(crate) trait KvStorePartition {
fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 {
crc32fast::hash(key.to_string().as_bytes()) % u32::from(num_partitions)
}
fn shard_key(key: PartitionKey<'_>, num_partitions: u8) -> String {
format!("shard_{}", Self::partition_number(key, num_partitions))
}
}
#[allow(unused)]
pub(crate) enum PartitionKey<'a> {
MerchantIdPaymentId {
merchant_id: &'a str,
payment_id: &'a str,
},
}
impl<'a> std::fmt::Display for PartitionKey<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
PartitionKey::MerchantIdPaymentId {
merchant_id,
payment_id,
} => f.write_str(&format!("mid_{merchant_id}_pid_{payment_id}")),
}
}
}
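Shard selection is deterministic: `partition_number` hashes the rendered partition key with CRC32 and reduces it modulo the partition count, and `shard_key` renders that as the `shard_N` fragment fed into the drainer stream name. A usage sketch from inside the crate, with `PaymentIntent` standing in as a placeholder for whichever diesel model opts into the marker trait:

// Placeholder model type for illustration; real impls would sit next to the diesel models.
struct PaymentIntent;
impl KvStorePartition for PaymentIntent {}

fn pick_shard(merchant_id: &str, payment_id: &str) -> String {
    let key = PartitionKey::MerchantIdPaymentId {
        merchant_id,
        payment_id,
    };
    // crc32fast::hash("mid_<merchant_id>_pid_<payment_id>") % 64, rendered as "shard_<n>"
    PaymentIntent::shard_key(key, 64)
}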