diff --git a/Cargo.lock b/Cargo.lock index cd5960a68c..3fa4c12c34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -569,9 +569,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -2266,6 +2266,7 @@ name = "drainer" version = "0.1.0" dependencies = [ "async-bb8-diesel", + "async-trait", "bb8", "clap", "common_utils", diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index dd12a916c9..d067192e24 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -7,8 +7,8 @@ use crate::{ payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, payment_intent::{PaymentIntentNew, PaymentIntentUpdate}, refund::{Refund, RefundNew, RefundUpdate}, - reverse_lookup::ReverseLookupNew, - PaymentIntent, + reverse_lookup::{ReverseLookup, ReverseLookupNew}, + PaymentIntent, PgPooledConn, }; #[derive(Debug, Serialize, Deserialize)] @@ -16,7 +16,41 @@ use crate::{ pub enum DBOperation { Insert { insertable: Insertable }, Update { updatable: Updateable }, - Delete, +} + +impl DBOperation { + pub fn operation<'a>(&self) -> &'a str { + match self { + Self::Insert { .. } => "insert", + Self::Update { .. } => "update", + } + } + pub fn table<'a>(&self) -> &'a str { + match self { + Self::Insert { insertable } => match insertable { + Insertable::PaymentIntent(_) => "payment_intent", + Insertable::PaymentAttempt(_) => "payment_attempt", + Insertable::Refund(_) => "refund", + Insertable::Address(_) => "address", + Insertable::ReverseLookUp(_) => "reverse_lookup", + }, + Self::Update { updatable } => match updatable { + Updateable::PaymentIntentUpdate(_) => "payment_intent", + Updateable::PaymentAttemptUpdate(_) => "payment_attempt", + Updateable::RefundUpdate(_) => "refund", + Updateable::AddressUpdate(_) => "address", + }, + } + } +} + +#[derive(Debug)] +pub enum DBResult { + PaymentIntent(Box), + PaymentAttempt(Box), + Refund(Box), + Address(Box
), + ReverseLookUp(Box), } #[derive(Debug, Serialize, Deserialize)] @@ -25,6 +59,40 @@ pub struct TypedSql { pub op: DBOperation, } +impl DBOperation { + pub async fn execute(self, conn: &PgPooledConn) -> crate::StorageResult { + Ok(match self { + Self::Insert { insertable } => match insertable { + Insertable::PaymentIntent(a) => { + DBResult::PaymentIntent(Box::new(a.insert(conn).await?)) + } + Insertable::PaymentAttempt(a) => { + DBResult::PaymentAttempt(Box::new(a.insert(conn).await?)) + } + Insertable::Refund(a) => DBResult::Refund(Box::new(a.insert(conn).await?)), + Insertable::Address(addr) => DBResult::Address(Box::new(addr.insert(conn).await?)), + Insertable::ReverseLookUp(rev) => { + DBResult::ReverseLookUp(Box::new(rev.insert(conn).await?)) + } + }, + Self::Update { updatable } => match updatable { + Updateable::PaymentIntentUpdate(a) => { + DBResult::PaymentIntent(Box::new(a.orig.update(conn, a.update_data).await?)) + } + Updateable::PaymentAttemptUpdate(a) => DBResult::PaymentAttempt(Box::new( + a.orig.update_with_attempt_id(conn, a.update_data).await?, + )), + Updateable::RefundUpdate(a) => { + DBResult::Refund(Box::new(a.orig.update(conn, a.update_data).await?)) + } + Updateable::AddressUpdate(a) => { + DBResult::Address(Box::new(a.orig.update(conn, a.update_data).await?)) + } + }, + }) + } +} + impl TypedSql { pub fn to_field_value_pairs( &self, diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml index db5b6c37e9..50e0effd03 100644 --- a/crates/drainer/Cargo.toml +++ b/crates/drainer/Cargo.toml @@ -25,6 +25,7 @@ serde_json = "1.0.108" serde_path_to_error = "0.1.14" thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread"] } +async-trait = "0.1.74" # First Party Crates common_utils = { version = "0.1.0", path = "../common_utils", features = ["signals"] } diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs index 42ccfc7a0d..3034e849f8 100644 --- a/crates/drainer/src/errors.rs +++ b/crates/drainer/src/errors.rs @@ -11,6 +11,8 @@ pub enum DrainerError { ConfigurationError(config::ConfigError), #[error("Error while configuring signals: {0}")] SignalError(String), + #[error("Error while parsing data from the stream: {0:?}")] + ParsingError(error_stack::Report), #[error("Unexpected error occurred: {0}")] UnexpectedError(String), } diff --git a/crates/drainer/src/handler.rs b/crates/drainer/src/handler.rs new file mode 100644 index 0000000000..5aa902d84c --- /dev/null +++ b/crates/drainer/src/handler.rs @@ -0,0 +1,277 @@ +use std::sync::{atomic, Arc}; + +use tokio::{ + sync::{mpsc, oneshot}, + time::{self, Duration}, +}; + +use crate::{ + errors, instrument, logger, metrics, query::ExecuteQuery, tracing, utils, DrainerSettings, + Store, StreamData, +}; + +/// Handler handles the spawning and closing of drainer +/// Arc is used to enable creating a listener for graceful shutdown +#[derive(Clone)] +pub struct Handler { + inner: Arc, +} + +impl std::ops::Deref for Handler { + type Target = HandlerInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +pub struct HandlerInner { + shutdown_interval: Duration, + loop_interval: Duration, + active_tasks: Arc, + conf: DrainerSettings, + store: Arc, + running: Arc, +} + +impl Handler { + pub fn from_conf(conf: DrainerSettings, store: Arc) -> Self { + let shutdown_interval = Duration::from_millis(conf.shutdown_interval.into()); + let loop_interval = Duration::from_millis(conf.loop_interval.into()); + + let active_tasks = Arc::new(atomic::AtomicU64::new(0)); + + 
let running = Arc::new(atomic::AtomicBool::new(true)); + + let handler = HandlerInner { + shutdown_interval, + loop_interval, + active_tasks, + conf, + store, + running, + }; + + Self { + inner: Arc::new(handler), + } + } + + pub fn close(&self) { + self.running.store(false, atomic::Ordering::SeqCst); + } + + pub async fn spawn(&self) -> errors::DrainerResult<()> { + let mut stream_index: u8 = 0; + let jobs_picked = Arc::new(atomic::AtomicU8::new(0)); + + while self.running.load(atomic::Ordering::SeqCst) { + metrics::DRAINER_HEALTH.add(&metrics::CONTEXT, 1, &[]); + if self.store.is_stream_available(stream_index).await { + tokio::spawn(drainer_handler( + self.store.clone(), + stream_index, + self.conf.max_read_count, + self.active_tasks.clone(), + jobs_picked.clone(), + )); + } + stream_index = utils::increment_stream_index( + (stream_index, jobs_picked.clone()), + self.store.config.drainer_num_partitions, + ) + .await; + time::sleep(self.loop_interval).await; + } + + Ok(()) + } + + pub(crate) async fn shutdown_listener(&self, mut rx: mpsc::Receiver<()>) { + while let Some(_c) = rx.recv().await { + logger::info!("Awaiting shutdown!"); + metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]); + let shutdown_started = tokio::time::Instant::now(); + rx.close(); + + //Check until the active tasks are zero. This does not include the tasks in the stream. + while self.active_tasks.load(atomic::Ordering::SeqCst) != 0 { + time::sleep(self.shutdown_interval).await; + } + logger::info!("Terminating drainer"); + metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]); + let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64; + metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]); + self.close(); + } + logger::info!( + tasks_remaining = self.active_tasks.load(atomic::Ordering::SeqCst), + "Drainer shutdown successfully" + ) + } + + pub fn spawn_error_handlers(&self, tx: mpsc::Sender<()>) -> errors::DrainerResult<()> { + let (redis_error_tx, redis_error_rx) = oneshot::channel(); + + let redis_conn_clone = self.store.redis_conn.clone(); + + // Spawn a task to monitor if redis is down or not + tokio::spawn(async move { redis_conn_clone.on_error(redis_error_tx).await }); + + //Spawns a task to send shutdown signal if redis goes down + tokio::spawn(redis_error_receiver(redis_error_rx, tx)); + + Ok(()) + } +} + +pub async fn redis_error_receiver(rx: oneshot::Receiver<()>, shutdown_channel: mpsc::Sender<()>) { + match rx.await { + Ok(_) => { + logger::error!("The redis server failed "); + let _ = shutdown_channel.send(()).await.map_err(|err| { + logger::error!("Failed to send signal to the shutdown channel {err}") + }); + } + Err(err) => { + logger::error!("Channel receiver error{err}"); + } + } +} + +#[router_env::instrument(skip_all)] +async fn drainer_handler( + store: Arc, + stream_index: u8, + max_read_count: u64, + active_tasks: Arc, + jobs_picked: Arc, +) -> errors::DrainerResult<()> { + active_tasks.fetch_add(1, atomic::Ordering::Release); + + let stream_name = store.get_drainer_stream_name(stream_index); + + let drainer_result = Box::pin(drainer( + store.clone(), + max_read_count, + stream_name.as_str(), + jobs_picked, + )) + .await; + + if let Err(error) = drainer_result { + logger::error!(?error) + } + + let flag_stream_name = store.get_stream_key_flag(stream_index); + + let output = store.make_stream_available(flag_stream_name.as_str()).await; + active_tasks.fetch_sub(1, atomic::Ordering::Release); + output.map_err(|err| { + logger::error!(operation = 
"unlock_stream", err=?err); + err + }) +} + +#[instrument(skip_all, fields(global_id, request_id, session_id))] +async fn drainer( + store: Arc, + max_read_count: u64, + stream_name: &str, + jobs_picked: Arc, +) -> errors::DrainerResult<()> { + let stream_read = match store.read_from_stream(stream_name, max_read_count).await { + Ok(result) => { + jobs_picked.fetch_add(1, atomic::Ordering::SeqCst); + result + } + Err(error) => { + if let errors::DrainerError::RedisError(redis_err) = error.current_context() { + if let redis_interface::errors::RedisError::StreamEmptyOrNotAvailable = + redis_err.current_context() + { + metrics::STREAM_EMPTY.add(&metrics::CONTEXT, 1, &[]); + return Ok(()); + } else { + return Err(error); + } + } else { + return Err(error); + } + } + }; + + // parse_stream_entries returns error if no entries is found, handle it + let entries = utils::parse_stream_entries(&stream_read, stream_name)?; + let read_count = entries.len(); + + metrics::JOBS_PICKED_PER_STREAM.add( + &metrics::CONTEXT, + u64::try_from(read_count).unwrap_or(u64::MIN), + &[metrics::KeyValue { + key: "stream".into(), + value: stream_name.to_string().into(), + }], + ); + + let session_id = common_utils::generate_id_with_default_len("drainer_session"); + + let mut last_processed_id = String::new(); + + for (entry_id, entry) in entries.clone() { + let data = match StreamData::from_hashmap(entry) { + Ok(data) => data, + Err(err) => { + logger::error!(operation = "deserialization", err=?err); + metrics::STREAM_PARSE_FAIL.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue { + key: "operation".into(), + value: "deserialization".into(), + }], + ); + + // break from the loop in case of a deser error + break; + } + }; + + tracing::Span::current().record("request_id", data.request_id); + tracing::Span::current().record("global_id", data.global_id); + tracing::Span::current().record("session_id", &session_id); + + match data.typed_sql.execute_query(&store, data.pushed_at).await { + Ok(_) => { + last_processed_id = entry_id; + } + Err(err) => match err.current_context() { + // In case of Uniqueviolation we can't really do anything to fix it so just clear + // it from the stream + diesel_models::errors::DatabaseError::UniqueViolation => { + last_processed_id = entry_id; + } + // break from the loop in case of an error in query + _ => break, + }, + } + } + + if !last_processed_id.is_empty() { + let entries_trimmed = store + .trim_from_stream(stream_name, &last_processed_id) + .await?; + if read_count != entries_trimmed { + logger::error!( + read_entries = %read_count, + trimmed_entries = %entries_trimmed, + ?entries, + "Assertion Failed no. of entries read from the stream doesn't match no. 
of entries trimmed" + ); + } + } else { + logger::error!(read_entries = %read_count,?entries,"No streams were processed in this session"); + } + + Ok(()) +} diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 796c9aa695..abb32c8779 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -1,34 +1,30 @@ mod connection; pub mod errors; +mod handler; pub mod logger; pub(crate) mod metrics; +mod query; pub mod services; pub mod settings; +mod stream; +mod types; mod utils; -use std::sync::{atomic, Arc}; +use std::sync::Arc; -use common_utils::{ext_traits::StringExt, signals::get_allowed_signals}; +use common_utils::signals::get_allowed_signals; use diesel_models::kv; use error_stack::{IntoReport, ResultExt}; use router_env::{instrument, tracing}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; -use crate::{connection::pg_connection, services::Store}; +use crate::{ + connection::pg_connection, services::Store, settings::DrainerSettings, types::StreamData, +}; -pub async fn start_drainer( - store: Arc, - number_of_streams: u8, - max_read_count: u64, - shutdown_interval: u32, - loop_interval: u32, -) -> errors::DrainerResult<()> { - let mut stream_index: u8 = 0; - let jobs_picked = Arc::new(atomic::AtomicU8::new(0)); +pub async fn start_drainer(store: Arc, conf: DrainerSettings) -> errors::DrainerResult<()> { + let drainer_handler = handler::Handler::from_conf(conf, store); - let mut shutdown_interval = - tokio::time::interval(std::time::Duration::from_millis(shutdown_interval.into())); - let mut loop_interval = - tokio::time::interval(std::time::Duration::from_millis(loop_interval.into())); + let (tx, rx) = mpsc::channel::<()>(1); let signal = get_allowed_signals() @@ -36,328 +32,20 @@ pub async fn start_drainer( .change_context(errors::DrainerError::SignalError( "Failed while getting allowed signals".to_string(), ))?; - - let (redis_error_tx, redis_error_rx) = oneshot::channel(); - let (tx, mut rx) = mpsc::channel(1); let handle = signal.handle(); let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx.clone())); - let redis_conn_clone = store.redis_conn.clone(); + let handler_clone = drainer_handler.clone(); - // Spawn a task to monitor if redis is down or not - tokio::spawn(async move { redis_conn_clone.on_error(redis_error_tx).await }); + tokio::task::spawn(async move { handler_clone.shutdown_listener(rx).await }); - //Spawns a task to send shutdown signal if redis goes down - tokio::spawn(redis_error_receiver(redis_error_rx, tx)); + drainer_handler.spawn_error_handlers(tx)?; + drainer_handler.spawn().await?; - let active_tasks = Arc::new(atomic::AtomicU64::new(0)); - 'event: loop { - metrics::DRAINER_HEALTH.add(&metrics::CONTEXT, 1, &[]); - match rx.try_recv() { - Err(mpsc::error::TryRecvError::Empty) => { - if utils::is_stream_available(stream_index, store.clone()).await { - tokio::spawn(drainer_handler( - store.clone(), - stream_index, - max_read_count, - active_tasks.clone(), - jobs_picked.clone(), - )); - } - stream_index = utils::increment_stream_index( - (stream_index, jobs_picked.clone()), - number_of_streams, - ) - .await; - loop_interval.tick().await; - } - Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => { - logger::info!("Awaiting shutdown!"); - metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]); - let shutdown_started = tokio::time::Instant::now(); - rx.close(); - loop { - if active_tasks.load(atomic::Ordering::Acquire) == 0 { - logger::info!("Terminating drainer"); - 
metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]); - let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64; - metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]); - break 'event; - } - shutdown_interval.tick().await; - } - } - } - } handle.close(); - task_handle + let _ = task_handle .await - .into_report() - .change_context(errors::DrainerError::UnexpectedError( - "Failed while joining signal handler".to_string(), - ))?; + .map_err(|err| logger::error!("Failed while joining signal handler: {:?}", err)); Ok(()) } - -pub async fn redis_error_receiver(rx: oneshot::Receiver<()>, shutdown_channel: mpsc::Sender<()>) { - match rx.await { - Ok(_) => { - logger::error!("The redis server failed "); - let _ = shutdown_channel.send(()).await.map_err(|err| { - logger::error!("Failed to send signal to the shutdown channel {err}") - }); - } - Err(err) => { - logger::error!("Channel receiver error{err}"); - } - } -} - -#[router_env::instrument(skip_all)] -async fn drainer_handler( - store: Arc, - stream_index: u8, - max_read_count: u64, - active_tasks: Arc, - jobs_picked: Arc, -) -> errors::DrainerResult<()> { - active_tasks.fetch_add(1, atomic::Ordering::Release); - - let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index); - - let drainer_result = Box::pin(drainer( - store.clone(), - max_read_count, - stream_name.as_str(), - jobs_picked, - )) - .await; - - if let Err(error) = drainer_result { - logger::error!(?error) - } - - let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index); - - //TODO: USE THE RESULT FOR LOGGING - let output = - utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await; - active_tasks.fetch_sub(1, atomic::Ordering::Release); - output -} - -#[instrument(skip_all, fields(global_id, request_id, session_id))] -async fn drainer( - store: Arc, - max_read_count: u64, - stream_name: &str, - jobs_picked: Arc, -) -> errors::DrainerResult<()> { - let stream_read = - match utils::read_from_stream(stream_name, max_read_count, store.redis_conn.as_ref()).await - { - Ok(result) => { - jobs_picked.fetch_add(1, atomic::Ordering::SeqCst); - result - } - Err(error) => { - if let errors::DrainerError::RedisError(redis_err) = error.current_context() { - if let redis_interface::errors::RedisError::StreamEmptyOrNotAvailable = - redis_err.current_context() - { - metrics::STREAM_EMPTY.add(&metrics::CONTEXT, 1, &[]); - return Ok(()); - } else { - return Err(error); - } - } else { - return Err(error); - } - } - }; - // parse_stream_entries returns error if no entries is found, handle it - let (entries, last_entry_id) = utils::parse_stream_entries(&stream_read, stream_name)?; - let read_count = entries.len(); - - metrics::JOBS_PICKED_PER_STREAM.add( - &metrics::CONTEXT, - u64::try_from(read_count).unwrap_or(u64::MIN), - &[metrics::KeyValue { - key: "stream".into(), - value: stream_name.to_string().into(), - }], - ); - - let session_id = common_utils::generate_id_with_default_len("drainer_session"); - - // TODO: Handle errors when deserialization fails and when DB error occurs - for entry in entries { - let typed_sql = entry.1.get("typed_sql").map_or(String::new(), Clone::clone); - let request_id = entry - .1 - .get("request_id") - .map_or(String::new(), Clone::clone); - let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone); - let pushed_at = entry.1.get("pushed_at"); - - tracing::Span::current().record("request_id", request_id); - 
tracing::Span::current().record("global_id", global_id); - tracing::Span::current().record("session_id", &session_id); - - let result = typed_sql.parse_struct("DBOperation"); - - let db_op = match result { - Ok(f) => f, - Err(err) => { - logger::error!(operation= "deserialization",error = %err); - metrics::STREAM_PARSE_FAIL.add(&metrics::CONTEXT, 1, &[]); - continue; - } - }; - - let conn = pg_connection(&store.master_pool).await; - let insert_op = "insert"; - let update_op = "update"; - let payment_intent = "payment_intent"; - let payment_attempt = "payment_attempt"; - let refund = "refund"; - let reverse_lookup = "reverse_lookup"; - let address = "address"; - match db_op { - // TODO: Handle errors - kv::DBOperation::Insert { insertable } => { - let (_, execution_time) = common_utils::date_time::time_it(|| async { - match insertable { - kv::Insertable::PaymentIntent(a) => { - macro_util::handle_resp!( - a.insert(&conn).await, - insert_op, - payment_intent - ) - } - kv::Insertable::PaymentAttempt(a) => { - macro_util::handle_resp!( - a.insert(&conn).await, - insert_op, - payment_attempt - ) - } - kv::Insertable::Refund(a) => { - macro_util::handle_resp!(a.insert(&conn).await, insert_op, refund) - } - kv::Insertable::Address(addr) => { - macro_util::handle_resp!(addr.insert(&conn).await, insert_op, address) - } - kv::Insertable::ReverseLookUp(rev) => { - macro_util::handle_resp!( - rev.insert(&conn).await, - insert_op, - reverse_lookup - ) - } - } - }) - .await; - metrics::QUERY_EXECUTION_TIME.record( - &metrics::CONTEXT, - execution_time, - &[metrics::KeyValue { - key: "operation".into(), - value: insert_op.into(), - }], - ); - utils::push_drainer_delay(pushed_at, insert_op.to_string()); - } - kv::DBOperation::Update { updatable } => { - let (_, execution_time) = common_utils::date_time::time_it(|| async { - match updatable { - kv::Updateable::PaymentIntentUpdate(a) => { - macro_util::handle_resp!( - a.orig.update(&conn, a.update_data).await, - update_op, - payment_intent - ) - } - kv::Updateable::PaymentAttemptUpdate(a) => { - macro_util::handle_resp!( - a.orig.update_with_attempt_id(&conn, a.update_data).await, - update_op, - payment_attempt - ) - } - kv::Updateable::RefundUpdate(a) => { - macro_util::handle_resp!( - a.orig.update(&conn, a.update_data).await, - update_op, - refund - ) - } - kv::Updateable::AddressUpdate(a) => macro_util::handle_resp!( - a.orig.update(&conn, a.update_data).await, - update_op, - address - ), - } - }) - .await; - metrics::QUERY_EXECUTION_TIME.record( - &metrics::CONTEXT, - execution_time, - &[metrics::KeyValue { - key: "operation".into(), - value: update_op.into(), - }], - ); - utils::push_drainer_delay(pushed_at, update_op.to_string()); - } - kv::DBOperation::Delete => { - // [#224]: Implement this - logger::error!("Not implemented!"); - } - }; - } - - let entries_trimmed = - utils::trim_from_stream(stream_name, last_entry_id.as_str(), &store.redis_conn).await?; - - if read_count != entries_trimmed { - logger::error!( - read_entries = %read_count, - trimmed_entries = %entries_trimmed, - ?entries, - "Assertion Failed no. of entries read from the stream doesn't match no. of entries trimmed" - ); - } - - Ok(()) -} - -mod macro_util { - - macro_rules! 
handle_resp { - ($result:expr,$op_type:expr, $table:expr) => { - match $result { - Ok(inner_result) => { - logger::info!(operation = %$op_type, table = %$table, ?inner_result); - metrics::SUCCESSFUL_QUERY_EXECUTION.add(&metrics::CONTEXT, 1, &[ - metrics::KeyValue { - key: "operation".into(), - value: $table.into(), - } - ]); - } - Err(err) => { - logger::error!(operation = %$op_type, table = %$table, ?err); - metrics::ERRORS_WHILE_QUERY_EXECUTION.add(&metrics::CONTEXT, 1, &[ - metrics::KeyValue { - key: "operation".into(), - value: $table.into(), - } - ]); - } - } - }; - } - pub(crate) use handle_resp; -} diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs index 9e8b8e275c..34c1294d55 100644 --- a/crates/drainer/src/main.rs +++ b/crates/drainer/src/main.rs @@ -15,11 +15,6 @@ async fn main() -> DrainerResult<()> { let store = services::Store::new(&conf, false).await; let store = std::sync::Arc::new(store); - let number_of_streams = store.config.drainer_num_partitions; - let max_read_count = conf.drainer.max_read_count; - let shutdown_intervals = conf.drainer.shutdown_interval; - let loop_interval = conf.drainer.loop_interval; - #[cfg(feature = "vergen")] println!("Starting drainer (Version: {})", router_env::git_tag!()); @@ -32,14 +27,7 @@ async fn main() -> DrainerResult<()> { logger::debug!(startup_config=?conf); logger::info!("Drainer started [{:?}] [{:?}]", conf.drainer, conf.log); - start_drainer( - store.clone(), - number_of_streams, - max_read_count, - shutdown_intervals, - loop_interval, - ) - .await?; + start_drainer(store.clone(), conf.drainer).await?; Ok(()) } diff --git a/crates/drainer/src/query.rs b/crates/drainer/src/query.rs new file mode 100644 index 0000000000..f79291f3ea --- /dev/null +++ b/crates/drainer/src/query.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use common_utils::errors::CustomResult; +use diesel_models::errors::DatabaseError; + +use crate::{kv, logger, metrics, pg_connection, services::Store}; + +#[async_trait::async_trait] +pub trait ExecuteQuery { + async fn execute_query( + self, + store: &Arc, + pushed_at: i64, + ) -> CustomResult<(), DatabaseError>; +} + +#[async_trait::async_trait] +impl ExecuteQuery for kv::DBOperation { + async fn execute_query( + self, + store: &Arc, + pushed_at: i64, + ) -> CustomResult<(), DatabaseError> { + let conn = pg_connection(&store.master_pool).await; + let operation = self.operation(); + let table = self.table(); + + let tags: &[metrics::KeyValue] = &[ + metrics::KeyValue { + key: "operation".into(), + value: operation.into(), + }, + metrics::KeyValue { + key: "table".into(), + value: table.into(), + }, + ]; + + let (result, execution_time) = + common_utils::date_time::time_it(|| self.execute(&conn)).await; + + push_drainer_delay(pushed_at, operation, table, tags); + metrics::QUERY_EXECUTION_TIME.record(&metrics::CONTEXT, execution_time, tags); + + match result { + Ok(result) => { + logger::info!(operation = operation, table = table, ?result); + metrics::SUCCESSFUL_QUERY_EXECUTION.add(&metrics::CONTEXT, 1, tags); + Ok(()) + } + Err(err) => { + logger::error!(operation = operation, table = table, ?err); + metrics::ERRORS_WHILE_QUERY_EXECUTION.add(&metrics::CONTEXT, 1, tags); + Err(err) + } + } + } +} + +#[inline(always)] +fn push_drainer_delay(pushed_at: i64, operation: &str, table: &str, tags: &[metrics::KeyValue]) { + let drained_at = common_utils::date_time::now_unix_timestamp(); + let delay = drained_at - pushed_at; + + logger::debug!( + operation = operation, + table = table, + delay = 
format!("{delay} secs") + ); + + metrics::DRAINER_DELAY_SECONDS.record(&metrics::CONTEXT, delay, tags); +} diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs index 73f66f27db..481fcc0722 100644 --- a/crates/drainer/src/services.rs +++ b/crates/drainer/src/services.rs @@ -34,9 +34,4 @@ impl Store { request_id: None, } } - - pub fn drainer_stream(&self, shard_key: &str) -> String { - // Example: {shard_5}_drainer_stream - format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) - } } diff --git a/crates/drainer/src/stream.rs b/crates/drainer/src/stream.rs new file mode 100644 index 0000000000..b2775ac4ba --- /dev/null +++ b/crates/drainer/src/stream.rs @@ -0,0 +1,119 @@ +use std::collections::HashMap; + +use error_stack::IntoReport; +use redis_interface as redis; +use router_env::{logger, tracing}; + +use crate::{errors, metrics, Store}; + +pub type StreamEntries = Vec<(String, HashMap)>; +pub type StreamReadResult = HashMap; + +impl Store { + #[inline(always)] + pub fn drainer_stream(&self, shard_key: &str) -> String { + // Example: {shard_5}_drainer_stream + format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) + } + + #[inline(always)] + pub(crate) fn get_stream_key_flag(&self, stream_index: u8) -> String { + format!("{}_in_use", self.get_drainer_stream_name(stream_index)) + } + + #[inline(always)] + pub(crate) fn get_drainer_stream_name(&self, stream_index: u8) -> String { + self.drainer_stream(format!("shard_{stream_index}").as_str()) + } + + #[router_env::instrument(skip_all)] + pub async fn is_stream_available(&self, stream_index: u8) -> bool { + let stream_key_flag = self.get_stream_key_flag(stream_index); + + match self + .redis_conn + .set_key_if_not_exists_with_expiry(stream_key_flag.as_str(), true, None) + .await + { + Ok(resp) => resp == redis::types::SetnxReply::KeySet, + Err(error) => { + logger::error!(operation="lock_stream",err=?error); + false + } + } + } + + pub async fn make_stream_available(&self, stream_name_flag: &str) -> errors::DrainerResult<()> { + match self.redis_conn.delete_key(stream_name_flag).await { + Ok(redis::DelReply::KeyDeleted) => Ok(()), + Ok(redis::DelReply::KeyNotDeleted) => { + logger::error!("Tried to unlock a stream which is already unlocked"); + Ok(()) + } + Err(error) => Err(errors::DrainerError::from(error).into()), + } + } + + pub async fn read_from_stream( + &self, + stream_name: &str, + max_read_count: u64, + ) -> errors::DrainerResult { + // "0-0" id gives first entry + let stream_id = "0-0"; + let (output, execution_time) = common_utils::date_time::time_it(|| async { + self.redis_conn + .stream_read_entries(stream_name, stream_id, Some(max_read_count)) + .await + .map_err(errors::DrainerError::from) + .into_report() + }) + .await; + + metrics::REDIS_STREAM_READ_TIME.record( + &metrics::CONTEXT, + execution_time, + &[metrics::KeyValue::new("stream", stream_name.to_owned())], + ); + + output + } + pub async fn trim_from_stream( + &self, + stream_name: &str, + minimum_entry_id: &str, + ) -> errors::DrainerResult { + let trim_kind = redis::StreamCapKind::MinID; + let trim_type = redis::StreamCapTrim::Exact; + let trim_id = minimum_entry_id; + let (trim_result, execution_time) = + common_utils::date_time::time_it::, _, _>(|| async { + let trim_result = self + .redis_conn + .stream_trim_entries(stream_name, (trim_kind, trim_type, trim_id)) + .await + .map_err(errors::DrainerError::from) + .into_report()?; + + // Since xtrim deletes entries below given id excluding the given id. 
+ // Hence, deleting the minimum entry id + self.redis_conn + .stream_delete_entries(stream_name, minimum_entry_id) + .await + .map_err(errors::DrainerError::from) + .into_report()?; + + Ok(trim_result) + }) + .await; + + metrics::REDIS_STREAM_TRIM_TIME.record( + &metrics::CONTEXT, + execution_time, + &[metrics::KeyValue::new("stream", stream_name.to_owned())], + ); + + // adding 1 because we are deleting the given id too + Ok(trim_result? + 1) + } +} diff --git a/crates/drainer/src/types.rs b/crates/drainer/src/types.rs new file mode 100644 index 0000000000..f1ddf8ef27 --- /dev/null +++ b/crates/drainer/src/types.rs @@ -0,0 +1,36 @@ +use std::collections::HashMap; + +use common_utils::errors; +use error_stack::{IntoReport, ResultExt}; +use serde::{de::value::MapDeserializer, Deserialize, Serialize}; + +use crate::{ + kv, + utils::{deserialize_db_op, deserialize_i64}, +}; + +#[derive(Deserialize, Serialize)] +pub struct StreamData { + pub request_id: String, + pub global_id: String, + #[serde(deserialize_with = "deserialize_db_op")] + pub typed_sql: kv::DBOperation, + #[serde(deserialize_with = "deserialize_i64")] + pub pushed_at: i64, +} + +impl StreamData { + pub fn from_hashmap( + hashmap: HashMap, + ) -> errors::CustomResult { + let iter = MapDeserializer::< + '_, + std::collections::hash_map::IntoIter, + serde_json::error::Error, + >::new(hashmap.into_iter()); + + Self::deserialize(iter) + .into_report() + .change_context(errors::ParsingError::StructParseFailure("StreamData")) + } +} diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 5d3bd241d4..e27e04c30e 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -1,125 +1,20 @@ -use std::{ - collections::HashMap, - sync::{atomic, Arc}, -}; +use std::sync::{atomic, Arc}; use error_stack::IntoReport; use redis_interface as redis; +use serde::de::Deserialize; use crate::{ - errors::{self, DrainerError}, - logger, metrics, services, tracing, + errors, kv, metrics, + stream::{StreamEntries, StreamReadResult}, }; -pub type StreamEntries = Vec<(String, HashMap)>; -pub type StreamReadResult = HashMap; - -#[router_env::instrument(skip_all)] -pub async fn is_stream_available(stream_index: u8, store: Arc) -> bool { - let stream_key_flag = get_stream_key_flag(store.clone(), stream_index); - - match store - .redis_conn - .set_key_if_not_exists_with_expiry(stream_key_flag.as_str(), true, None) - .await - { - Ok(resp) => resp == redis::types::SetnxReply::KeySet, - Err(error) => { - logger::error!(?error); - // Add metrics or logs - false - } - } -} - -pub async fn read_from_stream( - stream_name: &str, - max_read_count: u64, - redis: &redis::RedisConnectionPool, -) -> errors::DrainerResult { - // "0-0" id gives first entry - let stream_id = "0-0"; - let (output, execution_time) = common_utils::date_time::time_it(|| async { - redis - .stream_read_entries(stream_name, stream_id, Some(max_read_count)) - .await - .map_err(DrainerError::from) - .into_report() - }) - .await; - - metrics::REDIS_STREAM_READ_TIME.record( - &metrics::CONTEXT, - execution_time, - &[metrics::KeyValue::new("stream", stream_name.to_owned())], - ); - - output -} - -pub async fn trim_from_stream( - stream_name: &str, - minimum_entry_id: &str, - redis: &redis::RedisConnectionPool, -) -> errors::DrainerResult { - let trim_kind = redis::StreamCapKind::MinID; - let trim_type = redis::StreamCapTrim::Exact; - let trim_id = minimum_entry_id; - let (trim_result, execution_time) = - common_utils::date_time::time_it::, _, _>(|| async { - let 
trim_result = redis - .stream_trim_entries(stream_name, (trim_kind, trim_type, trim_id)) - .await - .map_err(DrainerError::from) - .into_report()?; - - // Since xtrim deletes entries below given id excluding the given id. - // Hence, deleting the minimum entry id - redis - .stream_delete_entries(stream_name, minimum_entry_id) - .await - .map_err(DrainerError::from) - .into_report()?; - - Ok(trim_result) - }) - .await; - - metrics::REDIS_STREAM_TRIM_TIME.record( - &metrics::CONTEXT, - execution_time, - &[metrics::KeyValue::new("stream", stream_name.to_owned())], - ); - - // adding 1 because we are deleting the given id too - Ok(trim_result? + 1) -} - -pub async fn make_stream_available( - stream_name_flag: &str, - redis: &redis::RedisConnectionPool, -) -> errors::DrainerResult<()> { - match redis.delete_key(stream_name_flag).await { - Ok(redis::DelReply::KeyDeleted) => Ok(()), - Ok(redis::DelReply::KeyNotDeleted) => { - logger::error!("Tried to unlock a stream which is already unlocked"); - Ok(()) - } - Err(error) => Err(DrainerError::from(error).into()), - } -} - pub fn parse_stream_entries<'a>( read_result: &'a StreamReadResult, stream_name: &str, -) -> errors::DrainerResult<(&'a StreamEntries, String)> { +) -> errors::DrainerResult<&'a StreamEntries> { read_result .get(stream_name) - .and_then(|entries| { - entries - .last() - .map(|last_entry| (entries, last_entry.0.clone())) - }) .ok_or_else(|| { errors::DrainerError::RedisError(error_stack::report!( redis::errors::RedisError::NotFound @@ -128,27 +23,43 @@ pub fn parse_stream_entries<'a>( .into_report() } -pub fn push_drainer_delay(pushed_at: Option<&String>, operation: String) { - if let Some(pushed_at) = pushed_at { - if let Ok(time) = pushed_at.parse::() { - let drained_at = common_utils::date_time::now_unix_timestamp(); - let delay = drained_at - time; +pub(crate) fn deserialize_i64<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let s = serde_json::Value::deserialize(deserializer)?; + match s { + serde_json::Value::String(str_val) => str_val.parse().map_err(serde::de::Error::custom), + serde_json::Value::Number(num_val) => match num_val.as_i64() { + Some(val) => Ok(val), + None => Err(serde::de::Error::custom(format!( + "could not convert {num_val:?} to i64" + ))), + }, + other => Err(serde::de::Error::custom(format!( + "unexpected data format - expected string or number, got: {other:?}" + ))), + } +} - logger::debug!(operation = operation, delay = delay); - metrics::DRAINER_DELAY_SECONDS.record( - &metrics::CONTEXT, - delay, - &[metrics::KeyValue { - key: "operation".into(), - value: operation.into(), - }], - ); +pub(crate) fn deserialize_db_op<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let s = serde_json::Value::deserialize(deserializer)?; + match s { + serde_json::Value::String(str_val) => { + serde_json::from_str(&str_val).map_err(serde::de::Error::custom) } + other => Err(serde::de::Error::custom(format!( + "unexpected data format - expected string got: {other:?}" + ))), } } // Here the output is in the format (stream_index, jobs_picked), // similar to the first argument of the function +#[inline(always)] pub async fn increment_stream_index( (index, jobs_picked): (u8, Arc), total_streams: u8, @@ -164,11 +75,3 @@ pub async fn increment_stream_index( index + 1 } } - -pub(crate) fn get_stream_key_flag(store: Arc, stream_index: u8) -> String { - format!("{}_in_use", get_drainer_stream_name(store, stream_index)) -} - -pub(crate) fn get_drainer_stream_name(store: Arc, 
stream_index: u8) -> String {
-    store.drainer_stream(format!("shard_{stream_index}").as_str())
-}
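
Note on the new StreamData::from_hashmap path (crates/drainer/src/types.rs): each drained stream entry is effectively a string-keyed map of string values, so the struct is built with serde's MapDeserializer over that map. The helpers in crates/drainer/src/utils.rs cover the two awkward fields: pushed_at may arrive as either a number or a stringified number (deserialize_i64), and typed_sql is a JSON document nested inside a string, parsed with serde_json::from_str (deserialize_db_op). The following standalone sketch mirrors that technique only; the struct and field set here are simplified stand-ins, not the crate's actual types.

// Minimal sketch; requires serde (with the "derive" feature) and serde_json.
use std::collections::HashMap;

use serde::{de::value::MapDeserializer, Deserialize};

// Accept an i64 either as a number or as a string holding a number,
// mirroring the shape of the `deserialize_i64` helper in utils.rs.
fn i64_from_string_or_number<'de, D>(deserializer: D) -> Result<i64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    match serde_json::Value::deserialize(deserializer)? {
        serde_json::Value::String(s) => s.parse().map_err(serde::de::Error::custom),
        serde_json::Value::Number(n) => n
            .as_i64()
            .ok_or_else(|| serde::de::Error::custom(format!("could not convert {n:?} to i64"))),
        other => Err(serde::de::Error::custom(format!(
            "expected string or number, got: {other:?}"
        ))),
    }
}

// Simplified stand-in for the stream payload; not the crate's StreamData.
#[derive(Debug, Deserialize)]
struct Entry {
    request_id: String,
    #[serde(deserialize_with = "i64_from_string_or_number")]
    pushed_at: i64,
}

fn entry_from_hashmap(map: HashMap<String, String>) -> Result<Entry, serde_json::Error> {
    // Every value in the map is a string; MapDeserializer lets serde walk the
    // map as if it were a self-describing document.
    let deserializer: MapDeserializer<'_, _, serde_json::Error> =
        MapDeserializer::new(map.into_iter());
    Entry::deserialize(deserializer)
}

fn main() {
    let mut map = HashMap::new();
    map.insert("request_id".to_string(), "req_123".to_string());
    map.insert("pushed_at".to_string(), "1700000000".to_string());

    let entry = entry_from_hashmap(map).expect("entry should deserialize");
    println!("{entry:?}");
}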
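
The new Handler (crates/drainer/src/handler.rs) replaces the old try_recv event loop in lib.rs: an AtomicBool gates the spawn loop, an AtomicU64 counts in-flight drainer tasks, and the shutdown listener drains an mpsc channel, waits for the task counter to reach zero, then flips the flag so the loop exits. A minimal, self-contained sketch of that pattern (simplified names, no Redis or database work, and without the crate's metrics) looks like this:

use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};

use tokio::{
    sync::mpsc,
    time::{sleep, Duration},
};

#[derive(Clone)]
struct Handler {
    running: Arc<AtomicBool>,
    active_tasks: Arc<AtomicU64>,
}

impl Handler {
    fn new() -> Self {
        Self {
            running: Arc::new(AtomicBool::new(true)),
            active_tasks: Arc::new(AtomicU64::new(0)),
        }
    }

    // Main loop: keep spawning work while the `running` flag is set.
    async fn spawn(&self) {
        while self.running.load(Ordering::SeqCst) {
            let active = self.active_tasks.clone();
            tokio::spawn(async move {
                active.fetch_add(1, Ordering::Release);
                sleep(Duration::from_millis(50)).await; // stand-in for one drain cycle
                active.fetch_sub(1, Ordering::Release);
            });
            sleep(Duration::from_millis(10)).await;
        }
    }

    // Shutdown listener: on a signal, stop accepting further signals, wait for
    // in-flight tasks to finish, then flip the flag so `spawn` exits.
    async fn shutdown_listener(&self, mut rx: mpsc::Receiver<()>) {
        if rx.recv().await.is_some() {
            rx.close();
            while self.active_tasks.load(Ordering::SeqCst) != 0 {
                sleep(Duration::from_millis(10)).await;
            }
            self.running.store(false, Ordering::SeqCst);
        }
    }
}

#[tokio::main]
async fn main() {
    let handler = Handler::new();
    let (tx, rx) = mpsc::channel::<()>(1);

    let listener = handler.clone();
    tokio::spawn(async move { listener.shutdown_listener(rx).await });

    // Pretend a shutdown signal arrives after a short while.
    tokio::spawn(async move {
        sleep(Duration::from_millis(200)).await;
        let _ = tx.send(()).await;
    });

    handler.spawn().await;
    println!("drained and shut down");
}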
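
crates/drainer/src/stream.rs keeps the existing locking protocol, now as methods on Store: an instance claims a stream by creating a "<stream>_in_use" flag with a set-if-not-exists call and releases it by deleting the key (the "Tried to unlock a stream which is already unlocked" log covers the case where the DEL finds nothing). The crate goes through its internal redis_interface wrapper; as a rough illustration only, the same protocol written directly against the redis crate (redis-rs) might look like the sketch below. The key naming follows get_stream_key_flag; the connection handling is simplified and assumes a reachable Redis server.

use redis::Commands;

// Try to claim a stream by creating its "<stream>_in_use" flag.
// Returns Ok(true) if this caller acquired the lock.
fn try_lock_stream(conn: &mut redis::Connection, stream_name: &str) -> redis::RedisResult<bool> {
    let flag = format!("{stream_name}_in_use");
    // SET <flag> true NX -- only succeeds if the key does not exist yet.
    let reply: Option<String> = redis::cmd("SET")
        .arg(&flag)
        .arg("true")
        .arg("NX")
        .query(conn)?;
    Ok(reply.is_some())
}

// Release the stream by deleting its flag; a missing key means it was
// already unlocked, which the drainer treats as a logged no-op.
fn unlock_stream(conn: &mut redis::Connection, stream_name: &str) -> redis::RedisResult<()> {
    let flag = format!("{stream_name}_in_use");
    let deleted: i64 = conn.del(&flag)?;
    if deleted == 0 {
        eprintln!("tried to unlock a stream which is already unlocked");
    }
    Ok(())
}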