From 9f199d9ab8fb7360bda2661a7014aea8906b74f9 Mon Sep 17 00:00:00 2001 From: Sampras Lopes Date: Mon, 14 Aug 2023 18:46:31 +0530 Subject: [PATCH] refactor(storage_impl): Integrate the composite store from external crate (#1921) --- .github/workflows/CI-pr.yml | 20 ++ Cargo.lock | 7 +- crates/cards/src/lib.rs | 5 +- crates/router/Cargo.toml | 2 - crates/router/src/connection.rs | 14 +- crates/router/src/connector/cashtocode.rs | 4 +- crates/router/src/connector/tsys.rs | 6 +- crates/router/src/core/cache.rs | 2 +- crates/router/src/core/files/helpers.rs | 6 +- .../router/src/core/payment_methods/vault.rs | 2 +- crates/router/src/core/payouts.rs | 2 +- crates/router/src/core/payouts/validator.rs | 2 +- crates/router/src/db.rs | 13 +- crates/router/src/db/api_keys.rs | 17 +- crates/router/src/db/cache.rs | 14 +- crates/router/src/db/configs.rs | 9 +- crates/router/src/db/ephemeral_key.rs | 13 +- crates/router/src/db/merchant_account.rs | 10 +- .../src/db/merchant_connector_account.rs | 19 +- crates/router/src/db/merchant_key_store.rs | 4 +- crates/router/src/db/payment_attempt.rs | 25 +- crates/router/src/db/payment_intent.rs | 13 +- crates/router/src/db/queue.rs | 13 +- crates/router/src/db/refund.rs | 21 +- crates/router/src/lib.rs | 1 - crates/router/src/routes/app.rs | 4 +- crates/router/src/services.rs | 319 ++++-------------- .../src/types/storage/payment_attempt.rs | 3 - .../src/types/storage/payment_intent.rs | 3 - crates/router/src/types/storage/refund.rs | 3 - .../router/src/utils/storage_partitioning.rs | 28 +- crates/router/tests/cache.rs | 7 +- crates/router/tests/connectors/payeezy.rs | 2 +- crates/router_derive/src/macros/diesel.rs | 1 + crates/storage_impl/Cargo.toml | 13 +- .../src/{diesel.rs => database.rs} | 0 .../src/{diesel => database}/store.rs | 26 +- crates/storage_impl/src/lib.rs | 112 +++++- crates/storage_impl/src/payments.rs | 6 + crates/storage_impl/src/redis.rs | 52 ++- .../src => storage_impl/src/redis}/cache.rs | 3 +- crates/storage_impl/src/redis/kv_store.rs | 15 +- crates/storage_impl/src/redis/pub_sub.rs | 89 +++++ crates/storage_impl/src/refund.rs | 5 + 44 files changed, 490 insertions(+), 445 deletions(-) rename crates/storage_impl/src/{diesel.rs => database.rs} (100%) rename crates/storage_impl/src/{diesel => database}/store.rs (85%) create mode 100644 crates/storage_impl/src/payments.rs rename crates/{router/src => storage_impl/src/redis}/cache.rs (99%) create mode 100644 crates/storage_impl/src/redis/pub_sub.rs create mode 100644 crates/storage_impl/src/refund.rs diff --git a/.github/workflows/CI-pr.yml b/.github/workflows/CI-pr.yml index 61d4c6f7df..61aaf5951f 100644 --- a/.github/workflows/CI-pr.yml +++ b/.github/workflows/CI-pr.yml @@ -150,6 +150,11 @@ jobs: else echo "router_changes_exist=true" >> $GITHUB_ENV fi + if git diff --exit-code --quiet origin/$GITHUB_BASE_REF -- crates/storage_impl/; then + echo "storage_impl_changes_exist=false" >> $GITHUB_ENV + else + echo "storage_impl_changes_exist=true" >> $GITHUB_ENV + fi if git diff --exit-code --quiet origin/$GITHUB_BASE_REF -- crates/router_derive/; then echo "router_derive_changes_exist=false" >> $GITHUB_ENV else @@ -216,6 +221,11 @@ jobs: shell: bash run: cargo hack check --each-feature --skip kms,basilisk,kv_store,accounts_cache,openapi --no-dev-deps -p router + - name: Cargo hack storage_impl + if: env.storage_impl_changes_exist == 'true' + shell: bash + run: cargo hack check --each-feature --no-dev-deps -p storage_impl + - name: Cargo hack router_derive if: env.router_derive_changes_exist == 'true' shell: bash @@ -369,6 +379,11 @@ jobs: else echo "router_derive_changes_exist=true" >> $GITHUB_ENV fi + if git diff --exit-code --quiet origin/$GITHUB_BASE_REF -- crates/storage_impl/; then + echo "storage_impl_changes_exist=false" >> $GITHUB_ENV + else + echo "storage_impl_changes_exist=true" >> $GITHUB_ENV + fi if git diff --exit-code --quiet origin/$GITHUB_BASE_REF -- crates/router_env/; then echo "router_env_changes_exist=false" >> $GITHUB_ENV else @@ -435,6 +450,11 @@ jobs: shell: bash run: cargo hack check --each-feature --no-dev-deps -p router_derive + - name: Cargo hack storage_impl + if: env.storage_impl_changes_exist == 'true' + shell: bash + run: cargo hack check --each-feature --no-dev-deps -p storage_impl + - name: Cargo hack router_env if: env.router_env_changes_exist == 'true' shell: bash diff --git a/Cargo.lock b/Cargo.lock index 803f744a16..e8f0d09863 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4014,7 +4014,6 @@ dependencies = [ "clap", "common_utils", "config", - "crc32fast", "derive_deref", "diesel", "diesel_models", @@ -4034,7 +4033,6 @@ dependencies = [ "maud", "mimalloc", "mime", - "moka", "nanoid", "num_cpus", "once_cell", @@ -4668,12 +4666,17 @@ dependencies = [ "async-bb8-diesel", "async-trait", "bb8", + "common_utils", "crc32fast", "diesel", "diesel_models", + "dyn-clone", "error-stack", "masking", + "moka", + "once_cell", "redis_interface", + "router_env", "tokio", ] diff --git a/crates/cards/src/lib.rs b/crates/cards/src/lib.rs index 52e62e0912..d47d4f33a1 100644 --- a/crates/cards/src/lib.rs +++ b/crates/cards/src/lib.rs @@ -4,10 +4,7 @@ use std::ops::Deref; use common_utils::{date_time, errors}; use error_stack::report; use masking::{PeekInterface, StrongSecret}; -use serde::{ - de::{self}, - Deserialize, Serialize, -}; +use serde::{de, Deserialize, Serialize}; use time::{util::days_in_year_month, Date, Duration, PrimitiveDateTime, Time}; pub use crate::validate::{CCValError, CardNumber, CardNumberStrategy}; diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index e99f3c150d..b9f374efca 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -44,7 +44,6 @@ blake3 = "1.3.3" bytes = "1.4.0" clap = { version = "4.3.2", default-features = false, features = ["std", "derive", "help", "usage"] } config = { version = "0.13.3", features = ["toml"] } -crc32fast = "1.3.2" diesel = { version = "2.1.0", features = ["postgres"] } dyn-clone = "1.0.11" encoding_rs = "0.8.32" @@ -60,7 +59,6 @@ literally = "0.1.3" maud = { version = "0.25", features = ["actix-web"] } mimalloc = { version = "0.1", optional = true } mime = "0.3.17" -moka = { version = "0.11", features = ["future"] } nanoid = "0.4.0" num_cpus = "1.15.0" once_cell = "1.18.0" diff --git a/crates/router/src/connection.rs b/crates/router/src/connection.rs index bf435c3067..c5ca810049 100644 --- a/crates/router/src/connection.rs +++ b/crates/router/src/connection.rs @@ -17,15 +17,15 @@ pub async fn redis_connection( .expect("Failed to create Redis Connection Pool") } -pub async fn pg_connection_read( - store: &crate::services::Store, +pub async fn pg_connection_read( + store: &T, ) -> errors::CustomResult< PooledConnection<'_, async_bb8_diesel::ConnectionManager>, errors::StorageError, > { // If only OLAP is enabled get replica pool. #[cfg(all(feature = "olap", not(feature = "oltp")))] - let pool = &store.diesel_store.replica_pool; + let pool = store.get_replica_pool(); // If either one of these are true we need to get master pool. // 1. Only OLTP is enabled. @@ -36,7 +36,7 @@ pub async fn pg_connection_read( all(feature = "olap", feature = "oltp"), all(not(feature = "olap"), not(feature = "oltp")) ))] - let pool = &store.diesel_store.master_pool; + let pool = store.get_master_pool(); pool.get() .await @@ -44,14 +44,14 @@ pub async fn pg_connection_read( .change_context(errors::StorageError::DatabaseConnectionError) } -pub async fn pg_connection_write( - store: &crate::services::Store, +pub async fn pg_connection_write( + store: &T, ) -> errors::CustomResult< PooledConnection<'_, async_bb8_diesel::ConnectionManager>, errors::StorageError, > { // Since all writes should happen to master DB only choose master DB. - let pool = &store.diesel_store.master_pool; + let pool = store.get_master_pool(); pool.get() .await diff --git a/crates/router/src/connector/cashtocode.rs b/crates/router/src/connector/cashtocode.rs index f1dc49966d..5b48a0ddf9 100644 --- a/crates/router/src/connector/cashtocode.rs +++ b/crates/router/src/connector/cashtocode.rs @@ -20,9 +20,7 @@ use crate::{ types::{ self, api::{self, ConnectorCommon, ConnectorCommonExt}, - domain, - storage::{self}, - ErrorResponse, Response, + domain, storage, ErrorResponse, Response, }, utils::{self, ByteSliceExt, BytesExt}, }; diff --git a/crates/router/src/connector/tsys.rs b/crates/router/src/connector/tsys.rs index e0f6e592c3..f7cab44071 100644 --- a/crates/router/src/connector/tsys.rs +++ b/crates/router/src/connector/tsys.rs @@ -9,11 +9,7 @@ use crate::{ configs::settings, core::errors::{self, CustomResult}, headers, - services::{ - self, - request::{self}, - ConnectorIntegration, - }, + services::{self, request, ConnectorIntegration}, types::{ self, api::{self, ConnectorCommon, ConnectorCommonExt}, diff --git a/crates/router/src/core/cache.rs b/crates/router/src/core/cache.rs index c3914d2751..124a09e1bd 100644 --- a/crates/router/src/core/cache.rs +++ b/crates/router/src/core/cache.rs @@ -1,9 +1,9 @@ use common_utils::errors::CustomResult; use error_stack::{report, ResultExt}; +use storage_impl::redis::cache::CacheKind; use super::errors; use crate::{ - cache::CacheKind, db::{cache::publish_into_redact_channel, StorageInterface}, services, }; diff --git a/crates/router/src/core/files/helpers.rs b/crates/router/src/core/files/helpers.rs index dfe9cee92c..b9d5c6afc1 100644 --- a/crates/router/src/core/files/helpers.rs +++ b/crates/router/src/core/files/helpers.rs @@ -12,11 +12,7 @@ use crate::{ }, routes::AppState, services, - types::{ - self, api, - domain::{self}, - transformers::ForeignTryFrom, - }, + types::{self, api, domain, transformers::ForeignTryFrom}, }; pub async fn read_string(field: &mut Field) -> Option { diff --git a/crates/router/src/core/payment_methods/vault.rs b/crates/router/src/core/payment_methods/vault.rs index 5a504a0fac..0a647d6a54 100644 --- a/crates/router/src/core/payment_methods/vault.rs +++ b/crates/router/src/core/payment_methods/vault.rs @@ -16,7 +16,7 @@ use crate::{ core::errors::{self, CustomResult, RouterResult}, logger, routes, types::{ - api::{self}, + api, storage::{self, enums}, }, utils::{self, StringExt}, diff --git a/crates/router/src/core/payouts.rs b/crates/router/src/core/payouts.rs index 2ad4f43ca7..ab60989fa6 100644 --- a/crates/router/src/core/payouts.rs +++ b/crates/router/src/core/payouts.rs @@ -6,7 +6,7 @@ use common_utils::{crypto::Encryptable, ext_traits::ValueExt}; use diesel_models::enums as storage_enums; use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; -use serde_json::{self}; +use serde_json; use super::errors::{ConnectorErrorExt, StorageErrorExt}; use crate::{ diff --git a/crates/router/src/core/payouts/validator.rs b/crates/router/src/core/payouts/validator.rs index 74b212ade2..c815d91e41 100644 --- a/crates/router/src/core/payouts/validator.rs +++ b/crates/router/src/core/payouts/validator.rs @@ -11,7 +11,7 @@ use crate::{ logger, routes::AppState, types::{api::payouts, domain, storage}, - utils::{self}, + utils, }; #[cfg(feature = "payouts")] diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 7e20d1dae7..1f034cb25c 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -29,11 +29,10 @@ pub mod reverse_lookup; use std::sync::Arc; use futures::lock::Mutex; +use masking::PeekInterface; +use storage_impl::redis::kv_store::RedisConnInterface; -use crate::{ - services::{self, Store}, - types::storage, -}; +use crate::{services::Store, types::storage}; #[derive(PartialEq, Eq)] pub enum StorageImpl { @@ -75,7 +74,7 @@ pub trait StorageInterface: + cards_info::CardsInfoInterface + merchant_key_store::MerchantKeyStoreInterface + MasterKeyInterface - + services::RedisConnInterface + + RedisConnInterface + 'static { } @@ -86,7 +85,7 @@ pub trait MasterKeyInterface { impl MasterKeyInterface for Store { fn get_master_key(&self) -> &[u8] { - &self.master_key + self.master_key().peek() } } @@ -176,7 +175,7 @@ where .change_context(redis_interface::errors::RedisError::JsonDeserializationFailed) } -impl services::RedisConnInterface for MockDb { +impl RedisConnInterface for MockDb { fn get_redis_conn( &self, ) -> Result< diff --git a/crates/router/src/db/api_keys.rs b/crates/router/src/db/api_keys.rs index ad767a6927..70f0dc42f9 100644 --- a/crates/router/src/db/api_keys.rs +++ b/crates/router/src/db/api_keys.rs @@ -1,8 +1,10 @@ use error_stack::IntoReport; +#[cfg(feature = "accounts_cache")] +use storage_impl::redis::cache::CacheKind; +#[cfg(feature = "accounts_cache")] +use storage_impl::redis::cache::ACCOUNTS_CACHE; use super::{MockDb, Store}; -#[cfg(feature = "accounts_cache")] -use crate::cache::{self, ACCOUNTS_CACHE}; use crate::{ connection, core::errors::{self, CustomResult}, @@ -104,7 +106,7 @@ impl ApiKeyInterface for Store { super::cache::publish_and_redact( self, - cache::CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()), + CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()), update_call, ) .await @@ -147,7 +149,7 @@ impl ApiKeyInterface for Store { super::cache::publish_and_redact( self, - cache::CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()), + CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()), delete_call, ) .await @@ -373,12 +375,15 @@ impl ApiKeyInterface for MockDb { #[cfg(test)] mod tests { + use storage_impl::redis::{ + cache::{CacheKind, ACCOUNTS_CACHE}, + kv_store::RedisConnInterface, + pub_sub::PubSubInterface, + }; use time::macros::datetime; use crate::{ - cache::{CacheKind, ACCOUNTS_CACHE}, db::{api_keys::ApiKeyInterface, cache, MockDb}, - services::{PubSubInterface, RedisConnInterface}, types::storage, }; diff --git a/crates/router/src/db/cache.rs b/crates/router/src/db/cache.rs index fee7cd8591..7f084df5d5 100644 --- a/crates/router/src/db/cache.rs +++ b/crates/router/src/db/cache.rs @@ -1,13 +1,15 @@ use common_utils::ext_traits::AsyncExt; use error_stack::ResultExt; use redis_interface::errors::RedisError; +use storage_impl::redis::{ + cache::{Cache, CacheKind, Cacheable}, + pub_sub::PubSubInterface, +}; use super::StorageInterface; use crate::{ - cache::{self, Cacheable}, consts, core::errors::{self, CustomResult}, - services::PubSubInterface, }; pub async fn get_or_populate_redis( @@ -53,7 +55,7 @@ pub async fn get_or_populate_in_memory( store: &dyn StorageInterface, key: &str, fun: F, - cache: &cache::Cache, + cache: &Cache, ) -> CustomResult where T: Cacheable + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Clone, @@ -74,7 +76,7 @@ pub async fn redact_cache( store: &dyn StorageInterface, key: &str, fun: F, - in_memory: Option<&cache::Cache>, + in_memory: Option<&Cache>, ) -> CustomResult where F: FnOnce() -> Fut + Send, @@ -99,7 +101,7 @@ where pub async fn publish_into_redact_channel<'a>( store: &dyn StorageInterface, - key: cache::CacheKind<'a>, + key: CacheKind<'a>, ) -> CustomResult { let redis_conn = store .get_redis_conn() @@ -116,7 +118,7 @@ pub async fn publish_into_redact_channel<'a>( pub async fn publish_and_redact<'a, T, F, Fut>( store: &dyn StorageInterface, - key: cache::CacheKind<'a>, + key: CacheKind<'a>, fun: F, ) -> CustomResult where diff --git a/crates/router/src/db/configs.rs b/crates/router/src/db/configs.rs index 6684e42f80..ac78207811 100644 --- a/crates/router/src/db/configs.rs +++ b/crates/router/src/db/configs.rs @@ -1,12 +1,15 @@ use diesel_models::configs::ConfigUpdateInternal; use error_stack::IntoReport; +use storage_impl::redis::{ + cache::{CacheKind, CONFIG_CACHE}, + kv_store::RedisConnInterface, + pub_sub::PubSubInterface, +}; use super::{cache, MockDb, Store}; use crate::{ - cache::{CacheKind, CONFIG_CACHE}, connection, consts, core::errors::{self, CustomResult}, - services::PubSubInterface, types::storage, }; @@ -104,7 +107,7 @@ impl ConfigInterface for Store { .map_err(Into::into) .into_report()?; - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .publish(consts::PUB_SUB_CHANNEL, CacheKind::Config(key.into())) .await diff --git a/crates/router/src/db/ephemeral_key.rs b/crates/router/src/db/ephemeral_key.rs index a95ba436e1..6a8afebff7 100644 --- a/crates/router/src/db/ephemeral_key.rs +++ b/crates/router/src/db/ephemeral_key.rs @@ -27,6 +27,7 @@ mod storage { use common_utils::date_time; use error_stack::ResultExt; use redis_interface::HsetnxReply; + use storage_impl::redis::kv_store::RedisConnInterface; use time::ext::NumericalDuration; use super::EphemeralKeyInterface; @@ -58,7 +59,7 @@ mod storage { }; match self - .redis_conn() + .get_redis_conn() .map_err(Into::::into)? .serialize_and_set_multiple_hash_field_if_not_exist( &[(&secret_key, &created_ek), (&id_key, &created_ek)], @@ -75,12 +76,12 @@ mod storage { } Ok(_) => { let expire_at = expires.assume_utc().unix_timestamp(); - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .set_expire_at(&secret_key, expire_at) .await .change_context(errors::StorageError::KVError)?; - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .set_expire_at(&id_key, expire_at) .await @@ -95,7 +96,7 @@ mod storage { key: &str, ) -> CustomResult { let key = format!("epkey_{key}"); - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(&key, "ephkey", "EphemeralKey") .await @@ -107,13 +108,13 @@ mod storage { ) -> CustomResult { let ek = self.get_ephemeral_key(id).await?; - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .delete_key(&format!("epkey_{}", &ek.id)) .await .change_context(errors::StorageError::KVError)?; - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .delete_key(&format!("epkey_{}", &ek.secret)) .await diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index c888a5faa2..db76f94b22 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -1,9 +1,9 @@ use common_utils::ext_traits::AsyncExt; use error_stack::{IntoReport, ResultExt}; +#[cfg(feature = "accounts_cache")] +use storage_impl::redis::cache::{CacheKind, ACCOUNTS_CACHE}; use super::{MasterKeyInterface, MockDb, Store}; -#[cfg(feature = "accounts_cache")] -use crate::cache::{self, ACCOUNTS_CACHE}; use crate::{ connection, core::errors::{self, CustomResult}, @@ -147,7 +147,7 @@ impl MerchantAccountInterface for Store { { super::cache::publish_and_redact( self, - cache::CacheKind::Accounts((&_merchant_id).into()), + CacheKind::Accounts((&_merchant_id).into()), update_func, ) .await @@ -187,7 +187,7 @@ impl MerchantAccountInterface for Store { { super::cache::publish_and_redact( self, - cache::CacheKind::Accounts(merchant_id.into()), + CacheKind::Accounts(merchant_id.into()), update_func, ) .await @@ -240,7 +240,7 @@ impl MerchantAccountInterface for Store { { super::cache::publish_and_redact( self, - cache::CacheKind::Accounts(merchant_id.into()), + CacheKind::Accounts(merchant_id.into()), delete_func, ) .await diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index 03e93b22a1..65da0069fe 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -3,10 +3,11 @@ use std::cmp::Ordering; use common_utils::ext_traits::{AsyncExt, ByteSliceExt, Encode}; use diesel_models::errors as storage_errors; use error_stack::{IntoReport, ResultExt}; +#[cfg(feature = "accounts_cache")] +use storage_impl::redis::cache; +use storage_impl::redis::kv_store::RedisConnInterface; use super::{MockDb, Store}; -#[cfg(feature = "accounts_cache")] -use crate::cache::{self, ACCOUNTS_CACHE}; use crate::{ connection, core::errors::{self, CustomResult}, @@ -49,7 +50,7 @@ impl ConnectorAccessToken for Store { // being refreshed by other request then wait till it finishes and use the same access token let key = format!("access_token_{merchant_id}_{connector_name}"); let maybe_token = self - .redis_conn() + .get_redis_conn() .map_err(Into::::into)? .get_key::>>(&key) .await @@ -75,7 +76,7 @@ impl ConnectorAccessToken for Store { let serialized_access_token = Encode::::encode_to_string_of_json(&access_token) .change_context(errors::StorageError::SerializationFailed)?; - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .set_key_with_expiry(&key, serialized_access_token, access_token.expires) .await @@ -198,7 +199,7 @@ impl MerchantConnectorAccountInterface for Store { self, &format!("{}_{}", merchant_id, connector_label), find_call, - &ACCOUNTS_CACHE, + &cache::ACCOUNTS_CACHE, ) .await .async_and_then(|item| async { @@ -669,16 +670,20 @@ mod merchant_connector_account_cache_tests { use diesel_models::enums::ConnectorType; use error_stack::ResultExt; use masking::PeekInterface; + use storage_impl::redis::{ + cache::{CacheKind, ACCOUNTS_CACHE}, + kv_store::RedisConnInterface, + pub_sub::PubSubInterface, + }; use time::macros::datetime; use crate::{ - cache::{CacheKind, ACCOUNTS_CACHE}, core::errors, db::{ cache, merchant_connector_account::MerchantConnectorAccountInterface, merchant_key_store::MerchantKeyStoreInterface, MasterKeyInterface, MockDb, }, - services::{self, PubSubInterface, RedisConnInterface}, + services, types::{ domain::{self, behaviour::Conversion, types as domain_types}, storage, diff --git a/crates/router/src/db/merchant_key_store.rs b/crates/router/src/db/merchant_key_store.rs index 32bf9f4270..a85b5f5fe2 100644 --- a/crates/router/src/db/merchant_key_store.rs +++ b/crates/router/src/db/merchant_key_store.rs @@ -1,8 +1,8 @@ use error_stack::{IntoReport, ResultExt}; use masking::Secret; - #[cfg(feature = "accounts_cache")] -use crate::cache::ACCOUNTS_CACHE; +use storage_impl::redis::cache::ACCOUNTS_CACHE; + use crate::{ connection, core::errors::{self, CustomResult}, diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index 7230fa7894..52180cd15e 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -111,7 +111,7 @@ mod storage { payment_attempt: PaymentAttemptUpdate, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = connection::pg_connection_write(&self).await?; + let conn = connection::pg_connection_write(self).await?; this.update_with_attempt_id(&conn, payment_attempt) .await .map_err(Into::into) @@ -430,6 +430,7 @@ mod storage { use diesel_models::reverse_lookup::ReverseLookup; use error_stack::{IntoReport, ResultExt}; use redis_interface::HsetnxReply; + use storage_impl::redis::kv_store::RedisConnInterface; use super::PaymentAttemptInterface; use crate::{ @@ -511,7 +512,7 @@ mod storage { let field = format!("pa_{}", created_attempt.attempt_id); match self - .redis_conn() + .get_redis_conn() .map_err(Into::::into)? .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_attempt) .await @@ -551,7 +552,7 @@ mod storage { payment_id: &created_attempt.payment_id, } ) - .await?; + .await.change_context(errors::StorageError::KVError)?; Ok(created_attempt) } Err(error) => Err(error.change_context(errors::StorageError::KVError)), @@ -586,7 +587,8 @@ mod storage { .change_context(errors::StorageError::KVError)?; let field = format!("pa_{}", updated_attempt.attempt_id); let updated_attempt = self - .redis_conn + .get_redis_conn() + .change_context(errors::StorageError::KVError)? .set_hash_fields(&key, (&field, &redis_value)) .await .map(|_| updated_attempt) @@ -664,7 +666,8 @@ mod storage { payment_id: &updated_attempt.payment_id, }, ) - .await?; + .await + .change_context(errors::StorageError::KVError)?; Ok(updated_attempt) } } @@ -698,7 +701,7 @@ mod storage { let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"), database_call, @@ -751,7 +754,7 @@ mod storage { let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"), database_call, @@ -782,7 +785,7 @@ mod storage { let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"), database_call, @@ -817,7 +820,7 @@ mod storage { let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"), database_call, @@ -854,7 +857,7 @@ mod storage { let lookup = self.get_lookup_by_lookup_id(&lookup_id).await?; let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "PaymentAttempt"), database_call, @@ -884,7 +887,7 @@ mod storage { let pattern = db_utils::generate_hscan_pattern_for_attempt(&lookup.sk_id); - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .hscan_and_deserialize(&key, &pattern, None) .await diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index e20dca4308..0d807ce85d 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -58,6 +58,7 @@ mod storage { use common_utils::date_time; use error_stack::{IntoReport, ResultExt}; use redis_interface::HsetnxReply; + use storage_impl::redis::kv_store::RedisConnInterface; use super::PaymentIntentInterface; #[cfg(feature = "olap")] @@ -119,7 +120,7 @@ mod storage { }; match self - .redis_conn() + .get_redis_conn() .map_err(Into::::into)? .serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent) .await @@ -142,7 +143,8 @@ mod storage { payment_id: &created_intent.payment_id, }, ) - .await?; + .await + .change_context(errors::StorageError::KVError)?; Ok(created_intent) } Err(error) => Err(error.change_context(errors::StorageError::KVError)), @@ -177,7 +179,7 @@ mod storage { .change_context(errors::StorageError::SerializationFailed)?; let updated_intent = self - .redis_conn() + .get_redis_conn() .map_err(Into::::into)? .set_hash_fields(&key, ("pi", &redis_value)) .await @@ -202,7 +204,8 @@ mod storage { payment_id: &updated_intent.payment_id, }, ) - .await?; + .await + .change_context(errors::StorageError::KVError)?; Ok(updated_intent) } } @@ -227,7 +230,7 @@ mod storage { enums::MerchantStorageScheme::RedisKv => { let key = format!("{merchant_id}_{payment_id}"); db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(&key, "pi", "PaymentIntent"), database_call, diff --git a/crates/router/src/db/queue.rs b/crates/router/src/db/queue.rs index a5e08a125e..17dc81b928 100644 --- a/crates/router/src/db/queue.rs +++ b/crates/router/src/db/queue.rs @@ -1,5 +1,6 @@ use redis_interface::{errors::RedisError, RedisEntryId, SetnxReply}; use router_env::logger; +use storage_impl::redis::kv_store::RedisConnInterface; use super::{MockDb, Store}; use crate::{ @@ -54,7 +55,7 @@ impl QueueInterface for Store { crate::scheduler::consumer::fetch_consumer_tasks( self, &self - .redis_conn() + .get_redis_conn() .map_err(ProcessTrackerError::ERedisError)? .clone(), stream_name, @@ -70,7 +71,7 @@ impl QueueInterface for Store { group: &str, id: &RedisEntryId, ) -> CustomResult<(), RedisError> { - self.redis_conn()? + self.get_redis_conn()? .consumer_group_create(stream, group, id) .await } @@ -82,7 +83,7 @@ impl QueueInterface for Store { lock_val: &str, ttl: i64, ) -> CustomResult { - let conn = self.redis_conn()?.clone(); + let conn = self.get_redis_conn()?.clone(); let is_lock_acquired = conn .set_key_if_not_exists_with_expiry(lock_key, lock_val, None) .await; @@ -109,7 +110,7 @@ impl QueueInterface for Store { } async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> CustomResult { - let is_lock_released = self.redis_conn()?.delete_key(lock_key).await; + let is_lock_released = self.get_redis_conn()?.delete_key(lock_key).await; Ok(match is_lock_released { Ok(_del_reply) => true, Err(error) => { @@ -125,13 +126,13 @@ impl QueueInterface for Store { entry_id: &RedisEntryId, fields: Vec<(&str, String)>, ) -> CustomResult<(), RedisError> { - self.redis_conn()? + self.get_redis_conn()? .stream_append_entry(stream, entry_id, fields) .await } async fn get_key(&self, key: &str) -> CustomResult, RedisError> { - self.redis_conn()?.get_key::>(key).await + self.get_redis_conn()?.get_key::>(key).await } } diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index df2b2edf10..65c23584dc 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -244,6 +244,7 @@ mod storage { use common_utils::date_time; use error_stack::{IntoReport, ResultExt}; use redis_interface::HsetnxReply; + use storage_impl::redis::kv_store::RedisConnInterface; use super::RefundInterface; use crate::{ @@ -282,7 +283,7 @@ mod storage { let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "Refund"), database_call, @@ -338,7 +339,7 @@ mod storage { &created_refund.attempt_id, &created_refund.refund_id ); match self - .redis_conn() + .get_redis_conn() .map_err(Into::::into)? .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_refund) .await @@ -404,7 +405,8 @@ mod storage { payment_id: &created_refund.payment_id, }, ) - .await?; + .await + .change_context(errors::StorageError::KVError)?; Ok(created_refund) } @@ -445,7 +447,7 @@ mod storage { let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .hscan_and_deserialize(key, &pattern, None) .await @@ -484,7 +486,7 @@ mod storage { ) .change_context(errors::StorageError::SerializationFailed)?; - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .set_hash_fields(&lookup.pk_id, (field, redis_value)) .await @@ -505,7 +507,8 @@ mod storage { payment_id: &updated_refund.payment_id, }, ) - .await?; + .await + .change_context(errors::StorageError::KVError)?; Ok(updated_refund) } } @@ -532,7 +535,7 @@ mod storage { let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "Refund"), database_call, @@ -569,7 +572,7 @@ mod storage { let key = &lookup.pk_id; db_utils::try_redis_get_else_try_database_get( - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .get_hash_field_and_deserialize(key, &lookup.sk_id, "Refund"), database_call, @@ -603,7 +606,7 @@ mod storage { let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); - self.redis_conn() + self.get_redis_conn() .map_err(Into::::into)? .hscan_and_deserialize(&key, &pattern, None) .await diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index 1ead4924bf..2c8c0a4f9f 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -1,7 +1,6 @@ #![forbid(unsafe_code)] #![recursion_limit = "256"] -pub mod cache; #[cfg(feature = "stripe")] pub mod compatibility; pub mod configs; diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 34f43647f4..cce227dc46 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -20,7 +20,7 @@ use crate::{ configs::settings, db::{MockDb, StorageImpl, StorageInterface}, routes::cards_info::card_iin_info, - services::Store, + services::get_store, }; #[derive(Clone)] @@ -69,7 +69,7 @@ impl AppState { let testable = storage_impl == StorageImpl::PostgresqlTest; let store: Box = match storage_impl { StorageImpl::Postgresql | StorageImpl::PostgresqlTest => { - Box::new(Store::new(&conf, testable, shut_down_signal).await) + Box::new(get_store(&conf, shut_down_signal, testable).await) } StorageImpl::Mock => Box::new(MockDb::new(&conf).await), }; diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 8d5bb95212..fd4a1667d1 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -3,283 +3,100 @@ pub mod authentication; pub mod encryption; pub mod logger; -use std::sync::{atomic, Arc}; - use error_stack::{IntoReport, ResultExt}; #[cfg(feature = "kms")] use external_services::kms::{self, decrypt::KmsDecrypt}; #[cfg(not(feature = "kms"))] use masking::PeekInterface; -use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue}; -use storage_impl::{diesel as diesel_impl, DatabaseStore}; +use masking::StrongSecret; +#[cfg(feature = "kv_store")] +use storage_impl::KVRouterStore; +use storage_impl::RouterStore; use tokio::sync::oneshot; pub use self::{api::*, encryption::*}; -use crate::{ - async_spawn, - cache::{CacheKind, ACCOUNTS_CACHE, CONFIG_CACHE}, - configs::settings, - consts, - core::errors, -}; +use crate::{configs::settings, consts, core::errors}; -#[async_trait::async_trait] -pub trait PubSubInterface { - async fn subscribe(&self, channel: &str) -> errors::CustomResult<(), redis_errors::RedisError>; - - async fn publish<'a>( - &self, - channel: &str, - key: CacheKind<'a>, - ) -> errors::CustomResult; - - async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError>; -} - -#[async_trait::async_trait] -impl PubSubInterface for redis_interface::RedisConnectionPool { - #[inline] - async fn subscribe(&self, channel: &str) -> errors::CustomResult<(), redis_errors::RedisError> { - // Spawns a task that will automatically re-subscribe to any channels or channel patterns used by the client. - self.subscriber.manage_subscriptions(); - - self.subscriber - .subscribe(channel) - .await - .into_report() - .change_context(redis_errors::RedisError::SubscribeError) - } - - #[inline] - async fn publish<'a>( - &self, - channel: &str, - key: CacheKind<'a>, - ) -> errors::CustomResult { - self.publisher - .publish(channel, RedisValue::from(key).into_inner()) - .await - .into_report() - .change_context(redis_errors::RedisError::SubscribeError) - } - - #[inline] - async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> { - logger::debug!("Started on message"); - let mut rx = self.subscriber.on_message(); - while let Ok(message) = rx.recv().await { - logger::debug!("Invalidating {message:?}"); - let key: CacheKind<'_> = match RedisValue::new(message.value) - .try_into() - .change_context(redis_errors::RedisError::OnMessageError) - { - Ok(value) => value, - Err(err) => { - logger::error!(value_conversion_err=?err); - continue; - } - }; - - let key = match key { - CacheKind::Config(key) => { - CONFIG_CACHE.invalidate(key.as_ref()).await; - key - } - CacheKind::Accounts(key) => { - ACCOUNTS_CACHE.invalidate(key.as_ref()).await; - key - } - CacheKind::All(key) => { - CONFIG_CACHE.invalidate(key.as_ref()).await; - ACCOUNTS_CACHE.invalidate(key.as_ref()).await; - key - } - }; - - self.delete_key(key.as_ref()) - .await - .map_err(|err| logger::error!("Error while deleting redis key: {err:?}")) - .ok(); - - logger::debug!("Done invalidating {key}"); - } - Ok(()) - } -} - -pub trait RedisConnInterface { - fn get_redis_conn( - &self, - ) -> common_utils::errors::CustomResult< - Arc, - errors::RedisError, - >; -} - -impl RedisConnInterface for Store { - fn get_redis_conn( - &self, - ) -> common_utils::errors::CustomResult< - Arc, - errors::RedisError, - > { - self.redis_conn() - } -} - -#[derive(Clone)] -pub struct Store { - #[cfg(not(feature = "olap"))] - pub diesel_store: diesel_impl::store::Store, - #[cfg(feature = "olap")] - pub diesel_store: diesel_impl::store::ReplicaStore, - pub redis_conn: Arc, - #[cfg(feature = "kv_store")] - pub(crate) config: StoreConfig, - pub master_key: Vec, -} +#[cfg(not(feature = "olap"))] +type StoreType = storage_impl::database::store::Store; +#[cfg(feature = "olap")] +type StoreType = storage_impl::database::store::ReplicaStore; +#[cfg(not(feature = "kv_store"))] +pub type Store = RouterStore; #[cfg(feature = "kv_store")] -#[derive(Clone)] -pub(crate) struct StoreConfig { - pub(crate) drainer_stream_name: String, - pub(crate) drainer_num_partitions: u8, -} +pub type Store = KVRouterStore; -impl Store { - pub async fn new( - config: &settings::Settings, - test_transaction: bool, - shut_down_signal: oneshot::Sender<()>, - ) -> Self { - let redis_conn = Arc::new(crate::connection::redis_connection(config).await); - let redis_clone = redis_conn.clone(); +pub async fn get_store( + config: &settings::Settings, + shut_down_signal: oneshot::Sender<()>, + test_transaction: bool, +) -> Store { + #[cfg(feature = "kms")] + let kms_client = kms::get_kms_client(&config.kms).await; - let subscriber_conn = redis_conn.clone(); + #[cfg(feature = "kms")] + #[allow(clippy::expect_used)] + let master_config = config + .master_database + .clone() + .decrypt_inner(kms_client) + .await + .expect("Failed to decrypt master database config"); + #[cfg(not(feature = "kms"))] + let master_config = config.master_database.clone().into(); - if let Err(e) = redis_conn.subscribe(consts::PUB_SUB_CHANNEL).await { - logger::error!(subscribe_err=?e); - } + #[cfg(all(feature = "olap", feature = "kms"))] + #[allow(clippy::expect_used)] + let replica_config = config + .replica_database + .clone() + .decrypt_inner(kms_client) + .await + .expect("Failed to decrypt replica database config"); - async_spawn!({ - if let Err(e) = subscriber_conn.on_message().await { - logger::error!(pubsub_err=?e); - } - }); - async_spawn!({ - redis_clone.on_error(shut_down_signal).await; - }); + #[cfg(all(feature = "olap", not(feature = "kms")))] + let replica_config = config.replica_database.clone().into(); + + let master_enc_key = get_master_enc_key( + config, #[cfg(feature = "kms")] - let kms_client = kms::get_kms_client(&config.kms).await; + kms_client, + ) + .await; + #[cfg(not(feature = "olap"))] + let conf = master_config; + #[cfg(feature = "olap")] + let conf = (master_config, replica_config); - let master_enc_key = get_master_enc_key( - config, - #[cfg(feature = "kms")] - kms_client, + let store: RouterStore = if test_transaction { + RouterStore::test_store(conf, &config.redis, master_enc_key).await + } else { + RouterStore::from_config( + conf, + &config.redis, + master_enc_key, + shut_down_signal, + consts::PUB_SUB_CHANNEL, ) - .await; - - #[allow(clippy::expect_used)] - Self { - #[cfg(not(feature = "olap"))] - diesel_store: diesel_impl::store::Store::new( - #[cfg(not(feature = "kms"))] - config.master_database.clone().into(), - #[cfg(feature = "kms")] - config - .master_database - .clone() - .decrypt_inner(kms_client) - .await - .expect("Failed to decrypt master database"), - test_transaction, - ) - .await, - #[cfg(feature = "olap")] - diesel_store: diesel_impl::store::ReplicaStore::new( - ( - #[cfg(not(feature = "kms"))] - config.master_database.clone().into(), - #[cfg(feature = "kms")] - config - .master_database - .clone() - .decrypt_inner(kms_client) - .await - .expect("Failed to decrypt master database"), - #[cfg(not(feature = "kms"))] - config.replica_database.clone().into(), - #[cfg(feature = "kms")] - config - .replica_database - .clone() - .decrypt_inner(kms_client) - .await - .expect("Failed to decrypt replica database"), - ), - test_transaction, - ) - .await, - redis_conn, - #[cfg(feature = "kv_store")] - config: StoreConfig { - drainer_stream_name: config.drainer.stream_name.clone(), - drainer_num_partitions: config.drainer.num_partitions, - }, - master_key: master_enc_key, - } - } + .await + }; #[cfg(feature = "kv_store")] - pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { - // Example: {shard_5}_drainer_stream - format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) - } + let store = KVRouterStore::from_store( + store, + config.drainer.stream_name.clone(), + config.drainer.num_partitions, + ); - pub fn redis_conn( - &self, - ) -> errors::CustomResult, redis_errors::RedisError> - { - if self - .redis_conn - .is_redis_available - .load(atomic::Ordering::SeqCst) - { - Ok(self.redis_conn.clone()) - } else { - Err(redis_errors::RedisError::RedisConnectionError.into()) - } - } - - #[cfg(feature = "kv_store")] - pub(crate) async fn push_to_drainer_stream( - &self, - redis_entry: diesel_models::kv::TypedSql, - partition_key: crate::utils::storage_partitioning::PartitionKey<'_>, - ) -> crate::core::errors::CustomResult<(), crate::core::errors::StorageError> - where - T: crate::utils::storage_partitioning::KvStorePartition, - { - let shard_key = T::shard_key(partition_key, self.config.drainer_num_partitions); - let stream_name = self.get_drainer_stream_name(&shard_key); - self.redis_conn - .stream_append_entry( - &stream_name, - &redis_interface::RedisEntryId::AutoGeneratedID, - redis_entry - .to_field_value_pairs() - .change_context(crate::core::errors::StorageError::KVError)?, - ) - .await - .change_context(crate::core::errors::StorageError::KVError) - } + store } #[allow(clippy::expect_used)] async fn get_master_enc_key( conf: &crate::configs::settings::Settings, #[cfg(feature = "kms")] kms_client: &kms::KmsClient, -) -> Vec { +) -> StrongSecret> { #[cfg(feature = "kms")] let master_enc_key = hex::decode( conf.secrets @@ -295,7 +112,7 @@ async fn get_master_enc_key( let master_enc_key = hex::decode(conf.secrets.master_enc_key.peek()).expect("Failed to decode from hex"); - master_enc_key + StrongSecret::new(master_enc_key) } #[inline] diff --git a/crates/router/src/types/storage/payment_attempt.rs b/crates/router/src/types/storage/payment_attempt.rs index 76134ee8cb..8edfc77d65 100644 --- a/crates/router/src/types/storage/payment_attempt.rs +++ b/crates/router/src/types/storage/payment_attempt.rs @@ -9,9 +9,6 @@ pub struct RoutingData { pub algorithm: Option, } -#[cfg(feature = "kv_store")] -impl crate::utils::storage_partitioning::KvStorePartition for PaymentAttempt {} - pub trait PaymentAttemptExt { fn make_new_capture( &self, diff --git a/crates/router/src/types/storage/payment_intent.rs b/crates/router/src/types/storage/payment_intent.rs index a78ea8eaaf..a2752b5499 100644 --- a/crates/router/src/types/storage/payment_intent.rs +++ b/crates/router/src/types/storage/payment_intent.rs @@ -19,9 +19,6 @@ use crate::{connection::PgPooledConn, core::errors::CustomResult, types::api}; const JOIN_LIMIT: i64 = 20; -#[cfg(feature = "kv_store")] -impl crate::utils::storage_partitioning::KvStorePartition for PaymentIntent {} - #[async_trait::async_trait] pub trait PaymentIntentDbExt: Sized { async fn filter_by_constraints( diff --git a/crates/router/src/types/storage/refund.rs b/crates/router/src/types/storage/refund.rs index 76084f87bd..de842574b9 100644 --- a/crates/router/src/types/storage/refund.rs +++ b/crates/router/src/types/storage/refund.rs @@ -14,9 +14,6 @@ use error_stack::{IntoReport, ResultExt}; use crate::{connection::PgPooledConn, logger}; -#[cfg(feature = "kv_store")] -impl crate::utils::storage_partitioning::KvStorePartition for Refund {} - #[async_trait::async_trait] pub trait RefundDbExt: Sized { async fn filter_by_constraints( diff --git a/crates/router/src/utils/storage_partitioning.rs b/crates/router/src/utils/storage_partitioning.rs index 943b84d189..de043879a0 100644 --- a/crates/router/src/utils/storage_partitioning.rs +++ b/crates/router/src/utils/storage_partitioning.rs @@ -1,27 +1 @@ -pub(crate) trait KvStorePartition { - fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 { - crc32fast::hash(key.to_string().as_bytes()) % u32::from(num_partitions) - } - - fn shard_key(key: PartitionKey<'_>, num_partitions: u8) -> String { - format!("shard_{}", Self::partition_number(key, num_partitions)) - } -} - -pub(crate) enum PartitionKey<'a> { - MerchantIdPaymentId { - merchant_id: &'a str, - payment_id: &'a str, - }, -} - -impl<'a> std::fmt::Display for PartitionKey<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match *self { - PartitionKey::MerchantIdPaymentId { - merchant_id, - payment_id, - } => f.write_str(&format!("mid_{merchant_id}_pid_{payment_id}")), - } - } -} +pub use storage_impl::redis::kv_store::{KvStorePartition, PartitionKey}; diff --git a/crates/router/tests/cache.rs b/crates/router/tests/cache.rs index 0799335ee1..03f3466903 100644 --- a/crates/router/tests/cache.rs +++ b/crates/router/tests/cache.rs @@ -1,9 +1,6 @@ #![allow(clippy::unwrap_used)] -use router::{ - cache::{self}, - configs::settings::Settings, - routes, -}; +use router::{configs::settings::Settings, routes}; +use storage_impl::redis::cache; mod utils; diff --git a/crates/router/tests/connectors/payeezy.rs b/crates/router/tests/connectors/payeezy.rs index 66d81a6770..058ec1772b 100644 --- a/crates/router/tests/connectors/payeezy.rs +++ b/crates/router/tests/connectors/payeezy.rs @@ -9,7 +9,7 @@ use router::{ }; use crate::{ - connector_auth::{self}, + connector_auth, utils::{self, ConnectorActions, PaymentInfo}, }; diff --git a/crates/router_derive/src/macros/diesel.rs b/crates/router_derive/src/macros/diesel.rs index 3b50fdcf6c..07957bef78 100644 --- a/crates/router_derive/src/macros/diesel.rs +++ b/crates/router_derive/src/macros/diesel.rs @@ -1,3 +1,4 @@ +#![allow(clippy::use_self)] use darling::FromMeta; use proc_macro2::{Span, TokenStream}; use quote::{format_ident, quote}; diff --git a/crates/storage_impl/Cargo.toml b/crates/storage_impl/Cargo.toml index bf0068c25d..5597bc1af5 100644 --- a/crates/storage_impl/Cargo.toml +++ b/crates/storage_impl/Cargo.toml @@ -11,15 +11,20 @@ license.workspace = true [dependencies] # First Party dependencies +common_utils = { version = "0.1.0", path = "../common_utils" } +diesel_models = { version = "0.1.0", path = "../diesel_models" } masking = { version = "0.1.0", path = "../masking" } redis_interface = { version = "0.1.0", path = "../redis_interface" } -diesel_models = { version = "0.1.0", path = "../diesel_models" } +router_env = { version = "0.1.0", path = "../router_env" } # Third party crates -bb8 = "0.8.1" -diesel = { version = "2.1.0", default-features = false, features = ["postgres"] } async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "be3d9bce50051d8c0e0c06078e8066cc27db3001" } async-trait = "0.1.72" +bb8 = "0.8.1" crc32fast = "1.3.2" +diesel = { version = "2.1.0", default-features = false, features = ["postgres"] } +dyn-clone = "1.0.12" error-stack = "0.3.1" -tokio = { version = "1.28.2", features = ["rt-multi-thread"] } \ No newline at end of file +moka = { version = "0.11.3", features = ["future"] } +once_cell = "1.18.0" +tokio = { version = "1.28.2", features = ["rt-multi-thread"] } diff --git a/crates/storage_impl/src/diesel.rs b/crates/storage_impl/src/database.rs similarity index 100% rename from crates/storage_impl/src/diesel.rs rename to crates/storage_impl/src/database.rs diff --git a/crates/storage_impl/src/diesel/store.rs b/crates/storage_impl/src/database/store.rs similarity index 85% rename from crates/storage_impl/src/diesel/store.rs rename to crates/storage_impl/src/database/store.rs index 90e40373e1..a80a3b1fdf 100644 --- a/crates/storage_impl/src/diesel/store.rs +++ b/crates/storage_impl/src/database/store.rs @@ -9,14 +9,14 @@ pub type PgPool = bb8::Pool>; pub type PgPooledConn = async_bb8_diesel::Connection; #[async_trait::async_trait] -pub trait DatabaseStore { +pub trait DatabaseStore: Clone + Send { type Config; async fn new(config: Self::Config, test_transaction: bool) -> Self; - fn get_write_pool(&self) -> PgPool; - fn get_read_pool(&self) -> PgPool; + fn get_master_pool(&self) -> &PgPool; + fn get_replica_pool(&self) -> &PgPool; } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Store { pub master_pool: PgPool, } @@ -30,16 +30,16 @@ impl DatabaseStore for Store { } } - fn get_write_pool(&self) -> PgPool { - self.master_pool.clone() + fn get_master_pool(&self) -> &PgPool { + &self.master_pool } - fn get_read_pool(&self) -> PgPool { - self.master_pool.clone() + fn get_replica_pool(&self) -> &PgPool { + &self.master_pool } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ReplicaStore { pub master_pool: PgPool, pub replica_pool: PgPool, @@ -58,12 +58,12 @@ impl DatabaseStore for ReplicaStore { } } - fn get_write_pool(&self) -> PgPool { - self.master_pool.clone() + fn get_master_pool(&self) -> &PgPool { + &self.master_pool } - fn get_read_pool(&self) -> PgPool { - self.replica_pool.clone() + fn get_replica_pool(&self) -> &PgPool { + &self.replica_pool } } diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index f24f184a70..ef3970f1f9 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -1,21 +1,72 @@ +use std::sync::Arc; + use error_stack::ResultExt; use masking::StrongSecret; -use redis::CacheStore; +use redis::{kv_store::RedisConnInterface, RedisStore}; pub mod config; -pub mod diesel; +pub mod database; +pub mod payments; pub mod redis; +pub mod refund; -pub use crate::diesel::store::DatabaseStore; +use database::store::PgPool; +use redis_interface::errors::RedisError; -#[allow(dead_code)] +pub use crate::database::store::DatabaseStore; + +#[derive(Debug, Clone)] pub struct RouterStore { db_store: T, - cache_store: CacheStore, + cache_store: RedisStore, master_encryption_key: StrongSecret>, } +#[async_trait::async_trait] +impl DatabaseStore for RouterStore +where + T::Config: Send, +{ + type Config = ( + T::Config, + redis_interface::RedisSettings, + StrongSecret>, + tokio::sync::oneshot::Sender<()>, + &'static str, + ); + async fn new(config: Self::Config, test_transaction: bool) -> Self { + let (db_conf, cache_conf, encryption_key, cache_error_signal, inmemory_cache_stream) = + config; + if test_transaction { + Self::test_store(db_conf, &cache_conf, encryption_key).await + } else { + Self::from_config( + db_conf, + &cache_conf, + encryption_key, + cache_error_signal, + inmemory_cache_stream, + ) + .await + } + } + fn get_master_pool(&self) -> &PgPool { + self.db_store.get_master_pool() + } + fn get_replica_pool(&self) -> &PgPool { + self.db_store.get_replica_pool() + } +} + +impl RedisConnInterface for RouterStore { + fn get_redis_conn( + &self, + ) -> error_stack::Result, RedisError> { + self.cache_store.get_redis_conn() + } +} + impl RouterStore { - pub async fn new( + pub async fn from_config( db_conf: T::Config, cache_conf: &redis_interface::RedisSettings, encryption_key: StrongSecret>, @@ -25,7 +76,7 @@ impl RouterStore { // TODO: create an error enum and return proper error here let db_store = T::new(db_conf, false).await; #[allow(clippy::expect_used)] - let cache_store = CacheStore::new(cache_conf) + let cache_store = RedisStore::new(cache_conf) .await .expect("Failed to create cache store"); cache_store.set_error_callback(cache_error_signal); @@ -40,6 +91,11 @@ impl RouterStore { master_encryption_key: encryption_key, } } + + pub fn master_key(&self) -> &StrongSecret> { + &self.master_encryption_key + } + pub async fn test_store( db_conf: T::Config, cache_conf: &redis_interface::RedisSettings, @@ -48,7 +104,7 @@ impl RouterStore { // TODO: create an error enum and return proper error here let db_store = T::new(db_conf, true).await; #[allow(clippy::expect_used)] - let cache_store = CacheStore::new(cache_conf) + let cache_store = RedisStore::new(cache_conf) .await .expect("Failed to create cache store"); Self { @@ -59,12 +115,39 @@ impl RouterStore { } } +#[derive(Debug, Clone)] pub struct KVRouterStore { router_store: RouterStore, drainer_stream_name: String, drainer_num_partitions: u8, } +#[async_trait::async_trait] +impl DatabaseStore for KVRouterStore +where + RouterStore: DatabaseStore, + T: DatabaseStore, +{ + type Config = (RouterStore, String, u8); + async fn new(config: Self::Config, _test_transaction: bool) -> Self { + let (router_store, drainer_stream_name, drainer_num_partitions) = config; + Self::from_store(router_store, drainer_stream_name, drainer_num_partitions) + } + fn get_master_pool(&self) -> &PgPool { + self.router_store.get_master_pool() + } + fn get_replica_pool(&self) -> &PgPool { + self.router_store.get_replica_pool() + } +} + +impl RedisConnInterface for KVRouterStore { + fn get_redis_conn( + &self, + ) -> error_stack::Result, RedisError> { + self.router_store.get_redis_conn() + } +} impl KVRouterStore { pub fn from_store( store: RouterStore, @@ -78,16 +161,19 @@ impl KVRouterStore { } } + pub fn master_key(&self) -> &StrongSecret> { + self.router_store.master_key() + } + pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { format!("{{{}}}_{}", shard_key, self.drainer_stream_name) } - #[allow(dead_code)] - async fn push_to_drainer_stream( + pub async fn push_to_drainer_stream( &self, redis_entry: diesel_models::kv::TypedSql, partition_key: redis::kv_store::PartitionKey<'_>, - ) -> error_stack::Result<(), redis_interface::errors::RedisError> + ) -> error_stack::Result<(), RedisError> where R: crate::redis::kv_store::KvStorePartition, { @@ -101,9 +187,9 @@ impl KVRouterStore { &redis_interface::RedisEntryId::AutoGeneratedID, redis_entry .to_field_value_pairs() - .change_context(redis_interface::errors::RedisError::JsonSerializationFailed)?, + .change_context(RedisError::JsonSerializationFailed)?, ) .await - .change_context(redis_interface::errors::RedisError::StreamAppendFailed) + .change_context(RedisError::StreamAppendFailed) } } diff --git a/crates/storage_impl/src/payments.rs b/crates/storage_impl/src/payments.rs new file mode 100644 index 0000000000..edf5b191e1 --- /dev/null +++ b/crates/storage_impl/src/payments.rs @@ -0,0 +1,6 @@ +use diesel_models::{payment_attempt::PaymentAttempt, payment_intent::PaymentIntent}; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for PaymentIntent {} +impl KvStorePartition for PaymentAttempt {} diff --git a/crates/storage_impl/src/redis.rs b/crates/storage_impl/src/redis.rs index c60926596b..e4e0c021ac 100644 --- a/crates/storage_impl/src/redis.rs +++ b/crates/storage_impl/src/redis.rs @@ -1,16 +1,30 @@ +pub mod cache; pub mod kv_store; +pub mod pub_sub; -use std::sync::Arc; +use std::sync::{atomic, Arc}; use error_stack::{IntoReport, ResultExt}; use redis_interface::PubsubInterface; +use router_env::logger; -pub struct CacheStore { +use self::{kv_store::RedisConnInterface, pub_sub::PubSubInterface}; + +#[derive(Clone)] +pub struct RedisStore { // Maybe expose the redis_conn via traits instead of the making the field public pub(crate) redis_conn: Arc, } -impl CacheStore { +impl std::fmt::Debug for RedisStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CacheStore") + .field("redis_conn", &"Redis conn doesn't implement debug") + .finish() + } +} + +impl RedisStore { pub async fn new( conf: &redis_interface::RedisSettings, ) -> error_stack::Result { @@ -39,13 +53,31 @@ impl CacheStore { .into_report() .change_context(redis_interface::errors::RedisError::SubscribeError)?; - // TODO: Handle on message failures - // let redis_clone = self.redis_conn.clone(); - // tokio::spawn(async move { - // if let Err(e) = redis_clone.on_message().await { - // logger::error!(pubsub_err=?e); - // } - // }); + let redis_clone = self.redis_conn.clone(); + tokio::spawn(async move { + if let Err(e) = redis_clone.on_message().await { + logger::error!(pubsub_err=?e); + } + }); Ok(()) } } + +impl RedisConnInterface for RedisStore { + fn get_redis_conn( + &self, + ) -> error_stack::Result< + Arc, + redis_interface::errors::RedisError, + > { + if self + .redis_conn + .is_redis_available + .load(atomic::Ordering::SeqCst) + { + Ok(self.redis_conn.clone()) + } else { + Err(redis_interface::errors::RedisError::RedisConnectionError.into()) + } + } +} diff --git a/crates/router/src/cache.rs b/crates/storage_impl/src/redis/cache.rs similarity index 99% rename from crates/router/src/cache.rs rename to crates/storage_impl/src/redis/cache.rs index 3331e3eb71..7d4edaca13 100644 --- a/crates/router/src/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -1,13 +1,12 @@ use std::{any::Any, borrow::Cow, sync::Arc}; +use common_utils::errors; use dyn_clone::DynClone; use error_stack::Report; use moka::future::Cache as MokaCache; use once_cell::sync::Lazy; use redis_interface::RedisValue; -use crate::core::errors; - /// Prefix for config cache key const CONFIG_CACHE_PREFIX: &str = "config"; diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 61e2dafb6f..407cb838d8 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -1,4 +1,6 @@ -pub(crate) trait KvStorePartition { +use std::sync::Arc; + +pub trait KvStorePartition { fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 { crc32fast::hash(key.to_string().as_bytes()) % u32::from(num_partitions) } @@ -9,7 +11,7 @@ pub(crate) trait KvStorePartition { } #[allow(unused)] -pub(crate) enum PartitionKey<'a> { +pub enum PartitionKey<'a> { MerchantIdPaymentId { merchant_id: &'a str, payment_id: &'a str, @@ -26,3 +28,12 @@ impl<'a> std::fmt::Display for PartitionKey<'a> { } } } + +pub trait RedisConnInterface { + fn get_redis_conn( + &self, + ) -> error_stack::Result< + Arc, + redis_interface::errors::RedisError, + >; +} diff --git a/crates/storage_impl/src/redis/pub_sub.rs b/crates/storage_impl/src/redis/pub_sub.rs new file mode 100644 index 0000000000..c00ed13fac --- /dev/null +++ b/crates/storage_impl/src/redis/pub_sub.rs @@ -0,0 +1,89 @@ +use error_stack::{IntoReport, ResultExt}; +use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue}; +use router_env::logger; + +use crate::redis::cache::{CacheKind, ACCOUNTS_CACHE, CONFIG_CACHE}; + +#[async_trait::async_trait] +pub trait PubSubInterface { + async fn subscribe(&self, channel: &str) -> error_stack::Result<(), redis_errors::RedisError>; + + async fn publish<'a>( + &self, + channel: &str, + key: CacheKind<'a>, + ) -> error_stack::Result; + + async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError>; +} + +#[async_trait::async_trait] +impl PubSubInterface for redis_interface::RedisConnectionPool { + #[inline] + async fn subscribe(&self, channel: &str) -> error_stack::Result<(), redis_errors::RedisError> { + // Spawns a task that will automatically re-subscribe to any channels or channel patterns used by the client. + self.subscriber.manage_subscriptions(); + + self.subscriber + .subscribe(channel) + .await + .into_report() + .change_context(redis_errors::RedisError::SubscribeError) + } + + #[inline] + async fn publish<'a>( + &self, + channel: &str, + key: CacheKind<'a>, + ) -> error_stack::Result { + self.publisher + .publish(channel, RedisValue::from(key).into_inner()) + .await + .into_report() + .change_context(redis_errors::RedisError::SubscribeError) + } + + #[inline] + async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> { + logger::debug!("Started on message"); + let mut rx = self.subscriber.on_message(); + while let Ok(message) = rx.recv().await { + logger::debug!("Invalidating {message:?}"); + let key: CacheKind<'_> = match RedisValue::new(message.value) + .try_into() + .change_context(redis_errors::RedisError::OnMessageError) + { + Ok(value) => value, + Err(err) => { + logger::error!(value_conversion_err=?err); + continue; + } + }; + + let key = match key { + CacheKind::Config(key) => { + CONFIG_CACHE.invalidate(key.as_ref()).await; + key + } + CacheKind::Accounts(key) => { + ACCOUNTS_CACHE.invalidate(key.as_ref()).await; + key + } + CacheKind::All(key) => { + CONFIG_CACHE.invalidate(key.as_ref()).await; + ACCOUNTS_CACHE.invalidate(key.as_ref()).await; + key + } + }; + + self.delete_key(key.as_ref()) + .await + .map_err(|err| logger::error!("Error while deleting redis key: {err:?}")) + .ok(); + + logger::debug!("Done invalidating {key}"); + } + Ok(()) + } +} diff --git a/crates/storage_impl/src/refund.rs b/crates/storage_impl/src/refund.rs new file mode 100644 index 0000000000..f8d94705be --- /dev/null +++ b/crates/storage_impl/src/refund.rs @@ -0,0 +1,5 @@ +use diesel_models::refund::Refund; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for Refund {}