mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-31 10:06:32 +08:00 
			
		
		
		
	feat: add logging functionality in drainer (#495)
This commit is contained in:
		
							
								
								
									
										27
									
								
								crates/drainer/src/env.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								crates/drainer/src/env.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,27 @@ | |||||||
|  | #[doc(inline)] | ||||||
|  | pub use router_env::*; | ||||||
|  |  | ||||||
|  | pub mod logger { | ||||||
|  |     #[doc(inline)] | ||||||
|  |     pub use router_env::{log, logger::*}; | ||||||
|  |  | ||||||
|  |     /// | ||||||
|  |     /// Setup logging sub-system | ||||||
|  |     /// | ||||||
|  |     /// | ||||||
|  |     pub fn setup( | ||||||
|  |         conf: &config::Log, | ||||||
|  |     ) -> error_stack::Result<TelemetryGuard, router_env::opentelemetry::metrics::MetricsError> { | ||||||
|  |         Ok(router_env::setup( | ||||||
|  |             conf, | ||||||
|  |             "drainer", | ||||||
|  |             vec![ | ||||||
|  |                 "drainer", | ||||||
|  |                 "common_utils", | ||||||
|  |                 "redis_interface", | ||||||
|  |                 "router_env", | ||||||
|  |                 "storage_models", | ||||||
|  |             ], | ||||||
|  |         )?) | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -9,6 +9,8 @@ pub enum DrainerError { | |||||||
|     RedisError(error_stack::Report<redis::errors::RedisError>), |     RedisError(error_stack::Report<redis::errors::RedisError>), | ||||||
|     #[error("Application configuration error: {0}")] |     #[error("Application configuration error: {0}")] | ||||||
|     ConfigurationError(config::ConfigError), |     ConfigurationError(config::ConfigError), | ||||||
|  |     #[error("Metrics initialization error")] | ||||||
|  |     MetricsError, | ||||||
| } | } | ||||||
|  |  | ||||||
| pub type DrainerResult<T> = error_stack::Result<T, DrainerError>; | pub type DrainerResult<T> = error_stack::Result<T, DrainerError>; | ||||||
|  | |||||||
| @ -1,14 +1,18 @@ | |||||||
| mod connection; | mod connection; | ||||||
|  | pub mod env; | ||||||
| pub mod errors; | pub mod errors; | ||||||
| pub mod services; | pub mod services; | ||||||
| pub mod settings; | pub mod settings; | ||||||
| mod utils; | mod utils; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
|  |  | ||||||
|  | pub use env as logger; | ||||||
|  | use logger::{instrument, tracing}; | ||||||
| use storage_models::kv; | use storage_models::kv; | ||||||
|  |  | ||||||
| use crate::{connection::pg_connection, services::Store}; | use crate::{connection::pg_connection, services::Store}; | ||||||
|  |  | ||||||
|  | #[instrument(skip(store))] | ||||||
| pub async fn start_drainer( | pub async fn start_drainer( | ||||||
|     store: Arc<Store>, |     store: Arc<Store>, | ||||||
|     number_of_streams: u8, |     number_of_streams: u8, | ||||||
| @ -32,8 +36,8 @@ async fn drainer_handler( | |||||||
|     let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index); |     let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index); | ||||||
|     let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await; |     let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await; | ||||||
|  |  | ||||||
|     if let Err(_e) = drainer_result { |     if let Err(error) = drainer_result { | ||||||
|         //TODO: LOG errors |         logger::error!(?error) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index); |     let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index); | ||||||
| @ -122,8 +126,8 @@ mod macro_util { | |||||||
|     macro_rules! handle_resp { |     macro_rules! handle_resp { | ||||||
|         ($result:expr,$op_type:expr, $table:expr) => { |         ($result:expr,$op_type:expr, $table:expr) => { | ||||||
|             match $result { |             match $result { | ||||||
|                 Ok(aa) => println!("Ok|{}|{}|{:?}|", $op_type, $table, aa), |                 Ok(aa) => logger::info!("Ok|{}|{}|{:?}|", $op_type, $table, aa), | ||||||
|                 Err(err) => println!("Err|{}|{}|{:?}|", $op_type, $table, err), |                 Err(err) => logger::error!("Err|{}|{}|{:?}|", $op_type, $table, err), | ||||||
|             } |             } | ||||||
|         }; |         }; | ||||||
|     } |     } | ||||||
|  | |||||||
| @ -1,4 +1,5 @@ | |||||||
| use drainer::{errors::DrainerResult, services, settings, start_drainer}; | use drainer::{errors, errors::DrainerResult, logger::logger, services, settings, start_drainer}; | ||||||
|  | use error_stack::ResultExt; | ||||||
|  |  | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() -> DrainerResult<()> { | async fn main() -> DrainerResult<()> { | ||||||
| @ -18,7 +19,12 @@ async fn main() -> DrainerResult<()> { | |||||||
|     let number_of_streams = store.config.drainer_num_partitions; |     let number_of_streams = store.config.drainer_num_partitions; | ||||||
|     let max_read_count = conf.drainer.max_read_count; |     let max_read_count = conf.drainer.max_read_count; | ||||||
|  |  | ||||||
|     start_drainer(store, number_of_streams, max_read_count).await?; |     let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?; | ||||||
|  |  | ||||||
|  |     logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log); | ||||||
|  |  | ||||||
|  |     start_drainer(store.clone(), number_of_streams, max_read_count).await?; | ||||||
|  |  | ||||||
|  |     store.close().await; | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -31,4 +31,13 @@ impl Store { | |||||||
|         // Example: {shard_5}_drainer_stream |         // Example: {shard_5}_drainer_stream | ||||||
|         format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) |         format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[allow(clippy::expect_used)] | ||||||
|  |     pub async fn close(mut self: Arc<Self>) { | ||||||
|  |         Arc::get_mut(&mut self) | ||||||
|  |             .and_then(|inner| Arc::get_mut(&mut inner.redis_conn)) | ||||||
|  |             .expect("Redis connection pool cannot be closed") | ||||||
|  |             .close_connections() | ||||||
|  |             .await; | ||||||
|  |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -5,7 +5,7 @@ use redis_interface as redis; | |||||||
|  |  | ||||||
| use crate::{ | use crate::{ | ||||||
|     errors::{self, DrainerError}, |     errors::{self, DrainerError}, | ||||||
|     services, |     logger, services, | ||||||
| }; | }; | ||||||
|  |  | ||||||
| pub type StreamEntries = Vec<(String, HashMap<String, String>)>; | pub type StreamEntries = Vec<(String, HashMap<String, String>)>; | ||||||
| @ -20,7 +20,8 @@ pub async fn is_stream_available(stream_index: u8, store: Arc<services::Store>) | |||||||
|         .await |         .await | ||||||
|     { |     { | ||||||
|         Ok(resp) => resp == redis::types::SetnxReply::KeySet, |         Ok(resp) => resp == redis::types::SetnxReply::KeySet, | ||||||
|         Err(_e) => { |         Err(error) => { | ||||||
|  |             logger::error!(?error); | ||||||
|             // Add metrics or logs |             // Add metrics or logs | ||||||
|             false |             false | ||||||
|         } |         } | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Nishant Joshi
					Nishant Joshi