mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-27 19:46:48 +08:00
refactor(redis_interface): separating redis functionality and dependent functionalities outside router crate (#15)
Co-authored-by: Sanchith Hegde
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use config::{Environment, File, FileFormat};
|
||||
use redis_interface::RedisSettings;
|
||||
pub use router_env::config::{Log, LogConsole, LogFile, LogTelemetry};
|
||||
use serde::Deserialize;
|
||||
use structopt::StructOpt;
|
||||
@ -27,7 +28,7 @@ pub struct Settings {
|
||||
pub master_database: Database,
|
||||
#[cfg(feature = "olap")]
|
||||
pub replica_database: Database,
|
||||
pub redis: Redis,
|
||||
pub redis: RedisSettings,
|
||||
pub log: Log,
|
||||
pub keys: Keys,
|
||||
pub locker: Locker,
|
||||
@ -76,22 +77,6 @@ pub struct Database {
|
||||
pub pool_size: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct Redis {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub cluster_urls: Vec<String>,
|
||||
pub cluster_enabled: bool,
|
||||
pub use_legacy_version: bool,
|
||||
pub pool_size: usize,
|
||||
pub reconnect_max_attempts: u32,
|
||||
/// Reconnect delay in milliseconds
|
||||
pub reconnect_delay: u32,
|
||||
/// TTL in seconds
|
||||
pub default_ttl: u32,
|
||||
pub stream_read_count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct Connectors {
|
||||
pub aci: ConnectorParams,
|
||||
|
||||
@ -6,7 +6,7 @@ use crate::configs::settings::{Database, Settings};
|
||||
|
||||
pub type PgPool = bb8::Pool<async_bb8_diesel::ConnectionManager<PgConnection>>;
|
||||
pub type PgPooledConn = async_bb8_diesel::Connection<PgConnection>;
|
||||
pub type RedisPool = std::sync::Arc<crate::services::redis::RedisConnectionPool>;
|
||||
pub type RedisPool = std::sync::Arc<redis_interface::RedisConnectionPool>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestTransaction;
|
||||
@ -25,8 +25,8 @@ impl CustomizeConnection<PgPooledConn, ConnectionError> for TestTransaction {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn redis_connection(conf: &Settings) -> crate::services::redis::RedisConnectionPool {
|
||||
crate::services::redis::RedisConnectionPool::new(&conf.redis).await
|
||||
pub async fn redis_connection(conf: &Settings) -> redis_interface::RedisConnectionPool {
|
||||
redis_interface::RedisConnectionPool::new(&conf.redis).await
|
||||
}
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
|
||||
@ -5,15 +5,15 @@ pub(crate) mod utils;
|
||||
use std::fmt::Display;
|
||||
|
||||
use actix_web::{body::BoxBody, http::StatusCode, HttpResponse, ResponseError};
|
||||
pub use common_utils::errors::{CustomResult, ParsingError};
|
||||
use config::ConfigError;
|
||||
use error_stack;
|
||||
pub use redis_interface::errors::RedisError;
|
||||
use router_env::opentelemetry::metrics::MetricsError;
|
||||
|
||||
pub use self::api_error_response::ApiErrorResponse;
|
||||
pub(crate) use self::utils::{ApiClientErrorExt, ConnectorErrorExt, StorageErrorExt};
|
||||
use crate::services;
|
||||
|
||||
pub type CustomResult<T, E> = error_stack::Result<T, E>;
|
||||
pub type RouterResult<T> = CustomResult<T, ApiErrorResponse>;
|
||||
pub type RouterResponse<T> = CustomResult<services::BachResponse<T>, ApiErrorResponse>;
|
||||
|
||||
@ -97,7 +97,6 @@ pub enum DatabaseError {
|
||||
impl_error_type!(AuthenticationError, "Authentication error");
|
||||
impl_error_type!(AuthorisationError, "Authorisation error");
|
||||
impl_error_type!(EncryptionError, "Encryption error");
|
||||
impl_error_type!(ParsingError, "Parsing error");
|
||||
impl_error_type!(UnexpectedError, "Unexpected error");
|
||||
impl_error_type!(ValidateError, "validation failed");
|
||||
|
||||
@ -418,42 +417,6 @@ error_to_process_tracker_error!(
|
||||
ProcessTrackerError::EValidationError(error_stack::Report<ValidationError>)
|
||||
);
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum RedisError {
|
||||
#[error("Failed to set key value in Redis")]
|
||||
SetFailed,
|
||||
#[error("Failed to set key value with expiry in Redis")]
|
||||
SetExFailed,
|
||||
#[error("Failed to set expiry for key value in Redis")]
|
||||
SetExpiryFailed,
|
||||
#[error("Failed to get key value in Redis")]
|
||||
GetFailed,
|
||||
#[error("Failed to delete key value in Redis")]
|
||||
DeleteFailed,
|
||||
#[error("Failed to append entry to redis stream")]
|
||||
StreamAppendFailed,
|
||||
#[error("Failed to read entries from redis stream")]
|
||||
StreamReadFailed,
|
||||
#[error("Failed to delete entries from redis stream")]
|
||||
StreamDeleteFailed,
|
||||
#[error("Failed to acknowledge redis stream entry")]
|
||||
StreamAcknowledgeFailed,
|
||||
#[error("Failed to create redis consumer group")]
|
||||
ConsumerGroupCreateFailed,
|
||||
#[error("Failed to destroy redis consumer group")]
|
||||
ConsumerGroupDestroyFailed,
|
||||
#[error("Failed to delete consumer from consumer group")]
|
||||
ConsumerGroupRemoveConsumerFailed,
|
||||
#[error("Failed to set last ID on consumer group")]
|
||||
ConsumerGroupSetIdFailed,
|
||||
#[error("Failed to set redis stream message owner")]
|
||||
ConsumerGroupClaimFailed,
|
||||
#[error("Failed to serialize application type to json")]
|
||||
JsonSerializationFailed,
|
||||
#[error("Failed to deserialize application type from json")]
|
||||
JsonDeserializationFailed,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ValidationError {
|
||||
#[error("Missing required field: {field_name}")]
|
||||
|
||||
@ -148,12 +148,13 @@ mod storage {
|
||||
mod storage {
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use fred::prelude::*;
|
||||
use redis_interface::RedisEntryId;
|
||||
|
||||
use super::IPaymentAttempt;
|
||||
use crate::{
|
||||
connection::pg_connection,
|
||||
core::errors::{self, CustomResult},
|
||||
services::{redis::RedisEntryId, Store},
|
||||
services::Store,
|
||||
types::storage::{enums, payment_attempt::*},
|
||||
utils::{date_time, storage_partitioning::KvStorePartition},
|
||||
};
|
||||
|
||||
@ -36,12 +36,13 @@ pub trait IPaymentIntent {
|
||||
mod storage {
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use fred::prelude::{RedisErrorKind, *};
|
||||
use redis_interface::RedisEntryId;
|
||||
|
||||
use super::IPaymentIntent;
|
||||
use crate::{
|
||||
connection::pg_connection,
|
||||
core::errors::{self, CustomResult},
|
||||
services::{redis::RedisEntryId, Store},
|
||||
services::Store,
|
||||
types::{api, storage::payment_intent::*},
|
||||
utils::{date_time, storage_partitioning::KvStorePartition},
|
||||
};
|
||||
|
||||
@ -22,7 +22,6 @@ use crate::{
|
||||
logger::{error, info},
|
||||
routes::AppState,
|
||||
scheduler::utils as pt_utils,
|
||||
services::redis::*,
|
||||
types::storage::{self, enums},
|
||||
utils::date_time,
|
||||
};
|
||||
@ -93,7 +92,11 @@ pub async fn consumer_operations(
|
||||
.store
|
||||
.redis_conn
|
||||
.clone()
|
||||
.consumer_group_create(&stream_name, &group_name, &RedisEntryId::AfterLastID)
|
||||
.consumer_group_create(
|
||||
&stream_name,
|
||||
&group_name,
|
||||
&redis_interface::RedisEntryId::AfterLastID,
|
||||
)
|
||||
.await;
|
||||
if group_created.is_err() {
|
||||
info!("Consumer group already exists");
|
||||
@ -132,7 +135,7 @@ pub async fn consumer_operations(
|
||||
#[instrument(skip(db, redis_conn))]
|
||||
pub async fn fetch_consumer_tasks(
|
||||
db: &dyn Db,
|
||||
redis_conn: &RedisConnectionPool,
|
||||
redis_conn: &redis_interface::RedisConnectionPool,
|
||||
stream_name: &str,
|
||||
group_name: &str,
|
||||
consumer_name: &str,
|
||||
|
||||
@ -15,7 +15,6 @@ use crate::{
|
||||
logger,
|
||||
routes::AppState,
|
||||
scheduler::{ProcessTrackerBatch, SchedulerFlow},
|
||||
services::redis::*,
|
||||
types::storage::{self, enums::ProcessTrackerStatus},
|
||||
utils::{self, date_time, OptionExt, StringExt},
|
||||
};
|
||||
@ -30,7 +29,7 @@ pub async fn acquire_pt_lock(
|
||||
let conn = state.store.redis_conn.clone();
|
||||
let is_lock_acquired = conn.set_key_if_not_exist(lock_key, lock_val).await;
|
||||
match is_lock_acquired {
|
||||
Ok(SetNXReply::KeySet) => match conn.set_expiry(lock_key, ttl).await {
|
||||
Ok(redis_interface::SetNXReply::KeySet) => match conn.set_expiry(lock_key, ttl).await {
|
||||
Ok(()) => true,
|
||||
|
||||
#[allow(unused_must_use)]
|
||||
@ -40,7 +39,7 @@ pub async fn acquire_pt_lock(
|
||||
false
|
||||
}
|
||||
},
|
||||
Ok(SetNXReply::KeyNotSet) => {
|
||||
Ok(redis_interface::SetNXReply::KeyNotSet) => {
|
||||
logger::error!(%tag, "Lock not acquired, previous fetch still in progress");
|
||||
false
|
||||
}
|
||||
@ -51,7 +50,11 @@ pub async fn acquire_pt_lock(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn release_pt_lock(redis_conn: &RedisConnectionPool, tag: &str, lock_key: &str) -> bool {
|
||||
pub async fn release_pt_lock(
|
||||
redis_conn: &redis_interface::RedisConnectionPool,
|
||||
tag: &str,
|
||||
lock_key: &str,
|
||||
) -> bool {
|
||||
let is_lock_released = redis_conn.delete_key(lock_key).await;
|
||||
match is_lock_released {
|
||||
Ok(()) => true,
|
||||
@ -143,7 +146,7 @@ pub async fn update_status_and_append(
|
||||
redis_conn
|
||||
.stream_append_entry(
|
||||
&pt_batch.stream_name,
|
||||
&RedisEntryId::AutoGeneratedID,
|
||||
&redis_interface::RedisEntryId::AutoGeneratedID,
|
||||
field_value_pairs,
|
||||
)
|
||||
.await
|
||||
@ -186,7 +189,7 @@ pub fn divide_into_batches(
|
||||
}
|
||||
|
||||
pub async fn get_batches(
|
||||
conn: &RedisConnectionPool,
|
||||
conn: &redis_interface::RedisConnectionPool,
|
||||
stream_name: &str,
|
||||
group_name: &str,
|
||||
consumer_name: &str,
|
||||
@ -194,7 +197,7 @@ pub async fn get_batches(
|
||||
let response = conn
|
||||
.stream_read_with_options(
|
||||
stream_name,
|
||||
RedisEntryId::UndeliveredEntryID,
|
||||
redis_interface::RedisEntryId::UndeliveredEntryID,
|
||||
// Update logic for collecting to Vec and flattening, if count > 1 is provided
|
||||
Some(1),
|
||||
None,
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
use std::sync;
|
||||
|
||||
use redis_interface as redis;
|
||||
use redis_interface::errors as redis_errors;
|
||||
|
||||
use super::{PaymentsSyncWorkflow, ProcessTrackerWorkflow};
|
||||
use crate::{
|
||||
core::payments::{self as payment_flows, operations},
|
||||
@ -7,7 +10,6 @@ use crate::{
|
||||
errors,
|
||||
routes::AppState,
|
||||
scheduler::{consumer, process_data, utils as pt_utils},
|
||||
services::redis,
|
||||
types::{
|
||||
api,
|
||||
storage::{self, enums},
|
||||
@ -93,10 +95,12 @@ pub async fn get_sync_process_schedule_time(
|
||||
redis: sync::Arc<redis::RedisConnectionPool>,
|
||||
retry_count: i32,
|
||||
) -> Result<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
|
||||
let redis_mapping: errors::CustomResult<process_data::ConnectorPTMapping, errors::RedisError> =
|
||||
redis
|
||||
.get_and_deserialize_key(&format!("pt_mapping_{}", connector), "ConnectorPTMapping")
|
||||
.await;
|
||||
let redis_mapping: errors::CustomResult<
|
||||
process_data::ConnectorPTMapping,
|
||||
redis_errors::RedisError,
|
||||
> = redis
|
||||
.get_and_deserialize_key(&format!("pt_mapping_{}", connector), "ConnectorPTMapping")
|
||||
.await;
|
||||
let mapping = match redis_mapping {
|
||||
Ok(x) => x,
|
||||
Err(_) => process_data::ConnectorPTMapping::default(),
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
pub mod api;
|
||||
pub mod encryption;
|
||||
pub mod logger;
|
||||
pub mod redis;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -12,7 +11,7 @@ pub struct Store {
|
||||
pub master_pool: crate::db::SqlDb,
|
||||
#[cfg(feature = "olap")]
|
||||
pub replica_pool: crate::db::SqlDb,
|
||||
pub redis_conn: Arc<crate::services::redis::RedisConnectionPool>,
|
||||
pub redis_conn: Arc<redis_interface::RedisConnectionPool>,
|
||||
#[cfg(feature = "kv_store")]
|
||||
pub(crate) config: StoreConfig,
|
||||
}
|
||||
|
||||
@ -1,89 +0,0 @@
|
||||
pub mod commands;
|
||||
pub mod types;
|
||||
|
||||
pub use self::{commands::*, types::*};
|
||||
use crate::logger;
|
||||
|
||||
pub struct RedisConnectionPool {
|
||||
pub pool: fred::pool::RedisPool,
|
||||
config: RedisConfig,
|
||||
_join_handles: Vec<fred::types::ConnectHandle>,
|
||||
}
|
||||
|
||||
impl RedisConnectionPool {
|
||||
/// Create a new Redis connection
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if a connection to Redis is not successful.
|
||||
#[allow(clippy::expect_used)]
|
||||
pub(crate) async fn new(conf: &crate::configs::settings::Redis) -> Self {
|
||||
let redis_connection_url = match conf.cluster_enabled {
|
||||
// Fred relies on this format for specifying cluster where the host port is ignored & only query parameters are used for node addresses
|
||||
// redis-cluster://username:password@host:port?node=bar.com:30002&node=baz.com:30003
|
||||
true => format!(
|
||||
"redis-cluster://{}:{}?{}",
|
||||
conf.host,
|
||||
conf.port,
|
||||
conf.cluster_urls
|
||||
.iter()
|
||||
.flat_map(|url| vec!["&", url])
|
||||
.skip(1)
|
||||
.collect::<String>()
|
||||
),
|
||||
false => format!(
|
||||
"redis://{}:{}", //URI Schema
|
||||
conf.host, conf.port,
|
||||
),
|
||||
};
|
||||
let mut config = fred::types::RedisConfig::from_url(&redis_connection_url)
|
||||
.expect("Invalid Redis connection URL");
|
||||
if !conf.use_legacy_version {
|
||||
config.version = fred::types::RespVersion::RESP3;
|
||||
}
|
||||
config.tracing = true;
|
||||
let policy = fred::types::ReconnectPolicy::new_constant(
|
||||
conf.reconnect_max_attempts,
|
||||
conf.reconnect_delay,
|
||||
);
|
||||
let pool = fred::pool::RedisPool::new(config, conf.pool_size)
|
||||
.expect("Unable to construct Redis pool");
|
||||
|
||||
let _join_handles = pool.connect(Some(policy));
|
||||
pool.wait_for_connect()
|
||||
.await
|
||||
.expect("Error connecting to Redis");
|
||||
let config = RedisConfig::from(conf);
|
||||
|
||||
Self {
|
||||
pool,
|
||||
config,
|
||||
_join_handles,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn close_connections(&mut self) {
|
||||
self.pool.quit_pool().await;
|
||||
for handle in self._join_handles.drain(..) {
|
||||
match handle.await {
|
||||
Ok(Ok(_)) => (),
|
||||
Ok(Err(error)) => logger::error!(%error),
|
||||
Err(error) => logger::error!(%error),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct RedisConfig {
|
||||
default_ttl: u32,
|
||||
default_stream_read_count: u64,
|
||||
}
|
||||
|
||||
impl From<&crate::configs::settings::Redis> for RedisConfig {
|
||||
fn from(config: &crate::configs::settings::Redis) -> Self {
|
||||
Self {
|
||||
default_ttl: config.default_ttl,
|
||||
default_stream_read_count: config.stream_read_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,333 +0,0 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use error_stack::{IntoReport, ResultExt};
|
||||
use fred::{
|
||||
interfaces::{KeysInterface, StreamsInterface},
|
||||
types::{
|
||||
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
|
||||
RedisValue, SetOptions, XReadResponse,
|
||||
},
|
||||
};
|
||||
use router_env::{tracing, tracing::instrument};
|
||||
|
||||
use crate::{
|
||||
core::errors::{self, CustomResult},
|
||||
services::redis::types::{RedisEntryId, SetNXReply},
|
||||
utils::{ByteSliceExt, Encode},
|
||||
};
|
||||
|
||||
impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key<V>(&self, key: &str, value: V) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
key,
|
||||
value,
|
||||
Some(Expiration::EX(self.config.default_ttl.into())),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::SetFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_key<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
value: V,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: serde::Serialize + Debug,
|
||||
{
|
||||
let serialized = Encode::<V>::encode_to_vec(&value)
|
||||
.change_context(errors::RedisError::JsonSerializationFailed)?;
|
||||
|
||||
self.set_key(key, &serialized as &[u8]).await
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_key<V>(&self, key: &str) -> CustomResult<V, errors::RedisError>
|
||||
where
|
||||
V: FromRedis + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
.get(key)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::GetFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_and_deserialize_key<T>(
|
||||
&self,
|
||||
key: &str,
|
||||
type_name: &str,
|
||||
) -> CustomResult<T, errors::RedisError>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
let value_bytes = self.get_key::<Vec<u8>>(key).await?;
|
||||
|
||||
value_bytes
|
||||
.parse_struct(type_name)
|
||||
.change_context(errors::RedisError::JsonDeserializationFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn delete_key(&self, key: &str) -> CustomResult<(), errors::RedisError> {
|
||||
self.pool
|
||||
.del(key)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::DeleteFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key_with_expiry<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
value: V,
|
||||
seconds: i64,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
{
|
||||
self.pool
|
||||
.set(key, value, Some(Expiration::EX(seconds)), None, false)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::SetExFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key_if_not_exist<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
value: V,
|
||||
) -> CustomResult<SetNXReply, errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug,
|
||||
V::Error: Into<fred::error::RedisError>,
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
key,
|
||||
value,
|
||||
Some(Expiration::EX(self.config.default_ttl.into())),
|
||||
Some(SetOptions::NX),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::SetFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_expiry(
|
||||
&self,
|
||||
key: &str,
|
||||
seconds: i64,
|
||||
) -> CustomResult<(), errors::RedisError> {
|
||||
self.pool
|
||||
.expire(key, seconds)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::SetExpiryFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_append_entry<F>(
|
||||
&self,
|
||||
stream: &str,
|
||||
entry_id: &RedisEntryId,
|
||||
fields: F,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
F: TryInto<MultipleOrderedPairs> + Debug,
|
||||
F::Error: Into<fred::error::RedisError>,
|
||||
{
|
||||
self.pool
|
||||
.xadd(stream, false, None, entry_id, fields)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::StreamAppendFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_delete_entries<Ids>(
|
||||
&self,
|
||||
stream: &str,
|
||||
ids: Ids,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleStrings> + Debug,
|
||||
{
|
||||
self.pool
|
||||
.xdel(stream, ids)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::StreamDeleteFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_acknowledge_entries<Ids>(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
ids: Ids,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
{
|
||||
self.pool
|
||||
.xack(stream, group, ids)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::StreamAcknowledgeFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_read_entries<K, Ids>(
|
||||
&self,
|
||||
streams: K,
|
||||
ids: Ids,
|
||||
) -> CustomResult<XReadResponse<String, String, String, String>, errors::RedisError>
|
||||
where
|
||||
K: Into<MultipleKeys> + Debug,
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
{
|
||||
self.pool
|
||||
.xread(
|
||||
Some(self.config.default_stream_read_count),
|
||||
None,
|
||||
streams,
|
||||
ids,
|
||||
)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::StreamReadFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_read_with_options<K, Ids>(
|
||||
&self,
|
||||
streams: K,
|
||||
ids: Ids,
|
||||
count: Option<u64>,
|
||||
block: Option<u64>, // timeout in milliseconds
|
||||
group: Option<(&str, &str)>, // (group_name, consumer_name)
|
||||
) -> CustomResult<XReadResponse<String, String, String, Option<String>>, errors::RedisError>
|
||||
where
|
||||
K: Into<MultipleKeys> + Debug,
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
{
|
||||
match group {
|
||||
Some((group_name, consumer_name)) => {
|
||||
self.pool
|
||||
.xreadgroup_map(group_name, consumer_name, count, block, false, streams, ids)
|
||||
.await
|
||||
}
|
||||
None => self.pool.xread_map(count, block, streams, ids).await,
|
||||
}
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::StreamReadFailed)
|
||||
}
|
||||
|
||||
// Consumer Group API
|
||||
|
||||
//TODO: Handle RedisEntryId Enum cases which are not valid
|
||||
//implement xgroup_create_mkstream if needed
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_create(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
id: &RedisEntryId,
|
||||
) -> CustomResult<(), errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_create(stream, group, id, true)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::ConsumerGroupCreateFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_destroy(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
) -> CustomResult<usize, errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_destroy(stream, group)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::ConsumerGroupDestroyFailed)
|
||||
}
|
||||
|
||||
// the number of pending messages that the consumer had before it was deleted
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_delete_consumer(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
) -> CustomResult<usize, errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_delconsumer(stream, group, consumer)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::ConsumerGroupRemoveConsumerFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_set_last_id(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
id: &RedisEntryId,
|
||||
) -> CustomResult<String, errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_setid(stream, group, id)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::ConsumerGroupSetIdFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_set_message_owner<Ids, R>(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
min_idle_time: u64,
|
||||
ids: Ids,
|
||||
) -> CustomResult<R, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleIDs> + Debug,
|
||||
R: FromRedis + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
.xclaim(
|
||||
stream,
|
||||
group,
|
||||
consumer,
|
||||
min_idle_time,
|
||||
ids,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.into_report()
|
||||
.change_context(errors::RedisError::ConsumerGroupClaimFailed)
|
||||
}
|
||||
}
|
||||
@ -1,69 +0,0 @@
|
||||
#[derive(Debug)]
|
||||
pub enum RedisEntryId {
|
||||
UserSpecifiedID {
|
||||
milliseconds: String,
|
||||
sequence_number: String,
|
||||
},
|
||||
AutoGeneratedID,
|
||||
AfterLastID,
|
||||
/// Applicable only with consumer groups
|
||||
UndeliveredEntryID,
|
||||
}
|
||||
|
||||
impl From<RedisEntryId> for fred::types::XID {
|
||||
fn from(id: RedisEntryId) -> Self {
|
||||
use fred::types::XID;
|
||||
|
||||
match id {
|
||||
RedisEntryId::UserSpecifiedID {
|
||||
milliseconds,
|
||||
sequence_number,
|
||||
} => XID::Manual(fred::bytes_utils::format_bytes!(
|
||||
"{milliseconds}-{sequence_number}"
|
||||
)),
|
||||
RedisEntryId::AutoGeneratedID => XID::Auto,
|
||||
RedisEntryId::AfterLastID => XID::Max,
|
||||
RedisEntryId::UndeliveredEntryID => XID::NewInGroup,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&RedisEntryId> for fred::types::XID {
|
||||
fn from(id: &RedisEntryId) -> Self {
|
||||
use fred::types::XID;
|
||||
|
||||
match id {
|
||||
RedisEntryId::UserSpecifiedID {
|
||||
milliseconds,
|
||||
sequence_number,
|
||||
} => XID::Manual(fred::bytes_utils::format_bytes!(
|
||||
"{milliseconds}-{sequence_number}"
|
||||
)),
|
||||
RedisEntryId::AutoGeneratedID => XID::Auto,
|
||||
RedisEntryId::AfterLastID => XID::Max,
|
||||
RedisEntryId::UndeliveredEntryID => XID::NewInGroup,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq)]
|
||||
pub enum SetNXReply {
|
||||
KeySet,
|
||||
KeyNotSet, // Existing key
|
||||
}
|
||||
|
||||
impl fred::types::FromRedis for SetNXReply {
|
||||
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
|
||||
match value {
|
||||
// Returns String ( "OK" ) in case of success
|
||||
fred::types::RedisValue::String(_) => Ok(SetNXReply::KeySet),
|
||||
// Return Null in case of failure
|
||||
fred::types::RedisValue::Null => Ok(SetNXReply::KeyNotSet),
|
||||
// Unexpected behaviour
|
||||
_ => Err(fred::error::RedisError::new(
|
||||
fred::error::RedisErrorKind::Unknown,
|
||||
"Unexpected SETNX command reply",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,12 +1,11 @@
|
||||
pub use common_utils::ext_traits::{ByteSliceExt, BytesExt, Encode, StringExt, ValueExt};
|
||||
use error_stack::{report, IntoReport, Report, ResultExt};
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
core::errors::{self, ApiErrorResponse, CustomResult, RouterResult, ValidateError},
|
||||
logger,
|
||||
pii::{ExposeInterface, Secret, Strategy},
|
||||
types::api::AddressDetails,
|
||||
utils::when,
|
||||
};
|
||||
@ -89,217 +88,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait StringExt<T> {
|
||||
fn parse_enum(self, enum_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: std::str::FromStr,
|
||||
// Requirement for converting the `Err` variant of `FromStr` to `Report<Err>`
|
||||
<T as std::str::FromStr>::Err: std::error::Error + Send + Sync + 'static;
|
||||
|
||||
fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: Deserialize<'de>;
|
||||
}
|
||||
|
||||
impl<T> StringExt<T> for String {
|
||||
fn parse_enum(self, enum_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: std::str::FromStr,
|
||||
<T as std::str::FromStr>::Err: std::error::Error + Send + Sync + 'static,
|
||||
{
|
||||
T::from_str(&self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Invalid enum variant {self:?} for enum {enum_name}"))
|
||||
}
|
||||
|
||||
fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: Deserialize<'de>,
|
||||
{
|
||||
serde_json::from_str::<T>(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to parse {type_name} from string"))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait BytesExt<T> {
|
||||
fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: Deserialize<'de>;
|
||||
}
|
||||
|
||||
impl<T> BytesExt<T> for bytes::Bytes {
|
||||
fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: Deserialize<'de>,
|
||||
{
|
||||
use bytes::Buf;
|
||||
|
||||
serde_json::from_slice::<T>(self.chunk())
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to parse {type_name} from bytes"))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait ByteSliceExt<T> {
|
||||
fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: Deserialize<'de>;
|
||||
}
|
||||
|
||||
impl<T> ByteSliceExt<T> for [u8] {
|
||||
fn parse_struct<'de>(&'de self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: Deserialize<'de>,
|
||||
{
|
||||
serde_json::from_slice(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to parse {type_name} from &[u8]"))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait ValueExt<T> {
|
||||
fn parse_value(self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: serde::de::DeserializeOwned;
|
||||
}
|
||||
|
||||
impl<T> ValueExt<T> for serde_json::Value {
|
||||
fn parse_value(self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
let debug = format!(
|
||||
"Unable to parse {type_name} from serde_json::Value: {:?}",
|
||||
&self
|
||||
);
|
||||
serde_json::from_value::<T>(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| debug)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, MaskingStrategy> ValueExt<T> for Secret<serde_json::Value, MaskingStrategy>
|
||||
where
|
||||
MaskingStrategy: Strategy<serde_json::Value>,
|
||||
{
|
||||
fn parse_value(self, type_name: &str) -> CustomResult<T, errors::ParsingError>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
self.expose().parse_value(type_name)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Encode<'e, P>
|
||||
where
|
||||
Self: 'e + std::fmt::Debug,
|
||||
{
|
||||
// If needed get type information/custom error implementation.
|
||||
fn convert_and_encode(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
P: TryFrom<&'e Self> + Serialize,
|
||||
Result<P, <P as TryFrom<&'e Self>>::Error>: error_stack::ResultExt,
|
||||
<Result<P, <P as TryFrom<&'e Self>>::Error> as ResultExt>::Ok: Serialize;
|
||||
|
||||
fn convert_and_url_encode(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
P: TryFrom<&'e Self> + Serialize,
|
||||
Result<P, <P as TryFrom<&'e Self>>::Error>: error_stack::ResultExt,
|
||||
<Result<P, <P as TryFrom<&'e Self>>::Error> as ResultExt>::Ok: Serialize;
|
||||
|
||||
fn encode(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize;
|
||||
|
||||
fn encode_to_string_of_json(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize;
|
||||
|
||||
fn encode_to_value(&'e self) -> CustomResult<serde_json::Value, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize;
|
||||
|
||||
fn encode_to_vec(&'e self) -> CustomResult<Vec<u8>, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize;
|
||||
}
|
||||
|
||||
impl<'e, P, A> Encode<'e, P> for A
|
||||
where
|
||||
Self: 'e + std::fmt::Debug,
|
||||
{
|
||||
fn convert_and_encode(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
P: TryFrom<&'e Self> + Serialize,
|
||||
Result<P, <P as TryFrom<&'e Self>>::Error>: error_stack::ResultExt,
|
||||
<Result<P, <P as TryFrom<&'e Self>>::Error> as ResultExt>::Ok: Serialize,
|
||||
{
|
||||
serde_json::to_string(&P::try_from(self).change_context(errors::ParsingError)?)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self))
|
||||
}
|
||||
|
||||
fn convert_and_url_encode(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
P: TryFrom<&'e Self> + Serialize,
|
||||
Result<P, <P as TryFrom<&'e Self>>::Error>: error_stack::ResultExt,
|
||||
<Result<P, <P as TryFrom<&'e Self>>::Error> as ResultExt>::Ok: Serialize,
|
||||
{
|
||||
serde_urlencoded::to_string(&P::try_from(self).change_context(errors::ParsingError)?)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self))
|
||||
}
|
||||
|
||||
// Check without two functions can we combine this
|
||||
fn encode(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize,
|
||||
{
|
||||
serde_urlencoded::to_string(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self))
|
||||
}
|
||||
|
||||
fn encode_to_string_of_json(&'e self) -> CustomResult<String, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize,
|
||||
{
|
||||
serde_json::to_string(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to convert {:?} to a request", self))
|
||||
}
|
||||
|
||||
fn encode_to_value(&'e self) -> CustomResult<serde_json::Value, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize,
|
||||
{
|
||||
serde_json::to_value(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to convert {:?} to a value", self))
|
||||
}
|
||||
|
||||
fn encode_to_vec(&'e self) -> CustomResult<Vec<u8>, errors::ParsingError>
|
||||
where
|
||||
Self: Serialize,
|
||||
{
|
||||
serde_json::to_vec(self)
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError)
|
||||
.attach_printable_lazy(|| format!("Unable to convert {:?} to a value", self))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/// Merge two `serde_json::Value` instances. Will need to be updated to handle merging arrays.
|
||||
pub(crate) fn merge_json_values(a: &mut serde_json::Value, b: &serde_json::Value) {
|
||||
|
||||
Reference in New Issue
Block a user