diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index 70d24f0493..eb02e9bc82 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -15,11 +15,11 @@ use common_utils::{ }; use error_stack::{IntoReport, ResultExt}; use fred::{ - interfaces::{HashesInterface, KeysInterface, StreamsInterface}, + interfaces::{HashesInterface, KeysInterface, SetsInterface, StreamsInterface}, prelude::RedisErrorKind, types::{ Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings, - RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse, + MultipleValues, RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse, }, }; use futures::StreamExt; @@ -27,7 +27,7 @@ use router_env::{instrument, logger, tracing}; use crate::{ errors, - types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply}, + types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SaddReply, SetnxReply}, }; impl super::RedisConnectionPool { @@ -466,6 +466,23 @@ impl super::RedisConnectionPool { .change_context(errors::RedisError::JsonDeserializationFailed) } + #[instrument(level = "DEBUG", skip(self))] + pub async fn sadd( + &self, + key: &str, + members: V, + ) -> CustomResult + where + V: TryInto + Debug + Send, + V::Error: Into + Send, + { + self.pool + .sadd(key, members) + .await + .into_report() + .change_context(errors::RedisError::SetAddMembersFailed) + } + #[instrument(level = "DEBUG", skip(self))] pub async fn stream_append_entry( &self, diff --git a/crates/redis_interface/src/errors.rs b/crates/redis_interface/src/errors.rs index 5289ec4fec..cc7f8bdd21 100644 --- a/crates/redis_interface/src/errors.rs +++ b/crates/redis_interface/src/errors.rs @@ -50,6 +50,8 @@ pub enum RedisError { SetHashFailed, #[error("Failed to set hash field in Redis")] SetHashFieldFailed, + #[error("Failed to add members to set in Redis")] + SetAddMembersFailed, #[error("Failed to get hash field in Redis")] GetHashFieldFailed, #[error("The requested value was not found in Redis")] diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 364fcfabc1..4ebb620c36 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -255,3 +255,22 @@ impl fred::types::FromRedis for DelReply { } } } + +#[derive(Debug)] +pub enum SaddReply { + KeySet, + KeyNotSet, +} + +impl fred::types::FromRedis for SaddReply { + fn from_value(value: fred::types::RedisValue) -> Result { + match value { + fred::types::RedisValue::Integer(1) => Ok(Self::KeySet), + fred::types::RedisValue::Integer(0) => Ok(Self::KeyNotSet), + _ => Err(fred::error::RedisError::new( + fred::error::RedisErrorKind::Unknown, + "Unexpected sadd command reply", + )), + } + } +} diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index 9e3ef903f5..59ddacc5dc 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -165,10 +165,12 @@ impl RedisErrorExt for error_stack::Report { RedisError::NotFound => self.change_context(DataStorageError::ValueNotFound(format!( "Data does not exist for key {key}", ))), - RedisError::SetNxFailed => self.change_context(DataStorageError::DuplicateValue { - entity: "redis", - key: Some(key.to_string()), - }), + RedisError::SetNxFailed | RedisError::SetAddMembersFailed => { + self.change_context(DataStorageError::DuplicateValue { + entity: "redis", + key: Some(key.to_string()), + }) + } _ => self.change_context(DataStorageError::KVError), } } diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index d31f05b497..4fb940a063 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -19,9 +19,10 @@ pub mod refund; mod reverse_lookup; mod utils; +use common_utils::errors::CustomResult; use database::store::PgPool; pub use mock_db::MockDb; -use redis_interface::errors::RedisError; +use redis_interface::{errors::RedisError, SaddReply}; pub use crate::database::store::DatabaseStore; @@ -259,3 +260,74 @@ pub(crate) fn diesel_error_to_data_error( _ => StorageError::DatabaseError(error_stack::report!(*diesel_error)), } } + +#[async_trait::async_trait] +pub trait UniqueConstraints { + fn unique_constraints(&self) -> Vec; + fn table_name(&self) -> &str; + async fn check_for_constraints( + &self, + redis_conn: &Arc, + ) -> CustomResult<(), RedisError> { + let constraints = self.unique_constraints(); + let sadd_result = redis_conn + .sadd( + &format!("unique_constraint:{}", self.table_name()), + constraints, + ) + .await?; + + match sadd_result { + SaddReply::KeyNotSet => Err(error_stack::report!(RedisError::SetAddMembersFailed)), + SaddReply::KeySet => Ok(()), + } + } +} + +impl UniqueConstraints for diesel_models::Address { + fn unique_constraints(&self) -> Vec { + vec![format!("address_{}", self.address_id)] + } + fn table_name(&self) -> &str { + "Address" + } +} + +impl UniqueConstraints for diesel_models::PaymentIntent { + fn unique_constraints(&self) -> Vec { + vec![format!("pi_{}_{}", self.merchant_id, self.payment_id)] + } + fn table_name(&self) -> &str { + "PaymentIntent" + } +} + +impl UniqueConstraints for diesel_models::PaymentAttempt { + fn unique_constraints(&self) -> Vec { + vec![format!( + "pa_{}_{}_{}", + self.merchant_id, self.payment_id, self.attempt_id + )] + } + fn table_name(&self) -> &str { + "PaymentAttempt" + } +} + +impl UniqueConstraints for diesel_models::Refund { + fn unique_constraints(&self) -> Vec { + vec![format!("refund_{}_{}", self.merchant_id, self.refund_id)] + } + fn table_name(&self) -> &str { + "Refund" + } +} + +impl UniqueConstraints for diesel_models::ReverseLookup { + fn unique_constraints(&self) -> Vec { + vec![format!("reverselookup_{}", self.lookup_id)] + } + fn table_name(&self) -> &str { + "ReverseLookup" + } +} diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index ad57ad403d..310e6e2dc5 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -7,7 +7,7 @@ use router_derive::TryGetEnumVariant; use router_env::logger; use serde::de; -use crate::{metrics, store::kv::TypedSql, KVRouterStore}; +use crate::{metrics, store::kv::TypedSql, KVRouterStore, UniqueConstraints}; pub trait KvStorePartition { fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 { @@ -95,7 +95,7 @@ pub async fn kv_wrapper<'a, T, D, S>( where T: de::DeserializeOwned, D: crate::database::store::DatabaseStore, - S: serde::Serialize + Debug + KvStorePartition, + S: serde::Serialize + Debug + KvStorePartition + UniqueConstraints + Sync, { let redis_conn = store.get_redis_conn()?; @@ -147,6 +147,8 @@ where KvOperation::HSetNx(field, value, sql) => { logger::debug!(kv_operation= %operation, value = ?value); + value.check_for_constraints(&redis_conn).await?; + let result = redis_conn .serialize_and_set_hash_field_if_not_exist(key, field, value, Some(ttl)) .await?; @@ -168,6 +170,8 @@ where .serialize_and_set_key_if_not_exist(key, value, Some(ttl.into())) .await?; + value.check_for_constraints(&redis_conn).await?; + if matches!(result, redis_interface::SetnxReply::KeySet) { store .push_to_drainer_stream::(sql, partition_key)