mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
feat: add unique constraint restriction for KV (#3723)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -15,11 +15,11 @@ use common_utils::{
|
||||
};
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use fred::{
|
||||
interfaces::{HashesInterface, KeysInterface, StreamsInterface},
|
||||
interfaces::{HashesInterface, KeysInterface, SetsInterface, StreamsInterface},
|
||||
prelude::RedisErrorKind,
|
||||
types::{
|
||||
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
|
||||
RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse,
|
||||
MultipleValues, RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse,
|
||||
},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
@ -27,7 +27,7 @@ use router_env::{instrument, logger, tracing};
|
||||
|
||||
use crate::{
|
||||
errors,
|
||||
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply},
|
||||
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SaddReply, SetnxReply},
|
||||
};
|
||||
|
||||
impl super::RedisConnectionPool {
|
||||
@ -466,6 +466,23 @@ impl super::RedisConnectionPool {
|
||||
.change_context(errors::RedisError::JsonDeserializationFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn sadd<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
members: V,
|
||||
) -> CustomResult<SaddReply, errors::RedisError>
|
||||
where
|
||||
V: TryInto<MultipleValues> + Debug + Send,
|
||||
V::Error: Into<fred::error::RedisError> + Send,
|
||||
{
|
||||
self.pool
|
||||
.sadd(key, members)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::SetAddMembersFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_append_entry<F>(
|
||||
&self,
|
||||
|
||||
@ -50,6 +50,8 @@ pub enum RedisError {
|
||||
SetHashFailed,
|
||||
#[error("Failed to set hash field in Redis")]
|
||||
SetHashFieldFailed,
|
||||
#[error("Failed to add members to set in Redis")]
|
||||
SetAddMembersFailed,
|
||||
#[error("Failed to get hash field in Redis")]
|
||||
GetHashFieldFailed,
|
||||
#[error("The requested value was not found in Redis")]
|
||||
|
||||
@ -255,3 +255,22 @@ impl fred::types::FromRedis for DelReply {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SaddReply {
|
||||
KeySet,
|
||||
KeyNotSet,
|
||||
}
|
||||
|
||||
impl fred::types::FromRedis for SaddReply {
|
||||
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
|
||||
match value {
|
||||
fred::types::RedisValue::Integer(1) => Ok(Self::KeySet),
|
||||
fred::types::RedisValue::Integer(0) => Ok(Self::KeyNotSet),
|
||||
_ => Err(fred::error::RedisError::new(
|
||||
fred::error::RedisErrorKind::Unknown,
|
||||
"Unexpected sadd command reply",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,10 +165,12 @@ impl RedisErrorExt for error_stack::Report<RedisError> {
|
||||
RedisError::NotFound => self.change_context(DataStorageError::ValueNotFound(format!(
|
||||
"Data does not exist for key {key}",
|
||||
))),
|
||||
RedisError::SetNxFailed => self.change_context(DataStorageError::DuplicateValue {
|
||||
RedisError::SetNxFailed | RedisError::SetAddMembersFailed => {
|
||||
self.change_context(DataStorageError::DuplicateValue {
|
||||
entity: "redis",
|
||||
key: Some(key.to_string()),
|
||||
}),
|
||||
})
|
||||
}
|
||||
_ => self.change_context(DataStorageError::KVError),
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,9 +19,10 @@ pub mod refund;
|
||||
mod reverse_lookup;
|
||||
mod utils;
|
||||
|
||||
use common_utils::errors::CustomResult;
|
||||
use database::store::PgPool;
|
||||
pub use mock_db::MockDb;
|
||||
use redis_interface::errors::RedisError;
|
||||
use redis_interface::{errors::RedisError, SaddReply};
|
||||
|
||||
pub use crate::database::store::DatabaseStore;
|
||||
|
||||
@ -259,3 +260,74 @@ pub(crate) fn diesel_error_to_data_error(
|
||||
_ => StorageError::DatabaseError(error_stack::report!(*diesel_error)),
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait UniqueConstraints {
|
||||
fn unique_constraints(&self) -> Vec<String>;
|
||||
fn table_name(&self) -> &str;
|
||||
async fn check_for_constraints(
|
||||
&self,
|
||||
redis_conn: &Arc<redis_interface::RedisConnectionPool>,
|
||||
) -> CustomResult<(), RedisError> {
|
||||
let constraints = self.unique_constraints();
|
||||
let sadd_result = redis_conn
|
||||
.sadd(
|
||||
&format!("unique_constraint:{}", self.table_name()),
|
||||
constraints,
|
||||
)
|
||||
.await?;
|
||||
|
||||
match sadd_result {
|
||||
SaddReply::KeyNotSet => Err(error_stack::report!(RedisError::SetAddMembersFailed)),
|
||||
SaddReply::KeySet => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UniqueConstraints for diesel_models::Address {
|
||||
fn unique_constraints(&self) -> Vec<String> {
|
||||
vec![format!("address_{}", self.address_id)]
|
||||
}
|
||||
fn table_name(&self) -> &str {
|
||||
"Address"
|
||||
}
|
||||
}
|
||||
|
||||
impl UniqueConstraints for diesel_models::PaymentIntent {
|
||||
fn unique_constraints(&self) -> Vec<String> {
|
||||
vec![format!("pi_{}_{}", self.merchant_id, self.payment_id)]
|
||||
}
|
||||
fn table_name(&self) -> &str {
|
||||
"PaymentIntent"
|
||||
}
|
||||
}
|
||||
|
||||
impl UniqueConstraints for diesel_models::PaymentAttempt {
|
||||
fn unique_constraints(&self) -> Vec<String> {
|
||||
vec![format!(
|
||||
"pa_{}_{}_{}",
|
||||
self.merchant_id, self.payment_id, self.attempt_id
|
||||
)]
|
||||
}
|
||||
fn table_name(&self) -> &str {
|
||||
"PaymentAttempt"
|
||||
}
|
||||
}
|
||||
|
||||
impl UniqueConstraints for diesel_models::Refund {
|
||||
fn unique_constraints(&self) -> Vec<String> {
|
||||
vec![format!("refund_{}_{}", self.merchant_id, self.refund_id)]
|
||||
}
|
||||
fn table_name(&self) -> &str {
|
||||
"Refund"
|
||||
}
|
||||
}
|
||||
|
||||
impl UniqueConstraints for diesel_models::ReverseLookup {
|
||||
fn unique_constraints(&self) -> Vec<String> {
|
||||
vec![format!("reverselookup_{}", self.lookup_id)]
|
||||
}
|
||||
fn table_name(&self) -> &str {
|
||||
"ReverseLookup"
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,7 +7,7 @@ use router_derive::TryGetEnumVariant;
|
||||
use router_env::logger;
|
||||
use serde::de;
|
||||
|
||||
use crate::{metrics, store::kv::TypedSql, KVRouterStore};
|
||||
use crate::{metrics, store::kv::TypedSql, KVRouterStore, UniqueConstraints};
|
||||
|
||||
pub trait KvStorePartition {
|
||||
fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 {
|
||||
@ -95,7 +95,7 @@ pub async fn kv_wrapper<'a, T, D, S>(
|
||||
where
|
||||
T: de::DeserializeOwned,
|
||||
D: crate::database::store::DatabaseStore,
|
||||
S: serde::Serialize + Debug + KvStorePartition,
|
||||
S: serde::Serialize + Debug + KvStorePartition + UniqueConstraints + Sync,
|
||||
{
|
||||
let redis_conn = store.get_redis_conn()?;
|
||||
|
||||
@ -147,6 +147,8 @@ where
|
||||
KvOperation::HSetNx(field, value, sql) => {
|
||||
logger::debug!(kv_operation= %operation, value = ?value);
|
||||
|
||||
value.check_for_constraints(&redis_conn).await?;
|
||||
|
||||
let result = redis_conn
|
||||
.serialize_and_set_hash_field_if_not_exist(key, field, value, Some(ttl))
|
||||
.await?;
|
||||
@ -168,6 +170,8 @@ where
|
||||
.serialize_and_set_key_if_not_exist(key, value, Some(ttl.into()))
|
||||
.await?;
|
||||
|
||||
value.check_for_constraints(&redis_conn).await?;
|
||||
|
||||
if matches!(result, redis_interface::SetnxReply::KeySet) {
|
||||
store
|
||||
.push_to_drainer_stream::<S>(sql, partition_key)
|
||||
|
||||
Reference in New Issue
Block a user