feat: add timeout for set command on hashes and add function to allow retry in database (#509)

This commit is contained in:
Nishant Joshi
2023-02-07 16:28:39 +05:30
committed by GitHub
parent b4da08666a
commit 2e98670aa7
9 changed files with 169 additions and 119 deletions

View File

@ -43,6 +43,7 @@ pool_size = 5 # Number of connections to keep open
reconnect_max_attempts = 5 # Maximum number of reconnection attempts to make before failing. Set to 0 to retry forever. reconnect_max_attempts = 5 # Maximum number of reconnection attempts to make before failing. Set to 0 to retry forever.
reconnect_delay = 5 # Delay between reconnection attempts, in milliseconds reconnect_delay = 5 # Delay between reconnection attempts, in milliseconds
default_ttl = 300 # Default TTL for entries, in seconds default_ttl = 300 # Default TTL for entries, in seconds
default_hash_ttl = 900 # Default TTL for hashes entries, in seconds
use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6) use_legacy_version = false # Resp protocol for fred crate (set this to true if using RESPv2 or redis version < 6)
stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options stream_read_count = 1 # Default number of entries to read from stream if not provided in stream read options

View File

@ -10,7 +10,7 @@ use std::fmt::Debug;
use common_utils::{ use common_utils::{
errors::CustomResult, errors::CustomResult,
ext_traits::{ByteSliceExt, Encode, StringExt}, ext_traits::{AsyncExt, ByteSliceExt, Encode, StringExt},
fp_utils, fp_utils,
}; };
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
@ -207,11 +207,17 @@ impl super::RedisConnectionPool {
V: TryInto<RedisMap> + Debug, V: TryInto<RedisMap> + Debug,
V::Error: Into<fred::error::RedisError>, V::Error: Into<fred::error::RedisError>,
{ {
self.pool let output: Result<(), _> = self
.pool
.hset(key, values) .hset(key, values)
.await .await
.into_report() .into_report()
.change_context(errors::RedisError::SetHashFailed) .change_context(errors::RedisError::SetHashFailed);
// setting expiry for the key
output
.async_and_then(|_| self.set_expiry(key, self.config.default_hash_ttl.into()))
.await
} }
#[instrument(level = "DEBUG", skip(self))] #[instrument(level = "DEBUG", skip(self))]
@ -225,11 +231,20 @@ impl super::RedisConnectionPool {
V: TryInto<RedisValue> + Debug, V: TryInto<RedisValue> + Debug,
V::Error: Into<fred::error::RedisError>, V::Error: Into<fred::error::RedisError>,
{ {
self.pool let output: Result<HsetnxReply, _> = self
.pool
.hsetnx(key, field, value) .hsetnx(key, field, value)
.await .await
.into_report() .into_report()
.change_context(errors::RedisError::SetHashFieldFailed) .change_context(errors::RedisError::SetHashFieldFailed);
output
.async_and_then(|inner| async {
self.set_expiry(key, self.config.default_hash_ttl.into())
.await?;
Ok(inner)
})
.await
} }
#[instrument(level = "DEBUG", skip(self))] #[instrument(level = "DEBUG", skip(self))]

View File

@ -100,6 +100,7 @@ impl RedisConnectionPool {
struct RedisConfig { struct RedisConfig {
default_ttl: u32, default_ttl: u32,
default_stream_read_count: u64, default_stream_read_count: u64,
default_hash_ttl: u32,
} }
impl From<&RedisSettings> for RedisConfig { impl From<&RedisSettings> for RedisConfig {
@ -107,6 +108,7 @@ impl From<&RedisSettings> for RedisConfig {
Self { Self {
default_ttl: config.default_ttl, default_ttl: config.default_ttl,
default_stream_read_count: config.stream_read_count, default_stream_read_count: config.stream_read_count,
default_hash_ttl: config.default_hash_ttl,
} }
} }
} }

View File

@ -22,6 +22,8 @@ pub struct RedisSettings {
pub reconnect_delay: u32, pub reconnect_delay: u32,
/// TTL in seconds /// TTL in seconds
pub default_ttl: u32, pub default_ttl: u32,
/// TTL for hash-tables in seconds
pub default_hash_ttl: u32,
pub stream_read_count: u64, pub stream_read_count: u64,
} }
@ -59,6 +61,7 @@ impl Default for RedisSettings {
reconnect_delay: 5, reconnect_delay: 5,
default_ttl: 300, default_ttl: 300,
stream_read_count: 1, stream_read_count: 1,
default_hash_ttl: 900,
} }
} }
} }

View File

