mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-30 17:47:54 +08:00 
			
		
		
		
	fix: impl Drop for RedisConnectionPool (#1051)
				
					
				
			This commit is contained in:
		| @ -33,6 +33,5 @@ async fn main() -> DrainerResult<()> { | |||||||
|     ) |     ) | ||||||
|     .await?; |     .await?; | ||||||
|  |  | ||||||
|     store.close().await; |  | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -37,13 +37,4 @@ impl Store { | |||||||
|         // Example: {shard_5}_drainer_stream |         // Example: {shard_5}_drainer_stream | ||||||
|         format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) |         format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[allow(clippy::expect_used)] |  | ||||||
|     pub async fn close(mut self: Arc<Self>) { |  | ||||||
|         Arc::get_mut(&mut self) |  | ||||||
|             .and_then(|inner| Arc::get_mut(&mut inner.redis_conn)) |  | ||||||
|             .expect("Redis connection pool cannot be closed") |  | ||||||
|             .close_connections() |  | ||||||
|             .await; |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|  | |||||||
| @ -155,6 +155,13 @@ impl RedisConnectionPool { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl Drop for RedisConnectionPool { | ||||||
|  |     fn drop(&mut self) { | ||||||
|  |         let rt = tokio::runtime::Handle::current(); | ||||||
|  |         rt.block_on(self.close_connections()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| struct RedisConfig { | struct RedisConfig { | ||||||
|     default_ttl: u32, |     default_ttl: u32, | ||||||
|     default_stream_read_count: u64, |     default_stream_read_count: u64, | ||||||
|  | |||||||
| @ -39,13 +39,11 @@ async fn main() -> ApplicationResult<()> { | |||||||
|     logger::info!("Application started [{:?}] [{:?}]", conf.server, conf.log); |     logger::info!("Application started [{:?}] [{:?}]", conf.server, conf.log); | ||||||
|  |  | ||||||
|     #[allow(clippy::expect_used)] |     #[allow(clippy::expect_used)] | ||||||
|     let (server, mut state) = router::start_server(conf) |     let server = router::start_server(conf) | ||||||
|         .await |         .await | ||||||
|         .expect("Failed to create the server"); |         .expect("Failed to create the server"); | ||||||
|     let _ = server.await; |     let _ = server.await; | ||||||
|  |  | ||||||
|     state.store.close().await; |  | ||||||
|  |  | ||||||
|     Err(ApplicationError::from(std::io::Error::new( |     Err(ApplicationError::from(std::io::Error::new( | ||||||
|         std::io::ErrorKind::Other, |         std::io::ErrorKind::Other, | ||||||
|         "Server shut down", |         "Server shut down", | ||||||
|  | |||||||
| @ -21,7 +21,7 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> { | |||||||
|         .expect("Unable to construct application configuration"); |         .expect("Unable to construct application configuration"); | ||||||
|     // channel for listening to redis disconnect events |     // channel for listening to redis disconnect events | ||||||
|     let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel(); |     let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel(); | ||||||
|     let mut state = routes::AppState::new(conf, redis_shutdown_signal_tx).await; |     let state = routes::AppState::new(conf, redis_shutdown_signal_tx).await; | ||||||
|     // channel to shutdown scheduler gracefully |     // channel to shutdown scheduler gracefully | ||||||
|     let (tx, rx) = mpsc::channel(1); |     let (tx, rx) = mpsc::channel(1); | ||||||
|     tokio::spawn(router::receiver_for_error( |     tokio::spawn(router::receiver_for_error( | ||||||
| @ -34,8 +34,6 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> { | |||||||
|  |  | ||||||
|     start_scheduler(&state, (tx, rx)).await?; |     start_scheduler(&state, (tx, rx)).await?; | ||||||
|  |  | ||||||
|     state.store.close().await; |  | ||||||
|  |  | ||||||
|     eprintln!("Scheduler shut down"); |     eprintln!("Scheduler shut down"); | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -63,19 +63,10 @@ pub trait StorageInterface: | |||||||
|     + cards_info::CardsInfoInterface |     + cards_info::CardsInfoInterface | ||||||
|     + 'static |     + 'static | ||||||
| { | { | ||||||
|     async fn close(&mut self) {} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| #[async_trait::async_trait] | #[async_trait::async_trait] | ||||||
| impl StorageInterface for Store { | impl StorageInterface for Store {} | ||||||
|     #[allow(clippy::expect_used)] |  | ||||||
|     async fn close(&mut self) { |  | ||||||
|         std::sync::Arc::get_mut(&mut self.redis_conn) |  | ||||||
|             .expect("Redis connection pool cannot be closed") |  | ||||||
|             .close_connections() |  | ||||||
|             .await; |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| pub struct MockDb { | pub struct MockDb { | ||||||
| @ -107,15 +98,7 @@ impl MockDb { | |||||||
| } | } | ||||||
|  |  | ||||||
| #[async_trait::async_trait] | #[async_trait::async_trait] | ||||||
| impl StorageInterface for MockDb { | impl StorageInterface for MockDb {} | ||||||
|     #[allow(clippy::expect_used)] |  | ||||||
|     async fn close(&mut self) { |  | ||||||
|         std::sync::Arc::get_mut(&mut self.redis) |  | ||||||
|             .expect("Redis connection pool cannot be closed") |  | ||||||
|             .close_connections() |  | ||||||
|             .await; |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| pub async fn get_and_deserialize_key<T>( | pub async fn get_and_deserialize_key<T>( | ||||||
|     db: &dyn StorageInterface, |     db: &dyn StorageInterface, | ||||||
|  | |||||||
| @ -139,13 +139,11 @@ pub fn mk_app( | |||||||
| /// | /// | ||||||
| ///  Unwrap used because without the value we can't start the server | ///  Unwrap used because without the value we can't start the server | ||||||
| #[allow(clippy::expect_used, clippy::unwrap_used)] | #[allow(clippy::expect_used, clippy::unwrap_used)] | ||||||
| pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> { | pub async fn start_server(conf: settings::Settings) -> ApplicationResult<Server> { | ||||||
|     logger::debug!(startup_config=?conf); |     logger::debug!(startup_config=?conf); | ||||||
|     let server = conf.server.clone(); |     let server = conf.server.clone(); | ||||||
|     let (tx, rx) = oneshot::channel(); |     let (tx, rx) = oneshot::channel(); | ||||||
|     let state = routes::AppState::new(conf, tx).await; |     let state = routes::AppState::new(conf, tx).await; | ||||||
|     // Cloning to close connections before shutdown |  | ||||||
|     let app_state = state.clone(); |  | ||||||
|     let request_body_limit = server.request_body_limit; |     let request_body_limit = server.request_body_limit; | ||||||
|     let server = actix_web::HttpServer::new(move || mk_app(state.clone(), request_body_limit)) |     let server = actix_web::HttpServer::new(move || mk_app(state.clone(), request_body_limit)) | ||||||
|         .bind((server.host.as_str(), server.port))? |         .bind((server.host.as_str(), server.port))? | ||||||
| @ -153,7 +151,7 @@ pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server | |||||||
|         .shutdown_timeout(server.shutdown_timeout) |         .shutdown_timeout(server.shutdown_timeout) | ||||||
|         .run(); |         .run(); | ||||||
|     tokio::spawn(receiver_for_error(rx, server.handle())); |     tokio::spawn(receiver_for_error(rx, server.handle())); | ||||||
|     Ok((server, app_state)) |     Ok(server) | ||||||
| } | } | ||||||
|  |  | ||||||
| pub async fn receiver_for_error(rx: oneshot::Receiver<()>, mut server: impl Stop) { | pub async fn receiver_for_error(rx: oneshot::Receiver<()>, mut server: impl Stop) { | ||||||
|  | |||||||
| @ -20,7 +20,7 @@ static SERVER: OnceCell<bool> = OnceCell::const_new(); | |||||||
|  |  | ||||||
| async fn spawn_server() -> bool { | async fn spawn_server() -> bool { | ||||||
|     let conf = Settings::new().expect("invalid settings"); |     let conf = Settings::new().expect("invalid settings"); | ||||||
|     let (server, _state) = router::start_server(conf) |     let server = router::start_server(conf) | ||||||
|         .await |         .await | ||||||
|         .expect("failed to create server"); |         .expect("failed to create server"); | ||||||
|  |  | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Kartikeya Hegde
					Kartikeya Hegde