fix(redis_interface): fix derps in HSET and HSETNX command wrappers (#129)

This commit is contained in:
Sanchith Hegde
2022-12-13 18:14:19 +05:30
committed by GitHub
parent 529c28feea
commit 6d30989f59
7 changed files with 45 additions and 36 deletions

1
Cargo.lock generated
View File

@ -1292,7 +1292,6 @@ dependencies = [
"rand", "rand",
"redis-protocol", "redis-protocol",
"semver", "semver",
"serde_json",
"sha-1", "sha-1",
"tokio", "tokio",
"tokio-native-tls", "tokio-native-tls",

View File

@ -9,7 +9,7 @@ license = "Apache-2.0"
[dependencies] [dependencies]
error-stack = "0.2.4" error-stack = "0.2.4"
fred = { version = "5.2.0", features = ["metrics", "partial-tracing", "serde-json"] } fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] }
serde = { version = "1.0.149", features = ["derive"] } serde = { version = "1.0.149", features = ["derive"] }
thiserror = "1.0.37" thiserror = "1.0.37"

View File

@ -20,7 +20,7 @@ use router_env::{tracing, tracing::instrument};
use crate::{ use crate::{
errors, errors,
types::{RedisEntryId, SetNXReply}, types::{HsetnxReply, RedisEntryId, SetnxReply},
}; };
impl super::RedisConnectionPool { impl super::RedisConnectionPool {
@ -142,7 +142,7 @@ impl super::RedisConnectionPool {
&self, &self,
key: &str, key: &str,
value: V, value: V,
) -> CustomResult<SetNXReply, errors::RedisError> ) -> CustomResult<SetnxReply, errors::RedisError>
where where
V: TryInto<RedisValue> + Debug, V: TryInto<RedisValue> + Debug,
V::Error: Into<fred::error::RedisError>, V::Error: Into<fred::error::RedisError>,
@ -203,28 +203,13 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::SetHashFailed) .change_context(errors::RedisError::SetHashFailed)
} }
#[instrument(level = "DEBUG", skip(self))]
pub async fn serialize_and_set_hash_fields<V>(
&self,
key: &str,
values: V,
) -> CustomResult<(), errors::RedisError>
where
V: serde::Serialize + Debug,
{
let serialized = Encode::<V>::encode_to_value(&values)
.change_context(errors::RedisError::JsonSerializationFailed)?;
self.set_hash_fields(key, serialized).await
}
#[instrument(level = "DEBUG", skip(self))] #[instrument(level = "DEBUG", skip(self))]
pub async fn set_hash_field_if_not_exist<V>( pub async fn set_hash_field_if_not_exist<V>(
&self, &self,
key: &str, key: &str,
field: &str, field: &str,
value: V, value: V,
) -> CustomResult<SetNXReply, errors::RedisError> ) -> CustomResult<HsetnxReply, errors::RedisError>
where where
V: TryInto<RedisValue> + Debug, V: TryInto<RedisValue> + Debug,
V::Error: Into<fred::error::RedisError>, V::Error: Into<fred::error::RedisError>,
@ -242,7 +227,7 @@ impl super::RedisConnectionPool {
key: &str, key: &str,
field: &str, field: &str,
value: V, value: V,
) -> CustomResult<SetNXReply, errors::RedisError> ) -> CustomResult<HsetnxReply, errors::RedisError>
where where
V: serde::Serialize + Debug, V: serde::Serialize + Debug,
{ {

View File

@ -67,18 +67,18 @@ impl From<&RedisEntryId> for fred::types::XID {
} }
#[derive(Eq, PartialEq)] #[derive(Eq, PartialEq)]
pub enum SetNXReply { pub enum SetnxReply {
KeySet, KeySet,
KeyNotSet, // Existing key KeyNotSet, // Existing key
} }
impl fred::types::FromRedis for SetNXReply { impl fred::types::FromRedis for SetnxReply {
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> { fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
match value { match value {
// Returns String ( "OK" ) in case of success // Returns String ( "OK" ) in case of success
fred::types::RedisValue::String(_) => Ok(SetNXReply::KeySet), fred::types::RedisValue::String(_) => Ok(Self::KeySet),
// Return Null in case of failure // Return Null in case of failure
fred::types::RedisValue::Null => Ok(SetNXReply::KeyNotSet), fred::types::RedisValue::Null => Ok(Self::KeyNotSet),
// Unexpected behaviour // Unexpected behaviour
_ => Err(fred::error::RedisError::new( _ => Err(fred::error::RedisError::new(
fred::error::RedisErrorKind::Unknown, fred::error::RedisErrorKind::Unknown,
@ -87,3 +87,22 @@ impl fred::types::FromRedis for SetNXReply {
} }
} }
} }
#[derive(Eq, PartialEq)]
pub enum HsetnxReply {
KeySet,
KeyNotSet, // Existing key
}
impl fred::types::FromRedis for HsetnxReply {
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
match value {
fred::types::RedisValue::Integer(1) => Ok(Self::KeySet),
fred::types::RedisValue::Integer(0) => Ok(Self::KeyNotSet),
_ => Err(fred::error::RedisError::new(
fred::error::RedisErrorKind::Unknown,
"Unexpected HSETNX command reply",
)),
}
}
}

View File

@ -305,7 +305,7 @@ impl PaymentAttemptInterface for MockDb {
mod storage { mod storage {
use common_utils::date_time; use common_utils::date_time;
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use redis_interface::{RedisEntryId, SetNXReply}; use redis_interface::{HsetnxReply, RedisEntryId};
use super::PaymentAttemptInterface; use super::PaymentAttemptInterface;
use crate::{ use crate::{
@ -377,11 +377,11 @@ mod storage {
.serialize_and_set_hash_field_if_not_exist(&key, "pa", &created_attempt) .serialize_and_set_hash_field_if_not_exist(&key, "pa", &created_attempt)
.await .await
{ {
Ok(SetNXReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue( Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue(
format!("Payment Attempt already exists for payment_id: {}", key), format!("Payment Attempt already exists for payment_id: {}", key),
)) ))
.into_report(), .into_report(),
Ok(SetNXReply::KeySet) => { Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await; let conn = pg_connection(&self.master_pool).await;
let query = payment_attempt let query = payment_attempt
.insert_query(&conn) .insert_query(&conn)
@ -430,9 +430,12 @@ mod storage {
let updated_attempt = payment_attempt.clone().apply_changeset(this.clone()); let updated_attempt = payment_attempt.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ? // Check for database presence as well Maybe use a read replica here ?
let redis_value = serde_json::to_string(&updated_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
let updated_attempt = self let updated_attempt = self
.redis_conn .redis_conn
.serialize_and_set_hash_fields(&key, ("pa", &updated_attempt)) .set_hash_fields(&key, ("pa", &redis_value))
.await .await
.map(|_| updated_attempt) .map(|_| updated_attempt)
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;

View File

@ -41,7 +41,7 @@ pub trait PaymentIntentInterface {
mod storage { mod storage {
use common_utils::date_time; use common_utils::date_time;
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use redis_interface::{RedisEntryId, SetNXReply}; use redis_interface::{HsetnxReply, RedisEntryId};
use super::PaymentIntentInterface; use super::PaymentIntentInterface;
use crate::{ use crate::{
@ -100,11 +100,11 @@ mod storage {
.serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent) .serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent)
.await .await
{ {
Ok(SetNXReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue( Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue(
format!("Payment Intent already exists for payment_id: {key}"), format!("Payment Intent already exists for payment_id: {key}"),
)) ))
.into_report(), .into_report(),
Ok(SetNXReply::KeySet) => { Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await; let conn = pg_connection(&self.master_pool).await;
let query = new let query = new
.insert_query(&conn) .insert_query(&conn)
@ -153,9 +153,12 @@ mod storage {
let updated_intent = payment_intent.clone().apply_changeset(this.clone()); let updated_intent = payment_intent.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ? // Check for database presence as well Maybe use a read replica here ?
let redis_value = serde_json::to_string(&updated_intent)
.into_report()
.change_context(errors::StorageError::KVError)?;
let updated_intent = self let updated_intent = self
.redis_conn .redis_conn
.serialize_and_set_hash_fields(&key, ("pi", &updated_intent)) .set_hash_fields(&key, ("pi", &redis_value))
.await .await
.map(|_| updated_intent) .map(|_| updated_intent)
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;

View File

@ -1,4 +1,4 @@
use redis_interface::{errors::RedisError, RedisEntryId, SetNXReply}; use redis_interface::{errors::RedisError, RedisEntryId, SetnxReply};
use router_env::logger; use router_env::logger;
use super::{MockDb, Store}; use super::{MockDb, Store};
@ -70,7 +70,7 @@ impl QueueInterface for Store {
let conn = self.redis_conn.clone(); let conn = self.redis_conn.clone();
let is_lock_acquired = conn.set_key_if_not_exist(lock_key, lock_val).await; let is_lock_acquired = conn.set_key_if_not_exist(lock_key, lock_val).await;
match is_lock_acquired { match is_lock_acquired {
Ok(SetNXReply::KeySet) => match conn.set_expiry(lock_key, ttl).await { Ok(SetnxReply::KeySet) => match conn.set_expiry(lock_key, ttl).await {
Ok(()) => true, Ok(()) => true,
#[allow(unused_must_use)] #[allow(unused_must_use)]
@ -80,7 +80,7 @@ impl QueueInterface for Store {
false false
} }
}, },
Ok(SetNXReply::KeyNotSet) => { Ok(SetnxReply::KeyNotSet) => {
logger::error!(%tag, "Lock not acquired, previous fetch still in progress"); logger::error!(%tag, "Lock not acquired, previous fetch still in progress");
false false
} }