diff --git a/crates/drainer/src/env.rs b/crates/drainer/src/env.rs new file mode 100644 index 0000000000..a961a2dddd --- /dev/null +++ b/crates/drainer/src/env.rs @@ -0,0 +1,27 @@ +#[doc(inline)] +pub use router_env::*; + +pub mod logger { + #[doc(inline)] + pub use router_env::{log, logger::*}; + + /// + /// Setup logging sub-system + /// + /// + pub fn setup( + conf: &config::Log, + ) -> error_stack::Result { + Ok(router_env::setup( + conf, + "drainer", + vec![ + "drainer", + "common_utils", + "redis_interface", + "router_env", + "storage_models", + ], + )?) + } +} diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs index 50d5876886..f46620cd33 100644 --- a/crates/drainer/src/errors.rs +++ b/crates/drainer/src/errors.rs @@ -9,6 +9,8 @@ pub enum DrainerError { RedisError(error_stack::Report), #[error("Application configuration error: {0}")] ConfigurationError(config::ConfigError), + #[error("Metrics initialization error")] + MetricsError, } pub type DrainerResult = error_stack::Result; diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index a0f5afcb32..81aaf159ca 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -1,14 +1,18 @@ mod connection; +pub mod env; pub mod errors; pub mod services; pub mod settings; mod utils; use std::sync::Arc; +pub use env as logger; +use logger::{instrument, tracing}; use storage_models::kv; use crate::{connection::pg_connection, services::Store}; +#[instrument(skip(store))] pub async fn start_drainer( store: Arc, number_of_streams: u8, @@ -32,8 +36,8 @@ async fn drainer_handler( let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index); let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await; - if let Err(_e) = drainer_result { - //TODO: LOG errors + if let Err(error) = drainer_result { + logger::error!(?error) } let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index); @@ -122,8 +126,8 @@ mod macro_util { macro_rules! handle_resp { ($result:expr,$op_type:expr, $table:expr) => { match $result { - Ok(aa) => println!("Ok|{}|{}|{:?}|", $op_type, $table, aa), - Err(err) => println!("Err|{}|{}|{:?}|", $op_type, $table, err), + Ok(aa) => logger::info!("Ok|{}|{}|{:?}|", $op_type, $table, aa), + Err(err) => logger::error!("Err|{}|{}|{:?}|", $op_type, $table, err), } }; } diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index 1d928d9fc9..6c1ba08674 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -1,4 +1,5 @@ -use drainer::{errors::DrainerResult, services, settings, start_drainer}; +use drainer::{errors, errors::DrainerResult, logger::logger, services, settings, start_drainer}; +use error_stack::ResultExt; #[tokio::main] async fn main() -> DrainerResult<()> { @@ -18,7 +19,12 @@ async fn main() -> DrainerResult<()> { let number_of_streams = store.config.drainer_num_partitions; let max_read_count = conf.drainer.max_read_count; - start_drainer(store, number_of_streams, max_read_count).await?; + let _guard = logger::setup(&conf.log).change_context(errors::DrainerError::MetricsError)?; + logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log); + + start_drainer(store.clone(), number_of_streams, max_read_count).await?; + + store.close().await; Ok(()) } diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs index dd86010716..a9cbf12378 100644 --- a/crates/drainer/src/services.rs +++ b/crates/drainer/src/services.rs @@ -31,4 +31,13 @@ impl Store { // Example: {shard_5}_drainer_stream format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) } + + #[allow(clippy::expect_used)] + pub async fn close(mut self: Arc) { + Arc::get_mut(&mut self) + .and_then(|inner| Arc::get_mut(&mut inner.redis_conn)) + .expect("Redis connection pool cannot be closed") + .close_connections() + .await; + } } diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 91c1527554..45142c4087 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -5,7 +5,7 @@ use redis_interface as redis; use crate::{ errors::{self, DrainerError}, - services, + logger, services, }; pub type StreamEntries = Vec<(String, HashMap)>; @@ -20,7 +20,8 @@ pub async fn is_stream_available(stream_index: u8, store: Arc) .await { Ok(resp) => resp == redis::types::SetnxReply::KeySet, - Err(_e) => { + Err(error) => { + logger::error!(?error); // Add metrics or logs false }