fix(redis): fix recreation on redis connection pool (#1063)

This commit is contained in:
Nishant Joshi
2023-05-08 14:31:49 +05:30
committed by GitHub
parent 3131bc84af
commit 982c27fce7
7 changed files with 31 additions and 13 deletions

10
Cargo.lock generated
View File

@ -2950,7 +2950,7 @@ dependencies = [
[[package]] [[package]]
name = "opentelemetry" name = "opentelemetry"
version = "0.18.0" version = "0.18.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658" source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [ dependencies = [
"opentelemetry_api", "opentelemetry_api",
"opentelemetry_sdk", "opentelemetry_sdk",
@ -2959,7 +2959,7 @@ dependencies = [
[[package]] [[package]]
name = "opentelemetry-otlp" name = "opentelemetry-otlp"
version = "0.11.0" version = "0.11.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658" source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"futures", "futures",
@ -2976,7 +2976,7 @@ dependencies = [
[[package]] [[package]]
name = "opentelemetry-proto" name = "opentelemetry-proto"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658" source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
@ -2988,7 +2988,7 @@ dependencies = [
[[package]] [[package]]
name = "opentelemetry_api" name = "opentelemetry_api"
version = "0.18.0" version = "0.18.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658" source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [ dependencies = [
"fnv", "fnv",
"futures-channel", "futures-channel",
@ -3003,7 +3003,7 @@ dependencies = [
[[package]] [[package]]
name = "opentelemetry_sdk" name = "opentelemetry_sdk"
version = "0.18.0" version = "0.18.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658" source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"crossbeam-channel", "crossbeam-channel",

View File

@ -156,6 +156,7 @@ impl RedisConnectionPool {
} }
impl Drop for RedisConnectionPool { impl Drop for RedisConnectionPool {
// safety: panics when invoked without a current tokio runtime
fn drop(&mut self) { fn drop(&mut self) {
let rt = tokio::runtime::Handle::current(); let rt = tokio::runtime::Handle::current();
rt.block_on(self.close_connections()) rt.block_on(self.close_connections())

View File

@ -23,7 +23,6 @@ use storage_models::{enums as storage_enums, payment_method};
use crate::scheduler::metrics as scheduler_metrics; use crate::scheduler::metrics as scheduler_metrics;
use crate::{ use crate::{
configs::settings, configs::settings,
connection,
core::{ core::{
errors::{self, StorageErrorExt}, errors::{self, StorageErrorExt},
payment_methods::{ payment_methods::{
@ -1531,7 +1530,7 @@ pub async fn list_customer_payment_method(
}; };
customer_pms.push(pma.to_owned()); customer_pms.push(pma.to_owned());
let redis_conn = connection::redis_connection(&state.conf).await; let redis_conn = state.store.get_redis_conn();
let key_for_hyperswitch_token = format!( let key_for_hyperswitch_token = format!(
"pm_token_{}_{}_hyperswitch", "pm_token_{}_{}_hyperswitch",
parent_payment_method_token, pma.payment_method parent_payment_method_token, pma.payment_method

View File

@ -24,7 +24,6 @@ use self::{
operations::{payment_complete_authorize, BoxedOperation, Operation}, operations::{payment_complete_authorize, BoxedOperation, Operation},
}; };
use crate::{ use crate::{
connection,
core::{ core::{
errors::{self, CustomResult, RouterResponse, RouterResult}, errors::{self, CustomResult, RouterResponse, RouterResult},
payment_methods::vault, payment_methods::vault,
@ -672,7 +671,7 @@ async fn decide_payment_method_tokenize_action(
} }
} }
Some(token) => { Some(token) => {
let redis_conn = connection::redis_connection(&state.conf).await; let redis_conn = state.store.get_redis_conn();
let key = format!( let key = format!(
"pm_token_{}_{}_{}", "pm_token_{}_{}_{}",
token.to_owned(), token.to_owned(),

View File

@ -19,7 +19,7 @@ use super::{
}; };
use crate::{ use crate::{
configs::settings::Server, configs::settings::Server,
connection, consts, consts,
core::{ core::{
errors::{self, CustomResult, RouterResult, StorageErrorExt}, errors::{self, CustomResult, RouterResult, StorageErrorExt},
payment_methods::{cards, vault}, payment_methods::{cards, vault},
@ -691,7 +691,7 @@ pub async fn make_pm_data<'a, F: Clone, R>(
let request = &payment_data.payment_method_data; let request = &payment_data.payment_method_data;
let token = payment_data.token.clone(); let token = payment_data.token.clone();
let hyperswitch_token = if let Some(token) = token { let hyperswitch_token = if let Some(token) = token {
let redis_conn = connection::redis_connection(&state.conf).await; let redis_conn = state.store.get_redis_conn();
let key = format!( let key = format!(
"pm_token_{}_{}_hyperswitch", "pm_token_{}_{}_hyperswitch",
token, token,

View File

@ -25,7 +25,10 @@ use std::sync::Arc;
use futures::lock::Mutex; use futures::lock::Mutex;
use crate::{services::Store, types::storage}; use crate::{
services::{self, Store},
types::storage,
};
#[derive(PartialEq, Eq)] #[derive(PartialEq, Eq)]
pub enum StorageImpl { pub enum StorageImpl {
@ -61,10 +64,10 @@ pub trait StorageInterface:
+ refund::RefundInterface + refund::RefundInterface
+ reverse_lookup::ReverseLookupInterface + reverse_lookup::ReverseLookupInterface
+ cards_info::CardsInfoInterface + cards_info::CardsInfoInterface
+ services::RedisConnInterface
+ 'static + 'static
{ {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl StorageInterface for Store {} impl StorageInterface for Store {}
@ -117,4 +120,10 @@ where
.change_context(redis_interface::errors::RedisError::JsonDeserializationFailed) .change_context(redis_interface::errors::RedisError::JsonDeserializationFailed)
} }
impl services::RedisConnInterface for MockDb {
fn get_redis_conn(&self) -> Arc<redis_interface::RedisConnectionPool> {
self.redis.clone()
}
}
dyn_clone::clone_trait_object!(StorageInterface); dyn_clone::clone_trait_object!(StorageInterface);

View File

@ -78,6 +78,10 @@ impl PubSubInterface for redis_interface::RedisConnectionPool {
} }
} }
pub trait RedisConnInterface {
fn get_redis_conn(&self) -> Arc<redis_interface::RedisConnectionPool>;
}
#[derive(Clone)] #[derive(Clone)]
pub struct Store { pub struct Store {
pub master_pool: PgPool, pub master_pool: PgPool,
@ -185,3 +189,9 @@ impl Store {
.change_context(crate::core::errors::StorageError::KVError) .change_context(crate::core::errors::StorageError::KVError)
} }
} }
impl RedisConnInterface for Store {
fn get_redis_conn(&self) -> Arc<redis_interface::RedisConnectionPool> {
self.redis_conn.clone()
}
}