@ -320,10 +320,11 @@ mod storage {
use super::PaymentAttemptInterface; use super::PaymentAttemptInterface;
use crate::{ use crate::{
connection::pg_connection, connection::pg_connection,
core::errors::{self, utils::RedisErrorExt, CustomResult}, core::errors::{self, CustomResult},
db::reverse_lookup::ReverseLookupInterface, db::reverse_lookup::ReverseLookupInterface,
services::Store, services::Store,
types::storage::{enums, kv, payment_attempt::*, ReverseLookupNew}, types::storage::{enums, kv, payment_attempt::*, ReverseLookupNew},
utils::db_utils,
}; };
#[async_trait::async_trait] #[async_trait::async_trait]
@ -517,15 +518,15 @@ mod storage {
merchant_id: &str, merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme { match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => { enums::MerchantStorageScheme::PostgresOnly => database_call().await,
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
// [#439]: get the attempt_id from payment_intent // [#439]: get the attempt_id from payment_intent
let key = format!("{merchant_id}_{payment_id}"); let key = format!("{merchant_id}_{payment_id}");
@ -534,15 +535,16 @@ mod storage {
.await .await
.map_err(Into::<errors::StorageError>::into) .map_err(Into::<errors::StorageError>::into)
.into_report()?; .into_report()?;
self.redis_conn
.get_hash_field_and_deserialize::<PaymentAttempt>( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize(
&lookup.pk_id, &lookup.pk_id,
&lookup.sk_id, &lookup.sk_id,
"PaymentAttempt", "PaymentAttempt",
) ),
.await database_call,
.map_err(|error| error.to_redis_failed_response(&key)) )
// Check for database presence as well Maybe use a read replica here ? .await
} }
} }
} }
@ -554,19 +556,20 @@ mod storage {
merchant_id: &str, merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
&conn,
connector_transaction_id,
payment_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme { match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => { enums::MerchantStorageScheme::PostgresOnly => database_call().await,
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
&conn,
connector_transaction_id,
payment_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
// We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now // We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now
let lookup_id = format!("{merchant_id}_{connector_transaction_id}"); let lookup_id = format!("{merchant_id}_{connector_transaction_id}");
@ -576,14 +579,16 @@ mod storage {
.map_err(Into::<errors::StorageError>::into) .map_err(Into::<errors::StorageError>::into)
.into_report()?; .into_report()?;
let key = &lookup.pk_id; let key = &lookup.pk_id;
self.redis_conn
.get_hash_field_and_deserialize::<PaymentAttempt>( db_utils::try_redis_get_else_try_database_get(
self.redis_conn.get_hash_field_and_deserialize(
key, key,
&lookup.sk_id, &lookup.sk_id,
"PaymentAttempt", "PaymentAttempt",
) ),
.await database_call,
.map_err(|error| error.to_redis_failed_response(key)) )
.await
} }
} }
} }
@ -615,18 +620,19 @@ mod storage {
connector_txn_id: &str, connector_txn_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
connector_txn_id,
)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme { match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => { enums::MerchantStorageScheme::PostgresOnly => database_call().await,
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
connector_txn_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{connector_txn_id}"); let lookup_id = format!("{merchant_id}_{connector_txn_id}");
@ -637,14 +643,15 @@ mod storage {
.into_report()?; .into_report()?;
let key = &lookup.pk_id; let key = &lookup.pk_id;
self.redis_conn db_utils::try_redis_get_else_try_database_get(
.get_hash_field_and_deserialize::<PaymentAttempt>( self.redis_conn.get_hash_field_and_deserialize(
key, key,
&lookup.sk_id, &lookup.sk_id,
"PaymentAttempt", "PaymentAttempt",
) ),
.await database_call,
.map_err(|error| error.to_redis_failed_response(key)) )
.await
} }
} }
} }
@ -655,14 +662,15 @@ mod storage {
attempt_id: &str, attempt_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme { match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => { enums::MerchantStorageScheme::PostgresOnly => database_call().await,
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{attempt_id}"); let lookup_id = format!("{merchant_id}_{attempt_id}");
@ -672,14 +680,15 @@ mod storage {
.map_err(Into::<errors::StorageError>::into) .map_err(Into::<errors::StorageError>::into)
.into_report()?; .into_report()?;
let key = &lookup.pk_id; let key = &lookup.pk_id;
self.redis_conn db_utils::try_redis_get_else_try_database_get(
.get_hash_field_and_deserialize::<PaymentAttempt>( self.redis_conn.get_hash_field_and_deserialize(
key, key,
&lookup.sk_id, &lookup.sk_id,
"PaymentAttempt", "PaymentAttempt",
) ),
.await database_call,
.map_err(|error| error.to_redis_failed_response(key)) )
.await
} }
} }
} }

View File

@ -51,7 +51,7 @@ mod storage {
core::errors::{self, CustomResult}, core::errors::{self, CustomResult},
services::Store, services::Store,
types::storage::{enums, kv, payment_intent::*}, types::storage::{enums, kv, payment_intent::*},
utils::{self, storage_partitioning}, utils::{self, db_utils, storage_partitioning},
}; };
#[async_trait::async_trait] #[async_trait::async_trait]
@ -188,32 +188,24 @@ mod storage {
merchant_id: &str, merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> { ) -> CustomResult<PaymentIntent, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme { match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => { enums::MerchantStorageScheme::PostgresOnly => database_call().await,
let conn = pg_connection(&self.master_pool).await;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
let key = format!("{merchant_id}_{payment_id}"); let key = format!("{merchant_id}_{payment_id}");
self.redis_conn db_utils::try_redis_get_else_try_database_get(
.get_hash_field_and_deserialize::<PaymentIntent>( self.redis_conn
&key, .get_hash_field_and_deserialize(&key, "pi", "PaymentIntent"),
"pi", database_call,
"PaymentIntent", )
) .await
.await
.map_err(|error| match error.current_context() {
errors::RedisError::NotFound => errors::StorageError::ValueNotFound(
format!("Payment Intent does not exist for {key}"),
)
.into(),
_ => error.change_context(errors::StorageError::KVError),
})
// Check for database presence as well Maybe use a read replica here ?
} }
} }
} }

