diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index c9a8533a43..a8b63e1447 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -33,6 +33,5 @@ async fn main() -> DrainerResult<()> { ) .await?; - store.close().await; Ok(()) } diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs index 5d72d83621..fd1f6c0eff 100644 --- a/crates/drainer/src/services.rs +++ b/crates/drainer/src/services.rs @@ -37,13 +37,4 @@ impl Store { // Example: {shard_5}_drainer_stream format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) } - - #[allow(clippy::expect_used)] - pub async fn close(mut self: Arc) { - Arc::get_mut(&mut self) - .and_then(|inner| Arc::get_mut(&mut inner.redis_conn)) - .expect("Redis connection pool cannot be closed") - .close_connections() - .await; - } } diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index bcf9ee1d80..3a4ffdebc5 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -155,6 +155,13 @@ impl RedisConnectionPool { } } +impl Drop for RedisConnectionPool { + fn drop(&mut self) { + let rt = tokio::runtime::Handle::current(); + rt.block_on(self.close_connections()) + } +} + struct RedisConfig { default_ttl: u32, default_stream_read_count: u64, diff --git a/crates/router/src/bin/router.rs b/crates/router/src/bin/router.rs index f5c029a4b8..02ff2c241e 100644 --- a/crates/router/src/bin/router.rs +++ b/crates/router/src/bin/router.rs @@ -39,13 +39,11 @@ async fn main() -> ApplicationResult<()> { logger::info!("Application started [{:?}] [{:?}]", conf.server, conf.log); #[allow(clippy::expect_used)] - let (server, mut state) = router::start_server(conf) + let server = router::start_server(conf) .await .expect("Failed to create the server"); let _ = server.await; - state.store.close().await; - Err(ApplicationError::from(std::io::Error::new( std::io::ErrorKind::Other, "Server shut down", diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 1418eef7b1..810a24d828 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -21,7 +21,7 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> { .expect("Unable to construct application configuration"); // channel for listening to redis disconnect events let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel(); - let mut state = routes::AppState::new(conf, redis_shutdown_signal_tx).await; + let state = routes::AppState::new(conf, redis_shutdown_signal_tx).await; // channel to shutdown scheduler gracefully let (tx, rx) = mpsc::channel(1); tokio::spawn(router::receiver_for_error( @@ -34,8 +34,6 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> { start_scheduler(&state, (tx, rx)).await?; - state.store.close().await; - eprintln!("Scheduler shut down"); Ok(()) } diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index e8a00809cb..a8dadd2ab8 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -63,19 +63,10 @@ pub trait StorageInterface: + cards_info::CardsInfoInterface + 'static { - async fn close(&mut self) {} } #[async_trait::async_trait] -impl StorageInterface for Store { - #[allow(clippy::expect_used)] - async fn close(&mut self) { - std::sync::Arc::get_mut(&mut self.redis_conn) - .expect("Redis connection pool cannot be closed") - .close_connections() - .await; - } -} +impl StorageInterface for Store {} #[derive(Clone)] pub struct MockDb { @@ -107,15 +98,7 @@ impl MockDb { } #[async_trait::async_trait] -impl StorageInterface for MockDb { - #[allow(clippy::expect_used)] - async fn close(&mut self) { - std::sync::Arc::get_mut(&mut self.redis) - .expect("Redis connection pool cannot be closed") - .close_connections() - .await; - } -} +impl StorageInterface for MockDb {} pub async fn get_and_deserialize_key( db: &dyn StorageInterface, diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index 3fb44938a1..b35e503f2a 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -139,13 +139,11 @@ pub fn mk_app( /// /// Unwrap used because without the value we can't start the server #[allow(clippy::expect_used, clippy::unwrap_used)] -pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> { +pub async fn start_server(conf: settings::Settings) -> ApplicationResult { logger::debug!(startup_config=?conf); let server = conf.server.clone(); let (tx, rx) = oneshot::channel(); let state = routes::AppState::new(conf, tx).await; - // Cloning to close connections before shutdown - let app_state = state.clone(); let request_body_limit = server.request_body_limit; let server = actix_web::HttpServer::new(move || mk_app(state.clone(), request_body_limit)) .bind((server.host.as_str(), server.port))? @@ -153,7 +151,7 @@ pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server .shutdown_timeout(server.shutdown_timeout) .run(); tokio::spawn(receiver_for_error(rx, server.handle())); - Ok((server, app_state)) + Ok(server) } pub async fn receiver_for_error(rx: oneshot::Receiver<()>, mut server: impl Stop) { diff --git a/crates/router/tests/utils.rs b/crates/router/tests/utils.rs index e7c34afc31..5181cbff18 100644 --- a/crates/router/tests/utils.rs +++ b/crates/router/tests/utils.rs @@ -20,7 +20,7 @@ static SERVER: OnceCell = OnceCell::const_new(); async fn spawn_server() -> bool { let conf = Settings::new().expect("invalid settings"); - let (server, _state) = router::start_server(conf) + let server = router::start_server(conf) .await .expect("failed to create server");