use std::{fmt::Debug, sync::Arc}; use common_enums::enums::MerchantStorageScheme; use common_utils::{fallback_reverse_lookup_not_found, types::keymanager::KeyManagerState}; use diesel_models::{errors::DatabaseError, kv}; use error_stack::ResultExt; use hyperswitch_domain_models::{ behaviour::{Conversion, ReverseConversion}, merchant_key_store::MerchantKeyStore, }; #[cfg(not(feature = "payouts"))] use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface}; use masking::StrongSecret; use redis_interface::{errors::RedisError, types::HsetnxReply, RedisConnectionPool}; use router_env::logger; use serde::de; #[cfg(not(feature = "payouts"))] pub use crate::database::store::Store; use crate::{ config::TenantConfig, database::store::PgPool, diesel_error_to_data_error, errors::{self, RedisErrorExt, StorageResult}, lookup::ReverseLookupInterface, metrics, redis::kv_store::{ decide_storage_scheme, kv_wrapper, KvOperation, KvStorePartition, Op, PartitionKey, RedisConnInterface, }, utils::{find_all_combined_kv_database, try_redis_get_else_try_database_get}, RouterStore, UniqueConstraints, }; pub use crate::{database::store::DatabaseStore, mock_db::MockDb}; #[derive(Debug, Clone)] pub struct KVRouterStore { pub router_store: RouterStore, drainer_stream_name: String, drainer_num_partitions: u8, pub ttl_for_kv: u32, pub request_id: Option, pub soft_kill_mode: bool, } pub struct InsertResourceParams<'a> { pub insertable: kv::Insertable, pub reverse_lookups: Vec, pub key: PartitionKey<'a>, // secondary key pub identifier: String, // type of resource Eg: "payment_attempt" pub resource_type: &'static str, } pub struct UpdateResourceParams<'a> { pub updateable: kv::Updateable, pub operation: Op<'a>, } pub struct FilterResourceParams<'a> { pub key: PartitionKey<'a>, pub pattern: &'static str, pub limit: Option, } pub enum FindResourceBy<'a> { Id(String, PartitionKey<'a>), LookupId(String), } pub trait DomainType: Debug + Sync + Conversion {} impl DomainType for T {} /// Storage model with all required capabilities for KV operations pub trait StorageModel: de::DeserializeOwned + serde::Serialize + Debug + KvStorePartition + UniqueConstraints + Sync + Send + ReverseConversion { } impl StorageModel for T where T: de::DeserializeOwned + serde::Serialize + Debug + KvStorePartition + UniqueConstraints + Sync + Send + ReverseConversion, D: DomainType, { } #[async_trait::async_trait] impl DatabaseStore for KVRouterStore where RouterStore: DatabaseStore, T: DatabaseStore, { type Config = (RouterStore, String, u8, u32, Option); async fn new( config: Self::Config, tenant_config: &dyn TenantConfig, _test_transaction: bool, ) -> StorageResult { let (router_store, _, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = config; let drainer_stream_name = format!("{}_{}", tenant_config.get_schema(), config.1); Ok(Self::from_store( router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, soft_kill_mode, )) } fn get_master_pool(&self) -> &PgPool { self.router_store.get_master_pool() } fn get_replica_pool(&self) -> &PgPool { self.router_store.get_replica_pool() } fn get_accounts_master_pool(&self) -> &PgPool { self.router_store.get_accounts_master_pool() } fn get_accounts_replica_pool(&self) -> &PgPool { self.router_store.get_accounts_replica_pool() } } impl RedisConnInterface for KVRouterStore { fn get_redis_conn(&self) -> error_stack::Result, RedisError> { self.router_store.get_redis_conn() } } impl KVRouterStore { pub fn from_store( store: RouterStore, drainer_stream_name: String, drainer_num_partitions: u8, ttl_for_kv: u32, soft_kill: Option, ) -> Self { let request_id = store.request_id.clone(); Self { router_store: store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, request_id, soft_kill_mode: soft_kill.unwrap_or(false), } } pub fn master_key(&self) -> &StrongSecret> { self.router_store.master_key() } pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { format!("{{{}}}_{}", shard_key, self.drainer_stream_name) } pub async fn push_to_drainer_stream( &self, redis_entry: kv::TypedSql, partition_key: PartitionKey<'_>, ) -> error_stack::Result<(), RedisError> where R: KvStorePartition, { let global_id = format!("{partition_key}"); let request_id = self.request_id.clone().unwrap_or_default(); let shard_key = R::shard_key(partition_key, self.drainer_num_partitions); let stream_name = self.get_drainer_stream_name(&shard_key); self.router_store .cache_store .redis_conn .stream_append_entry( &stream_name.into(), &redis_interface::RedisEntryId::AutoGeneratedID, redis_entry .to_field_value_pairs(request_id, global_id) .change_context(RedisError::JsonSerializationFailed)?, ) .await .map(|_| metrics::KV_PUSHED_TO_DRAINER.add(1, &[])) .inspect_err(|error| { metrics::KV_FAILED_TO_PUSH_TO_DRAINER.add(1, &[]); logger::error!(?error, "Failed to add entry in drainer stream"); }) .change_context(RedisError::StreamAppendFailed) } pub async fn find_resource_by_id( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, find_resource_db_fut: R, find_by: FindResourceBy<'_>, ) -> error_stack::Result where D: DomainType, M: StorageModel, R: futures::Future> + Send, { let database_call = || async { find_resource_db_fut.await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }) }; let storage_scheme = Box::pin(decide_storage_scheme::( self, storage_scheme, Op::Find, )) .await; let res = || async { match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { let (field, key) = match find_by { FindResourceBy::Id(field, key) => (field, key), FindResourceBy::LookupId(lookup_id) => { let lookup = fallback_reverse_lookup_not_found!( self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) .await, database_call().await ); ( lookup.clone().sk_id, PartitionKey::CombinationKey { combination: &lookup.clone().pk_id, }, ) } }; Box::pin(try_redis_get_else_try_database_get( async { Box::pin(kv_wrapper(self, KvOperation::::HGet(&field), key)) .await? .try_into_hget() }, database_call, )) .await } } }; res() .await? .convert( state, key_store.key.get_inner(), key_store.merchant_id.clone().into(), ) .await .change_context(errors::StorageError::DecryptionError) } pub async fn find_optional_resource_by_id( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, find_resource_db_fut: R, find_by: FindResourceBy<'_>, ) -> error_stack::Result, errors::StorageError> where D: DomainType, M: StorageModel, R: futures::Future, DatabaseError>> + Send, { let database_call = || async { find_resource_db_fut.await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }) }; let storage_scheme = Box::pin(decide_storage_scheme::( self, storage_scheme, Op::Find, )) .await; let res = || async { match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { let (field, key) = match find_by { FindResourceBy::Id(field, key) => (field, key), FindResourceBy::LookupId(lookup_id) => { let lookup = fallback_reverse_lookup_not_found!( self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) .await, database_call().await ); ( lookup.clone().sk_id, PartitionKey::CombinationKey { combination: &lookup.clone().pk_id, }, ) } }; Box::pin(try_redis_get_else_try_database_get( async { Box::pin(kv_wrapper(self, KvOperation::::HGet(&field), key)) .await? .try_into_hget() .map(Some) }, database_call, )) .await } } }; match res().await? { Some(resource) => Ok(Some( resource .convert( state, key_store.key.get_inner(), key_store.merchant_id.clone().into(), ) .await .change_context(errors::StorageError::DecryptionError)?, )), None => Ok(None), } } pub async fn insert_resource( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, create_resource_fut: R, resource_new: M, InsertResourceParams { insertable, reverse_lookups, key, identifier, resource_type, }: InsertResourceParams<'_>, ) -> error_stack::Result where D: Debug + Sync + Conversion, M: StorageModel, R: futures::Future> + Send, { let storage_scheme = Box::pin(decide_storage_scheme::<_, M>( self, storage_scheme, Op::Insert, )) .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => create_resource_fut.await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }), MerchantStorageScheme::RedisKv => { let key_str = key.to_string(); let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew { sk_id: identifier.clone(), pk_id: key_str.clone(), lookup_id: v, source: resource_type.to_string(), updated_by: storage_scheme.to_string(), }; let results = reverse_lookups .into_iter() .map(|v| self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme)); futures::future::try_join_all(results).await?; let redis_entry = kv::TypedSql { op: kv::DBOperation::Insert { insertable: Box::new(insertable), }, }; match Box::pin(kv_wrapper::( self, KvOperation::::HSetNx(&identifier, &resource_new, redis_entry), key.clone(), )) .await .map_err(|err| err.to_redis_failed_response(&key.to_string()))? .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: resource_type, key: Some(key_str), } .into()), Ok(HsetnxReply::KeySet) => Ok(resource_new), Err(er) => Err(er).change_context(errors::StorageError::KVError), } } }? .convert( state, key_store.key.get_inner(), key_store.merchant_id.clone().into(), ) .await .change_context(errors::StorageError::DecryptionError) } pub async fn update_resource( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, update_resource_fut: R, updated_resource: M, UpdateResourceParams { updateable, operation, }: UpdateResourceParams<'_>, ) -> error_stack::Result where D: Debug + Sync + Conversion, M: StorageModel, R: futures::Future> + Send, { match operation { Op::Update(key, field, updated_by) => { let storage_scheme = Box::pin(decide_storage_scheme::<_, M>( self, storage_scheme, Op::Update(key.clone(), field, updated_by), )) .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { update_resource_fut.await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }) } MerchantStorageScheme::RedisKv => { let key_str = key.to_string(); let redis_value = serde_json::to_string(&updated_resource) .change_context(errors::StorageError::SerializationFailed)?; let redis_entry = kv::TypedSql { op: kv::DBOperation::Update { updatable: Box::new(updateable), }, }; Box::pin(kv_wrapper::<(), _, _>( self, KvOperation::::Hset((field, redis_value), redis_entry), key, )) .await .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hset() .change_context(errors::StorageError::KVError)?; Ok(updated_resource) } } } _ => Err(errors::StorageError::KVError.into()), }? .convert( state, key_store.key.get_inner(), key_store.merchant_id.clone().into(), ) .await .change_context(errors::StorageError::DecryptionError) } pub async fn filter_resources( &self, state: &KeyManagerState, key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, filter_resource_db_fut: R, filter_fn: impl Fn(&M) -> bool, FilterResourceParams { key, pattern, limit, }: FilterResourceParams<'_>, ) -> error_stack::Result, errors::StorageError> where D: Debug + Sync + Conversion, M: StorageModel, R: futures::Future, DatabaseError>> + Send, { let db_call = || async { filter_resource_db_fut.await.map_err(|error| { let new_err = diesel_error_to_data_error(*error.current_context()); error.change_context(new_err) }) }; let resources = match storage_scheme { MerchantStorageScheme::PostgresOnly => db_call().await, MerchantStorageScheme::RedisKv => { let redis_fut = async { let kv_result = Box::pin(kv_wrapper::( self, KvOperation::::Scan(pattern), key, )) .await? .try_into_scan(); kv_result.map(|records| records.into_iter().filter(filter_fn).collect()) }; Box::pin(find_all_combined_kv_database(redis_fut, db_call, limit)).await } }?; let resource_futures = resources .into_iter() .map(|pm| async { pm.convert( state, key_store.key.get_inner(), key_store.merchant_id.clone().into(), ) .await .change_context(errors::StorageError::DecryptionError) }) .collect::>(); futures::future::try_join_all(resource_futures).await } } #[cfg(not(feature = "payouts"))] impl PayoutAttemptInterface for KVRouterStore {} #[cfg(not(feature = "payouts"))] impl PayoutsInterface for KVRouterStore {}