feat(Core): gracefully shutdown router/scheduler if Redis is unavailable (#891)

Co-authored-by: prajjwal kumar <prajjwal.kumar@prajjwalkumar-DWKH9NPY4R.local>
Co-authored-by: Nishant Joshi <nishant.joshi@juspay.in>
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
This commit is contained in:
Prajjwal Kumar
2023-04-24 13:08:00 +05:30
committed by GitHub
parent 85c7629061
commit 13185999d5
21 changed files with 166 additions and 74 deletions

View File

@ -152,7 +152,7 @@ mod pii_masking_strategy_tests {
#[test] #[test]
fn test_invalid_card_number_masking() { fn test_invalid_card_number_masking() {
let secret: Secret<String, CardNumber> = Secret::new("1234567890".to_string()); let secret: Secret<String, CardNumber> = Secret::new("1234567890".to_string());
assert_eq!("123456****", format!("{secret:?}")); assert_eq!("*** alloc::string::String ***", format!("{secret:?}",));
} }
/* /*

View File

@ -2,20 +2,21 @@
use futures::StreamExt; use futures::StreamExt;
use router_env::logger; use router_env::logger;
pub use tokio::sync::oneshot; use tokio::sync::mpsc;
/// ///
/// This functions is meant to run in parallel to the application. /// This functions is meant to run in parallel to the application.
/// It will send a signal to the receiver when a SIGTERM or SIGINT is received /// It will send a signal to the receiver when a SIGTERM or SIGINT is received
/// ///
pub async fn signal_handler(mut sig: signal_hook_tokio::Signals, sender: oneshot::Sender<()>) { pub async fn signal_handler(mut sig: signal_hook_tokio::Signals, sender: mpsc::Sender<()>) {
if let Some(signal) = sig.next().await { if let Some(signal) = sig.next().await {
logger::info!( logger::info!(
"Received signal: {:?}", "Received signal: {:?}",
signal_hook::low_level::signal_name(signal) signal_hook::low_level::signal_name(signal)
); );
match signal { match signal {
signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.send(()) { signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.try_send(())
{
Ok(_) => { Ok(_) => {
logger::info!("Request for force shutdown received") logger::info!("Request for force shutdown received")
} }

View File

@ -7,10 +7,11 @@ pub mod settings;
mod utils; mod utils;
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use common_utils::signals::{get_allowed_signals, oneshot}; use common_utils::signals::get_allowed_signals;
pub use env as logger; pub use env as logger;
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use storage_models::kv; use storage_models::kv;
use tokio::sync::mpsc;
use crate::{connection::pg_connection, services::Store}; use crate::{connection::pg_connection, services::Store};
@ -35,14 +36,15 @@ pub async fn start_drainer(
.change_context(errors::DrainerError::SignalError( .change_context(errors::DrainerError::SignalError(
"Failed while getting allowed signals".to_string(), "Failed while getting allowed signals".to_string(),
))?; ))?;
let (tx, mut rx) = oneshot::channel();
let (tx, mut rx) = mpsc::channel(1);
let handle = signal.handle(); let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx)); let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
let active_tasks = Arc::new(atomic::AtomicU64::new(0)); let active_tasks = Arc::new(atomic::AtomicU64::new(0));
'event: loop { 'event: loop {
match rx.try_recv() { match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => { Err(mpsc::error::TryRecvError::Empty) => {
if utils::is_stream_available(stream_index, store.clone()).await { if utils::is_stream_available(stream_index, store.clone()).await {
tokio::spawn(drainer_handler( tokio::spawn(drainer_handler(
store.clone(), store.clone(),
@ -59,17 +61,17 @@ pub async fn start_drainer(
) )
.await; .await;
} }
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
logger::info!("Awaiting shutdown!"); logger::info!("Awaiting shutdown!");
metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]); metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]);
let shutdown_started = tokio::time::Instant::now(); let shutdown_started = tokio::time::Instant::now();
rx.close();
loop { loop {
if active_tasks.load(atomic::Ordering::Acquire) == 0 { if active_tasks.load(atomic::Ordering::Acquire) == 0 {
logger::info!("Terminating drainer"); logger::info!("Terminating drainer");
metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]); metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]);
let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64; let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64;
metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]); metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]);
break 'event; break 'event;
} }
shutdown_interval.tick().await; shutdown_interval.tick().await;

View File

@ -14,6 +14,7 @@ fred = { version = "6.0.0", features = ["metrics", "partial-tracing"] }
futures = "0.3" futures = "0.3"
serde = { version = "1.0.160", features = ["derive"] } serde = { version = "1.0.160", features = ["derive"] }
thiserror = "1.0.40" thiserror = "1.0.40"
tokio = "1.26.0"
# First party crates # First party crates
common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext"] } common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext"] }

View File

@ -138,12 +138,18 @@ impl RedisConnectionPool {
}; };
} }
} }
pub async fn on_error(&self) {
pub async fn on_error(&self, tx: tokio::sync::oneshot::Sender<()>) {
while let Ok(redis_error) = self.pool.on_error().recv().await { while let Ok(redis_error) = self.pool.on_error().recv().await {
logger::error!(?redis_error, "Redis protocol or connection error"); logger::error!(?redis_error, "Redis protocol or connection error");
logger::error!("current state: {:#?}", self.pool.state());
if self.pool.state() == fred::types::ClientState::Disconnected { if self.pool.state() == fred::types::ClientState::Disconnected {
if tx.send(()).is_err() {
logger::error!("The redis shutdown signal sender failed to signal");
}
self.is_redis_available self.is_redis_available
.store(false, atomic::Ordering::SeqCst); .store(false, atomic::Ordering::SeqCst);
break;
} }
} }
} }

View File

@ -42,7 +42,6 @@ async fn main() -> ApplicationResult<()> {
let (server, mut state) = router::start_server(conf) let (server, mut state) = router::start_server(conf)
.await .await
.expect("Failed to create the server"); .expect("Failed to create the server");
let _ = server.await; let _ = server.await;
state.store.close().await; state.store.close().await;

View File

@ -6,6 +6,7 @@ use router::{
core::errors::{self, CustomResult}, core::errors::{self, CustomResult},
logger, routes, scheduler, logger, routes, scheduler,
}; };
use tokio::sync::{mpsc, oneshot};
const SCHEDULER_FLOW: &str = "SCHEDULER_FLOW"; const SCHEDULER_FLOW: &str = "SCHEDULER_FLOW";
@ -18,14 +19,21 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> {
#[allow(clippy::expect_used)] #[allow(clippy::expect_used)]
let conf = Settings::with_config_path(cmd_line.config_path) let conf = Settings::with_config_path(cmd_line.config_path)
.expect("Unable to construct application configuration"); .expect("Unable to construct application configuration");
// channel for listening to redis disconnect events
let mut state = routes::AppState::new(conf).await; let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel();
let mut state = routes::AppState::new(conf, redis_shutdown_signal_tx).await;
// channel to shutdown scheduler gracefully
let (tx, rx) = mpsc::channel(1);
tokio::spawn(router::receiver_for_error(
redis_shutdown_signal_rx,
tx.clone(),
));
let _guard = let _guard =
logger::setup(&state.conf.log).map_err(|_| errors::ProcessTrackerError::UnexpectedFlow)?; logger::setup(&state.conf.log).map_err(|_| errors::ProcessTrackerError::UnexpectedFlow)?;
logger::debug!(startup_config=?state.conf); logger::debug!(startup_config=?state.conf);
start_scheduler(&state).await?; start_scheduler(&state, (tx, rx)).await?;
state.store.close().await; state.store.close().await;
@ -35,6 +43,7 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> {
async fn start_scheduler( async fn start_scheduler(
state: &routes::AppState, state: &routes::AppState,
channel: (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> { ) -> CustomResult<(), errors::ProcessTrackerError> {
use std::str::FromStr; use std::str::FromStr;
@ -49,5 +58,5 @@ async fn start_scheduler(
.scheduler .scheduler
.clone() .clone()
.ok_or(errors::ProcessTrackerError::ConfigurationError)?; .ok_or(errors::ProcessTrackerError::ConfigurationError)?;
scheduler::start_process_tracker(state, flow, Arc::new(scheduler_settings)).await scheduler::start_process_tracker(state, flow, Arc::new(scheduler_settings), channel).await
} }

View File

@ -25,11 +25,12 @@ pub mod utils;
use actix_web::{ use actix_web::{
body::MessageBody, body::MessageBody,
dev::{Server, ServiceFactory, ServiceRequest}, dev::{Server, ServerHandle, ServiceFactory, ServiceRequest},
middleware::ErrorHandlers, middleware::ErrorHandlers,
}; };
use http::StatusCode; use http::StatusCode;
use routes::AppState; use routes::AppState;
use tokio::sync::{mpsc, oneshot};
pub use self::env::logger; pub use self::env::logger;
use crate::{ use crate::{
@ -140,7 +141,8 @@ pub fn mk_app(
pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> { pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> {
logger::debug!(startup_config=?conf); logger::debug!(startup_config=?conf);
let server = conf.server.clone(); let server = conf.server.clone();
let state = routes::AppState::new(conf).await; let (tx, rx) = oneshot::channel();
let state = routes::AppState::new(conf, tx).await;
// Cloning to close connections before shutdown // Cloning to close connections before shutdown
let app_state = state.clone(); let app_state = state.clone();
let request_body_limit = server.request_body_limit; let request_body_limit = server.request_body_limit;
@ -149,10 +151,40 @@ pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server
.workers(server.workers) .workers(server.workers)
.shutdown_timeout(server.shutdown_timeout) .shutdown_timeout(server.shutdown_timeout)
.run(); .run();
tokio::spawn(receiver_for_error(rx, server.handle()));
Ok((server, app_state)) Ok((server, app_state))
} }
pub async fn receiver_for_error(rx: oneshot::Receiver<()>, mut server: impl Stop) {
match rx.await {
Ok(_) => {
logger::error!("The redis server failed ");
server.stop_server().await;
}
Err(err) => {
logger::error!("Channel receiver error{err}");
}
}
}
#[async_trait::async_trait]
pub trait Stop {
async fn stop_server(&mut self);
}
#[async_trait::async_trait]
impl Stop for ServerHandle {
async fn stop_server(&mut self) {
let _ = self.stop(true).await;
}
}
#[async_trait::async_trait]
impl Stop for mpsc::Sender<()> {
async fn stop_server(&mut self) {
let _ = self.send(()).await.map_err(|err| logger::error!("{err}"));
}
}
pub fn get_application_builder( pub fn get_application_builder(
request_body_limit: usize, request_body_limit: usize,
) -> actix_web::App< ) -> actix_web::App<

View File

@ -1,4 +1,5 @@
use actix_web::{web, Scope}; use actix_web::{web, Scope};
use tokio::sync::oneshot;
use super::health::*; use super::health::*;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
@ -40,11 +41,15 @@ impl AppStateInfo for AppState {
} }
impl AppState { impl AppState {
pub async fn with_storage(conf: Settings, storage_impl: StorageImpl) -> Self { pub async fn with_storage(
conf: Settings,
storage_impl: StorageImpl,
shut_down_signal: oneshot::Sender<()>,
) -> Self {
let testable = storage_impl == StorageImpl::PostgresqlTest; let testable = storage_impl == StorageImpl::PostgresqlTest;
let store: Box<dyn StorageInterface> = match storage_impl { let store: Box<dyn StorageInterface> = match storage_impl {
StorageImpl::Postgresql | StorageImpl::PostgresqlTest => { StorageImpl::Postgresql | StorageImpl::PostgresqlTest => {
Box::new(Store::new(&conf, testable).await) Box::new(Store::new(&conf, testable, shut_down_signal).await)
} }
StorageImpl::Mock => Box::new(MockDb::new(&conf).await), StorageImpl::Mock => Box::new(MockDb::new(&conf).await),
}; };
@ -56,8 +61,8 @@ impl AppState {
} }
} }
pub async fn new(conf: Settings) -> Self { pub async fn new(conf: Settings, shut_down_signal: oneshot::Sender<()>) -> Self {
Self::with_storage(conf, StorageImpl::Postgresql).await Self::with_storage(conf, StorageImpl::Postgresql, shut_down_signal).await
} }
} }

View File

@ -9,6 +9,8 @@ pub mod workflows;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc;
pub use self::types::*; pub use self::types::*;
use crate::{ use crate::{
configs::settings::SchedulerSettings, configs::settings::SchedulerSettings,
@ -21,11 +23,20 @@ pub async fn start_process_tracker(
state: &AppState, state: &AppState,
scheduler_flow: SchedulerFlow, scheduler_flow: SchedulerFlow,
scheduler_settings: Arc<SchedulerSettings>, scheduler_settings: Arc<SchedulerSettings>,
channel: (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> { ) -> CustomResult<(), errors::ProcessTrackerError> {
match scheduler_flow { match scheduler_flow {
SchedulerFlow::Producer => producer::start_producer(state, scheduler_settings).await?, SchedulerFlow::Producer => {
producer::start_producer(state, scheduler_settings, channel).await?
}
SchedulerFlow::Consumer => { SchedulerFlow::Consumer => {
consumer::start_consumer(state, scheduler_settings, workflows::runner_from_task).await? consumer::start_consumer(
state,
scheduler_settings,
workflows::runner_from_task,
channel,
)
.await?
} }
SchedulerFlow::Cleaner => { SchedulerFlow::Cleaner => {
error!("This flow has not been implemented yet!"); error!("This flow has not been implemented yet!");

View File

@ -5,12 +5,13 @@ use std::{
sync::{self, atomic}, sync::{self, atomic},
}; };
use common_utils::signals::{get_allowed_signals, oneshot}; use common_utils::signals::get_allowed_signals;
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use futures::future; use futures::future;
use redis_interface::{RedisConnectionPool, RedisEntryId}; use redis_interface::{RedisConnectionPool, RedisEntryId};
use router_env::{instrument, tracing}; use router_env::{instrument, tracing};
use time::PrimitiveDateTime; use time::PrimitiveDateTime;
use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use super::{ use super::{
@ -37,6 +38,7 @@ pub async fn start_consumer(
state: &AppState, state: &AppState,
settings: sync::Arc<settings::SchedulerSettings>, settings: sync::Arc<settings::SchedulerSettings>,
workflow_selector: workflows::WorkflowSelectorFn, workflow_selector: workflows::WorkflowSelectorFn,
(tx, mut rx): (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> { ) -> CustomResult<(), errors::ProcessTrackerError> {
use std::time::Duration; use std::time::Duration;
@ -58,13 +60,12 @@ pub async fn start_consumer(
}) })
.into_report() .into_report()
.attach_printable("Failed while creating a signals handler")?; .attach_printable("Failed while creating a signals handler")?;
let (sx, mut rx) = oneshot::channel();
let handle = signal.handle(); let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx)); let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
loop { loop {
match rx.try_recv() { match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => { Err(mpsc::error::TryRecvError::Empty) => {
interval.tick().await; interval.tick().await;
// A guard from env to disable the consumer // A guard from env to disable the consumer
@ -82,11 +83,11 @@ pub async fn start_consumer(
workflow_selector, workflow_selector,
)); ));
} }
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
logger::debug!("Awaiting shutdown!"); logger::debug!("Awaiting shutdown!");
rx.close();
shutdown_interval.tick().await; shutdown_interval.tick().await;
let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire); let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire);
match active_tasks { match active_tasks {
0 => { 0 => {
logger::info!("Terminating consumer"); logger::info!("Terminating consumer");

View File

@ -1,9 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use common_utils::signals::oneshot;
use error_stack::{report, IntoReport, ResultExt}; use error_stack::{report, IntoReport, ResultExt};
use router_env::{instrument, tracing}; use router_env::{instrument, tracing};
use time::Duration; use time::Duration;
use tokio::sync::mpsc;
use super::metrics; use super::metrics;
use crate::{ use crate::{
@ -20,9 +20,9 @@ use crate::{
pub async fn start_producer( pub async fn start_producer(
state: &AppState, state: &AppState,
scheduler_settings: Arc<SchedulerSettings>, scheduler_settings: Arc<SchedulerSettings>,
(tx, mut rx): (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> { ) -> CustomResult<(), errors::ProcessTrackerError> {
use rand::Rng; use rand::Rng;
let timeout = rand::thread_rng().gen_range(0..=scheduler_settings.loop_interval); let timeout = rand::thread_rng().gen_range(0..=scheduler_settings.loop_interval);
tokio::time::sleep(std::time::Duration::from_millis(timeout)).await; tokio::time::sleep(std::time::Duration::from_millis(timeout)).await;
@ -41,15 +41,13 @@ pub async fn start_producer(
}) })
.into_report() .into_report()
.attach_printable("Failed while creating a signals handler")?; .attach_printable("Failed while creating a signals handler")?;
let (sx, mut rx) = oneshot::channel();
let handle = signal.handle(); let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx)); let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
loop { loop {
match rx.try_recv() { match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => { Err(mpsc::error::TryRecvError::Empty) => {
interval.tick().await; interval.tick().await;
match run_producer_flow(state, &scheduler_settings).await { match run_producer_flow(state, &scheduler_settings).await {
Ok(_) => (), Ok(_) => (),
Err(error) => { Err(error) => {
@ -60,10 +58,10 @@ pub async fn start_producer(
} }
} }
} }
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => { Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
logger::debug!("Awaiting shutdown!"); logger::debug!("Awaiting shutdown!");
rx.close();
shutdown_interval.tick().await; shutdown_interval.tick().await;
logger::info!("Terminating consumer"); logger::info!("Terminating consumer");
break; break;
} }

View File

@ -7,11 +7,13 @@ use std::sync::{atomic, Arc};
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use redis_interface::{errors as redis_errors, PubsubInterface}; use redis_interface::{errors as redis_errors, PubsubInterface};
use tokio::sync::oneshot;
pub use self::{api::*, encryption::*}; pub use self::{api::*, encryption::*};
use crate::{ use crate::{
async_spawn, async_spawn,
cache::CONFIG_CACHE, cache::CONFIG_CACHE,
configs::settings,
connection::{diesel_make_pg_pool, PgPool}, connection::{diesel_make_pg_pool, PgPool},
consts, consts,
core::errors, core::errors,
@ -94,22 +96,24 @@ pub(crate) struct StoreConfig {
} }
impl Store { impl Store {
pub async fn new(config: &crate::configs::settings::Settings, test_transaction: bool) -> Self { pub async fn new(
config: &settings::Settings,
test_transaction: bool,
shut_down_signal: oneshot::Sender<()>,
) -> Self {
let redis_conn = Arc::new(crate::connection::redis_connection(config).await); let redis_conn = Arc::new(crate::connection::redis_connection(config).await);
let redis_clone = redis_conn.clone(); let redis_clone = redis_conn.clone();
let subscriber_conn = redis_conn.clone(); let subscriber_conn = redis_conn.clone();
redis_conn.subscribe(consts::PUB_SUB_CHANNEL).await.ok(); redis_conn.subscribe(consts::PUB_SUB_CHANNEL).await.ok();
async_spawn!({ async_spawn!({
if let Err(e) = subscriber_conn.on_message().await { if let Err(e) = subscriber_conn.on_message().await {
logger::error!(pubsub_err=?e); logger::error!(pubsub_err=?e);
} }
}); });
async_spawn!({ async_spawn!({
redis_clone.on_error().await; redis_clone.on_error(shut_down_signal).await;
}); });
Self { Self {

View File

@ -14,7 +14,7 @@ impl crate::utils::storage_partitioning::KvStorePartition for PaymentAttempt {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used)] #![allow(clippy::expect_used, clippy::unwrap_used)]
use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
use super::*; use super::*;
@ -29,8 +29,8 @@ mod tests {
#[ignore] #[ignore]
async fn test_payment_attempt_insert() { async fn test_payment_attempt_insert() {
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let payment_id = Uuid::new_v4().to_string(); let payment_id = Uuid::new_v4().to_string();
let current_time = common_utils::date_time::now(); let current_time = common_utils::date_time::now();
@ -59,7 +59,8 @@ mod tests {
async fn test_find_payment_attempt() { async fn test_find_payment_attempt() {
use crate::configs::settings::Settings; use crate::configs::settings::Settings;
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let current_time = common_utils::date_time::now(); let current_time = common_utils::date_time::now();
let payment_id = Uuid::new_v4().to_string(); let payment_id = Uuid::new_v4().to_string();
@ -105,7 +106,8 @@ mod tests {
use crate::configs::settings::Settings; use crate::configs::settings::Settings;
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let current_time = common_utils::date_time::now(); let current_time = common_utils::date_time::now();
let connector = types::Connector::Dummy.to_string(); let connector = types::Connector::Dummy.to_string();

View File

@ -9,6 +9,7 @@ use router::{
routes, services, routes, services,
types::{self, storage::enums, PaymentAddress}, types::{self, storage::enums, PaymentAddress},
}; };
use tokio::sync::oneshot;
use crate::connector_auth::ConnectorAuthentication; use crate::connector_auth::ConnectorAuthentication;
@ -117,7 +118,8 @@ fn construct_refund_router_data<F>() -> types::RefundsRouterData<F> {
async fn payments_create_success() { async fn payments_create_success() {
let conf = Settings::new().unwrap(); let conf = Settings::new().unwrap();
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
static CV: aci::Aci = aci::Aci; static CV: aci::Aci = aci::Aci;
let connector = types::api::ConnectorData { let connector = types::api::ConnectorData {
@ -152,7 +154,8 @@ async fn payments_create_failure() {
{ {
let conf = Settings::new().unwrap(); let conf = Settings::new().unwrap();
static CV: aci::Aci = aci::Aci; static CV: aci::Aci = aci::Aci;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let connector = types::api::ConnectorData { let connector = types::api::ConnectorData {
connector: Box::new(&CV), connector: Box::new(&CV),
connector_name: types::Connector::Aci, connector_name: types::Connector::Aci,
@ -199,7 +202,8 @@ async fn refund_for_successful_payments() {
connector_name: types::Connector::Aci, connector_name: types::Connector::Aci,
get_token: types::api::GetToken::Connector, get_token: types::api::GetToken::Connector,
}; };
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let connector_integration: services::BoxedConnectorIntegration< let connector_integration: services::BoxedConnectorIntegration<
'_, '_,
types::api::Authorize, types::api::Authorize,
@ -257,7 +261,8 @@ async fn refunds_create_failure() {
connector_name: types::Connector::Aci, connector_name: types::Connector::Aci,
get_token: types::api::GetToken::Connector, get_token: types::api::GetToken::Connector,
}; };
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let connector_integration: services::BoxedConnectorIntegration< let connector_integration: services::BoxedConnectorIntegration<
'_, '_,
types::api::Execute, types::api::Execute,

View File

@ -9,6 +9,7 @@ use router::{
routes, services, routes, services,
types::{self, storage::enums, PaymentAddress}, types::{self, storage::enums, PaymentAddress},
}; };
use tokio::sync::oneshot;
use crate::connector_auth::ConnectorAuthentication; use crate::connector_auth::ConnectorAuthentication;
@ -116,7 +117,8 @@ fn construct_refund_router_data<F>() -> types::RefundsRouterData<F> {
#[ignore] #[ignore]
async fn payments_create_success() { async fn payments_create_success() {
let conf = Settings::new().unwrap(); let conf = Settings::new().unwrap();
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
static CV: Authorizedotnet = Authorizedotnet; static CV: Authorizedotnet = Authorizedotnet;
let connector = types::api::ConnectorData { let connector = types::api::ConnectorData {
connector: Box::new(&CV), connector: Box::new(&CV),
@ -159,7 +161,8 @@ async fn payments_create_failure() {
connector_name: types::Connector::Authorizedotnet, connector_name: types::Connector::Authorizedotnet,
get_token: types::api::GetToken::Connector, get_token: types::api::GetToken::Connector,
}; };
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let connector_integration: services::BoxedConnectorIntegration< let connector_integration: services::BoxedConnectorIntegration<
'_, '_,
types::api::Authorize, types::api::Authorize,
@ -207,7 +210,8 @@ async fn refunds_create_success() {
connector_name: types::Connector::Authorizedotnet, connector_name: types::Connector::Authorizedotnet,
get_token: types::api::GetToken::Connector, get_token: types::api::GetToken::Connector,
}; };
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let connector_integration: services::BoxedConnectorIntegration< let connector_integration: services::BoxedConnectorIntegration<
'_, '_,
types::api::Execute, types::api::Execute,
@ -244,7 +248,8 @@ async fn refunds_create_failure() {
connector_name: types::Connector::Authorizedotnet, connector_name: types::Connector::Authorizedotnet,
get_token: types::api::GetToken::Connector, get_token: types::api::GetToken::Connector,
}; };
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let connector_integration: services::BoxedConnectorIntegration< let connector_integration: services::BoxedConnectorIntegration<
'_, '_,
types::api::Execute, types::api::Execute,

View File

@ -5,7 +5,6 @@ use crate::{
connector_auth, connector_auth,
utils::{self, ConnectorActions}, utils::{self, ConnectorActions},
}; };
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
struct CheckoutTest; struct CheckoutTest;
impl ConnectorActions for CheckoutTest {} impl ConnectorActions for CheckoutTest {}

View File

@ -10,6 +10,7 @@ use router::{
routes, services, routes, services,
types::{self, api, storage::enums, AccessToken, PaymentAddress, RouterData}, types::{self, api, storage::enums, AccessToken, PaymentAddress, RouterData},
}; };
use tokio::sync::oneshot;
use wiremock::{Mock, MockServer}; use wiremock::{Mock, MockServer};
pub trait Connector { pub trait Connector {
@ -49,9 +50,13 @@ pub trait ConnectorActions: Connector {
}, },
payment_info, payment_info,
); );
let state = let tx: oneshot::Sender<()> = oneshot::channel().0;
routes::AppState::with_storage(Settings::new().unwrap(), StorageImpl::PostgresqlTest) let state = routes::AppState::with_storage(
.await; Settings::new().unwrap(),
StorageImpl::PostgresqlTest,
tx,
)
.await;
integration.execute_pretasks(&mut request, &state).await?; integration.execute_pretasks(&mut request, &state).await?;
call_connector(request, integration).await call_connector(request, integration).await
} }
@ -70,9 +75,13 @@ pub trait ConnectorActions: Connector {
}, },
payment_info, payment_info,
); );
let state = let tx: oneshot::Sender<()> = oneshot::channel().0;
routes::AppState::with_storage(Settings::new().unwrap(), StorageImpl::PostgresqlTest) let state = routes::AppState::with_storage(
.await; Settings::new().unwrap(),
StorageImpl::PostgresqlTest,
tx,
)
.await;
integration.execute_pretasks(&mut request, &state).await?; integration.execute_pretasks(&mut request, &state).await?;
call_connector(request, integration).await call_connector(request, integration).await
} }
@ -415,7 +424,8 @@ async fn call_connector<
integration: services::BoxedConnectorIntegration<'_, T, Req, Resp>, integration: services::BoxedConnectorIntegration<'_, T, Req, Resp>,
) -> Result<RouterData<T, Req, Resp>, Report<ConnectorError>> { ) -> Result<RouterData<T, Req, Resp>, Report<ConnectorError>> {
let conf = Settings::new().unwrap(); let conf = Settings::new().unwrap();
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
services::api::execute_connector_processing_step( services::api::execute_connector_processing_step(
&state, &state,
integration, integration,

View File

@ -13,6 +13,7 @@ use router::{
}, },
}; };
use time::macros::datetime; use time::macros::datetime;
use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
// setting the connector in environment variables doesn't work when run in parallel. Neither does passing the paymentid // setting the connector in environment variables doesn't work when run in parallel. Neither does passing the paymentid
@ -273,8 +274,8 @@ fn connector_list() {
async fn payments_create_core() { async fn payments_create_core() {
use configs::settings::Settings; use configs::settings::Settings;
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let merchant_account = state let merchant_account = state
.store .store
@ -420,8 +421,8 @@ async fn payments_create_core() {
async fn payments_create_core_adyen_no_redirect() { async fn payments_create_core_adyen_no_redirect() {
use crate::configs::settings::Settings; use crate::configs::settings::Settings;
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let customer_id = format!("cust_{}", Uuid::new_v4()); let customer_id = format!("cust_{}", Uuid::new_v4());
let merchant_id = "arunraj".to_string(); let merchant_id = "arunraj".to_string();

View File

@ -9,6 +9,7 @@ use router::{
*, *,
}; };
use time::macros::datetime; use time::macros::datetime;
use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
#[test] #[test]
@ -33,8 +34,8 @@ fn connector_list() {
async fn payments_create_core() { async fn payments_create_core() {
use router::configs::settings::Settings; use router::configs::settings::Settings;
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let merchant_account = state let merchant_account = state
.store .store
@ -186,8 +187,8 @@ async fn payments_create_core() {
async fn payments_create_core_adyen_no_redirect() { async fn payments_create_core_adyen_no_redirect() {
use router::configs::settings::Settings; use router::configs::settings::Settings;
let conf = Settings::new().expect("invalid settings"); let conf = Settings::new().expect("invalid settings");
let tx: oneshot::Sender<()> = oneshot::channel().0;
let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest).await; let state = routes::AppState::with_storage(conf, StorageImpl::PostgresqlTest, tx).await;
let customer_id = format!("cust_{}", Uuid::new_v4()); let customer_id = format!("cust_{}", Uuid::new_v4());
let merchant_id = "arunraj".to_string(); let merchant_id = "arunraj".to_string();

View File

@ -14,7 +14,7 @@ use derive_deref::Deref;
use router::{configs::settings::Settings, routes::AppState}; use router::{configs::settings::Settings, routes::AppState};
use serde::{de::DeserializeOwned, Deserialize}; use serde::{de::DeserializeOwned, Deserialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio::sync::OnceCell; use tokio::sync::{oneshot, OnceCell};
static SERVER: OnceCell<bool> = OnceCell::const_new(); static SERVER: OnceCell<bool> = OnceCell::const_new();
@ -47,8 +47,8 @@ pub async fn mk_service(
if let Some(url) = stripemock().await { if let Some(url) = stripemock().await {
conf.connectors.stripe.base_url = url; conf.connectors.stripe.base_url = url;
} }
let tx: oneshot::Sender<()> = oneshot::channel().0;
let app_state = AppState::with_storage(conf, router::db::StorageImpl::Mock).await; let app_state = AppState::with_storage(conf, router::db::StorageImpl::Mock, tx).await;
actix_web::test::init_service(router::mk_app(app_state, request_body_limit)).await actix_web::test::init_service(router::mk_app(app_state, request_body_limit)).await
} }