View File

@ -204,7 +204,7 @@ mod storage {
use super::RefundInterface; use super::RefundInterface;
use crate::{ use crate::{
connection::pg_connection, connection::pg_connection,
core::errors::{self, utils::RedisErrorExt, CustomResult}, core::errors::{self, CustomResult},
db::reverse_lookup::ReverseLookupInterface, db::reverse_lookup::ReverseLookupInterface,
logger, logger,
services::Store, services::Store,
@ -219,18 +219,19 @@ mod storage {
merchant_id: &str, merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> { ) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme { match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => { enums::MerchantStorageScheme::PostgresOnly => database_call().await,
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{internal_reference_id}"); let lookup_id = format!("{merchant_id}_{internal_reference_id}");
let lookup = self let lookup = self
@ -240,14 +241,15 @@ mod storage {
.into_report()?; .into_report()?;
let key = &lookup.pk_id; let key = &lookup.pk_id;
self.redis_conn db_utils::try_redis_get_else_try_database_get(
.get_hash_field_and_deserialize::<storage_types::Refund>( self.redis_conn.get_hash_field_and_deserialize(
key, key,
&lookup.sk_id, &lookup.sk_id,
"Refund", "Refund",
) ),
.await database_call,
.map_err(|error| error.to_redis_failed_response(key)) )
.await
} }
} }
} }
@ -463,18 +465,15 @@ mod storage {
refund_id: &str, refund_id: &str,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> { ) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme { let database_call = || async {
enums::MerchantStorageScheme::PostgresOnly => { let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await; storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
storage_types::Refund::find_by_merchant_id_refund_id(
&conn,
merchant_id,
refund_id,
)
.await .await
.map_err(Into::into) .map_err(Into::into)
.into_report() .into_report()
} };
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => { enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{refund_id}"); let lookup_id = format!("{merchant_id}_{refund_id}");
let lookup = self let lookup = self
@ -484,14 +483,15 @@ mod storage {
.into_report()?; .into_report()?;
let key = &lookup.pk_id; let key = &lookup.pk_id;
self.redis_conn db_utils::try_redis_get_else_try_database_get(
.get_hash_field_and_deserialize::<storage_types::Refund>( self.redis_conn.get_hash_field_and_deserialize(
key, key,
&lookup.sk_id, &lookup.sk_id,
"Refund", "Refund",
) ),
.await database_call,
.map_err(|error| error.to_redis_failed_response(key)) )
.await
} }
} }
} }

View File

@ -10,3 +10,6 @@ static GLOBAL_METER: Lazy<Meter> = Lazy::new(|| global::meter("ROUTER_API"));
pub(crate) static HEALTH_METRIC: Lazy<Counter<u64>> = pub(crate) static HEALTH_METRIC: Lazy<Counter<u64>> =
Lazy::new(|| GLOBAL_METER.u64_counter("HEALTH_API").init()); Lazy::new(|| GLOBAL_METER.u64_counter("HEALTH_API").init());
pub(crate) static KV_MISS: Lazy<Counter<u64>> =
Lazy::new(|| GLOBAL_METER.u64_counter("KV_MISS").init());

View File

@ -1,3 +1,5 @@
use crate::{core::errors, routes::metrics};
#[cfg(feature = "kv_store")] #[cfg(feature = "kv_store")]
/// Generates hscan field pattern. Suppose the field is pa_1234_ref_1211 it will generate /// Generates hscan field pattern. Suppose the field is pa_1234_ref_1211 it will generate
/// pa_1234_ref_* /// pa_1234_ref_*
@ -8,3 +10,26 @@ pub fn generate_hscan_pattern_for_refund(sk: &str) -> String {
.collect::<Vec<&str>>() .collect::<Vec<&str>>()
.join("_") .join("_")
} }
// The first argument should be a future while the second argument should be a closure that returns a future for a database call
pub async fn try_redis_get_else_try_database_get<F, RFut, DFut, T>(
redis_fut: RFut,
database_call_closure: F,
) -> errors::CustomResult<T, errors::StorageError>
where
F: FnOnce() -> DFut,
RFut: futures::Future<Output = errors::CustomResult<T, redis_interface::errors::RedisError>>,
DFut: futures::Future<Output = errors::CustomResult<T, errors::StorageError>>,
{
let redis_output = redis_fut.await;
match redis_output {
Ok(output) => Ok(output),
Err(redis_error) => match redis_error.current_context() {
redis_interface::errors::RedisError::NotFound => {
metrics::KV_MISS.add(&metrics::CONTEXT, 1, &[]);
database_call_closure().await
}
_ => Err(redis_error.change_context(errors::StorageError::KVError)),
},
}
}