From c9276a30d739ad090bd5ceb5853a6be1ffa0ec25 Mon Sep 17 00:00:00 2001
From: Abhishek
Date: Fri, 23 Dec 2022 15:11:06 +0530
Subject: [PATCH] refactor(drainer): removed router dependency from drainer
 (#209)

---
 Cargo.lock                       | 16 ++++--
 crates/drainer/Cargo.toml        | 13 ++++-
 crates/drainer/build.rs          |  3 ++
 crates/drainer/src/connection.rs | 34 +++++++++++++
 crates/drainer/src/defaults.toml | 26 ++++++++++
 crates/drainer/src/errors.rs     | 27 ++++++----
 crates/drainer/src/lib.rs        |  6 ++-
 crates/drainer/src/main.rs       |  9 ++--
 crates/drainer/src/services.rs   | 34 +++++++++++++
 crates/drainer/src/settings.rs   | 85 ++++++++++++++++++++++++++++++++
 crates/drainer/src/utils.rs      | 42 ++++++++--------
 11 files changed, 252 insertions(+), 43 deletions(-)
 create mode 100644 crates/drainer/build.rs
 create mode 100644 crates/drainer/src/connection.rs
 create mode 100644 crates/drainer/src/defaults.toml
 create mode 100644 crates/drainer/src/services.rs
 create mode 100644 crates/drainer/src/settings.rs

diff --git a/Cargo.lock b/Cargo.lock
index 192e6cc349..9b25596a26 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1150,10 +1150,16 @@ checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
 name = "drainer"
 version = "0.1.0"
 dependencies = [
+ "async-bb8-diesel",
+ "bb8",
+ "config",
+ "diesel",
  "error-stack",
  "redis_interface",
- "router",
+ "router_env",
+ "serde",
  "serde_json",
+ "serde_path_to_error",
  "storage_models",
  "structopt",
  "thiserror",
@@ -2978,18 +2984,18 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
 
 [[package]]
 name = "serde"
-version = "1.0.149"
+version = "1.0.151"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055"
+checksum = "97fed41fc1a24994d044e6db6935e69511a1153b52c15eb42493b26fa87feba0"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.149"
+version = "1.0.151"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4"
+checksum = "255abe9a125a985c05190d687b320c12f9b1f0b99445e608c21ba0782c719ad8"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml
index b83f8c0bc7..825894e2af 100644
--- a/crates/drainer/Cargo.toml
+++ b/crates/drainer/Cargo.toml
@@ -8,13 +8,22 @@ readme = "README.md"
 license = "Apache-2.0"
 
 [dependencies]
+async-bb8-diesel = { git = "https://github.com/juspay/async-bb8-diesel", rev = "9a71d142726dbc33f41c1fd935ddaa79841c7be5" }
+bb8 = "0.8"
+config = { version = "0.13.3", features = ["toml"] }
+diesel = { version = "2.0.2", features = ["postgres", "serde_json", "time"] }
 error-stack = "0.2.4"
+serde = "1.0.151"
 serde_json = "1.0.89"
+serde_path_to_error = "0.1.8"
 structopt = "0.3.26"
 thiserror = "1.0.37"
 tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread"] }
 
 # First Party Crates
 redis_interface = { version = "0.1.0", path = "../redis_interface" }
-router = { version = "0.2.0", path = "../router", features = ["kv_store"] }
-storage_models = { version = "0.1.0", path = "../storage_models", features = ["kv_store"] }
\ No newline at end of file
+router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
+storage_models = { version = "0.1.0", path = "../storage_models", features = ["kv_store"] }
+
+[build-dependencies]
+router_env = { version = "0.1.0", path = "../router_env", default-features = false, features = ["vergen"] }
\ No newline at end of file
diff --git a/crates/drainer/build.rs b/crates/drainer/build.rs
new file mode 100644
index 0000000000..4b07385af6
--- /dev/null
+++ b/crates/drainer/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+    router_env::vergen::generate_cargo_instructions();
+}
diff --git a/crates/drainer/src/connection.rs b/crates/drainer/src/connection.rs
new file mode 100644
index 0000000000..209c7e01c1
--- /dev/null
+++ b/crates/drainer/src/connection.rs
@@ -0,0 +1,34 @@
+use async_bb8_diesel::ConnectionManager;
+use bb8::PooledConnection;
+use diesel::PgConnection;
+
+use crate::settings::Database;
+
+pub type PgPool = bb8::Pool<ConnectionManager<PgConnection>>;
+
+pub async fn redis_connection(
+    conf: &crate::settings::Settings,
+) -> redis_interface::RedisConnectionPool {
+    redis_interface::RedisConnectionPool::new(&conf.redis).await
+}
+
+#[allow(clippy::expect_used)]
+pub async fn diesel_make_pg_pool(database: &Database, _test_transaction: bool) -> PgPool {
+    let database_url = format!(
+        "postgres://{}:{}@{}:{}/{}",
+        database.username, database.password, database.host, database.port, database.dbname
+    );
+    let manager = async_bb8_diesel::ConnectionManager::<PgConnection>::new(database_url);
+    let pool = bb8::Pool::builder().max_size(database.pool_size);
+
+    pool.build(manager)
+        .await
+        .expect("Failed to create PostgreSQL connection pool")
+}
+
+#[allow(clippy::expect_used)]
+pub async fn pg_connection(pool: &PgPool) -> PooledConnection<'_, ConnectionManager<PgConnection>> {
+    pool.get()
+        .await
+        .expect("Couldn't retrieve PostgreSQL connection")
+}
diff --git a/crates/drainer/src/defaults.toml b/crates/drainer/src/defaults.toml
new file mode 100644
index 0000000000..4a4a9de2b7
--- /dev/null
+++ b/crates/drainer/src/defaults.toml
@@ -0,0 +1,26 @@
+# log defaults in /crates/router_env/src/defaults.toml
+
+[master_database]
+username = "none"
+password = "none"
+host = "localhost"
+port = 5432
+dbname = "none"
+pool_size = 5
+
+[redis]
+host = "127.0.0.1"
+port = 6379
+pool_size = 5
+reconnect_max_attempts = 5
+reconnect_delay = 5
+default_ttl = 300
+stream_read_count = 1
+cluster_enabled = false
+use_legacy_version = false
+cluster_urls = []
+
+[drainer]
+stream_name = "DRAINER_STREAM"
+num_partitions = 64
+max_read_count = 100
diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs
index 463f6de342..50d5876886 100644
--- a/crates/drainer/src/errors.rs
+++ b/crates/drainer/src/errors.rs
@@ -1,19 +1,26 @@
+use redis_interface as redis;
 use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum DrainerError {
     #[error("Error in parsing config : {0}")]
     ConfigParsingError(String),
-    #[error("Error fetching stream length for stream : {0}")]
-    StreamGetLengthError(String),
-    #[error("Error reading from stream : {0}")]
-    StreamReadError(String),
-    #[error("Error triming from stream: {0}")]
-    StreamTrimFailed(String),
-    #[error("No entries found for stream: {0}")]
-    NoStreamEntry(String),
-    #[error("Error in making stream: {0} available")]
-    DeleteKeyFailed(String),
+    #[error("Error during redis operation : {0}")]
+    RedisError(error_stack::Report<redis::errors::RedisError>),
+    #[error("Application configuration error: {0}")]
+    ConfigurationError(config::ConfigError),
 }
 
 pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
+
+impl From<config::ConfigError> for DrainerError {
+    fn from(err: config::ConfigError) -> Self {
+        Self::ConfigurationError(err)
+    }
+}
+
+impl From<error_stack::Report<redis::errors::RedisError>> for DrainerError {
+    fn from(err: error_stack::Report<redis::errors::RedisError>) -> Self {
+        Self::RedisError(err)
+    }
+}
diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs
index 1f83444ea9..a11b374d9b 100644
--- a/crates/drainer/src/lib.rs
+++ b/crates/drainer/src/lib.rs
@@ -1,10 +1,14 @@
+mod connection;
 pub mod errors;
+pub mod services;
+pub mod settings;
 mod utils;
 use std::sync::Arc;
 
-use router::{connection::pg_connection, services::Store};
 use storage_models::kv;
 
+use crate::{connection::pg_connection, services::Store};
+
 pub async fn start_drainer(
     store: Arc<Store>,
     number_of_streams: u8,
diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs
index 322a615011..739d7c6448 100644
--- a/crates/drainer/src/main.rs
+++ b/crates/drainer/src/main.rs
@@ -1,5 +1,4 @@
-use drainer::{errors::DrainerResult, start_drainer};
-use router::configs::settings;
+use drainer::{errors::DrainerResult, services, settings, start_drainer};
 use structopt::StructOpt;
 
 #[tokio::main]
@@ -8,13 +7,13 @@ async fn main() -> DrainerResult<()> {
     let cmd_line = settings::CmdLineConf::from_args();
     let conf = settings::Settings::with_config_path(cmd_line.config_path).unwrap();
 
-    let store = router::services::Store::new(&conf, false).await;
+    let store = services::Store::new(&conf, false).await;
     let store = std::sync::Arc::new(store);
 
-    let number_of_drainers = conf.drainer.num_partitions;
+    let number_of_streams = store.config.drainer_num_partitions;
     let max_read_count = conf.drainer.max_read_count;
 
-    start_drainer(store, number_of_drainers, max_read_count).await?;
+    start_drainer(store, number_of_streams, max_read_count).await?;
 
     Ok(())
 }
diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs
new file mode 100644
index 0000000000..dd86010716
--- /dev/null
+++ b/crates/drainer/src/services.rs
@@ -0,0 +1,34 @@
+use std::sync::Arc;
+
+use crate::connection::{diesel_make_pg_pool, PgPool};
+
+#[derive(Clone)]
+pub struct Store {
+    pub master_pool: PgPool,
+    pub redis_conn: Arc<redis_interface::RedisConnectionPool>,
+    pub config: StoreConfig,
+}
+
+#[derive(Clone)]
+pub struct StoreConfig {
+    pub drainer_stream_name: String,
+    pub drainer_num_partitions: u8,
+}
+
+impl Store {
+    pub async fn new(config: &crate::settings::Settings, test_transaction: bool) -> Self {
+        Self {
+            master_pool: diesel_make_pg_pool(&config.master_database, test_transaction).await,
+            redis_conn: Arc::new(crate::connection::redis_connection(config).await),
+            config: StoreConfig {
+                drainer_stream_name: config.drainer.stream_name.clone(),
+                drainer_num_partitions: config.drainer.num_partitions,
+            },
+        }
+    }
+
+    pub fn drainer_stream(&self, shard_key: &str) -> String {
+        // Example: {shard_5}_drainer_stream
+        format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,)
+    }
+}
diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs
new file mode 100644
index 0000000000..f9e95791c4
--- /dev/null
+++ b/crates/drainer/src/settings.rs
@@ -0,0 +1,85 @@
+use std::path::PathBuf;
+
+use config::{Environment, File, FileFormat};
+use redis_interface as redis;
+pub use router_env::config::{Log, LogConsole, LogFile, LogTelemetry};
+use router_env::{env, logger};
+use serde::Deserialize;
+use structopt::StructOpt;
+
+use crate::errors;
+
+#[derive(StructOpt, Default)]
+#[structopt(version = router_env::version!())]
+pub struct CmdLineConf {
+    /// Config file.
+    /// Application will look for "config/config.toml" if this option isn't specified.
+    #[structopt(short = "f", long, parse(from_os_str))]
+    pub config_path: Option<PathBuf>,
+}
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct Settings {
+    pub master_database: Database,
+    pub redis: redis::RedisSettings,
+    pub log: Log,
+    pub drainer: DrainerSettings,
+}
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct Database {
+    pub username: String,
+    pub password: String,
+    pub host: String,
+    pub port: u16,
+    pub dbname: String,
+    pub pool_size: u32,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct DrainerSettings {
+    pub stream_name: String,
+    pub num_partitions: u8,
+    pub max_read_count: u64,
+}
+
+impl Settings {
+    pub fn new() -> Result<Self, errors::DrainerError> {
+        Self::with_config_path(None)
+    }
+
+    pub fn with_config_path(config_path: Option<PathBuf>) -> Result<Self, errors::DrainerError> {
+        let environment = env::which();
+        let config_path = router_env::Config::config_path(&environment.to_string(), config_path);
+
+        // println!("config_path : {:?}", config_path);
+        // println!("current_dir : {:?}", std::env::current_dir());
+
+        let config = router_env::Config::builder(&environment.to_string())?
+            // FIXME: consider embedding of textual file into bin files has several disadvantages
+            // 1. larger bin file
+            // 2. slower initialization of program
+            // 3. too late ( run-time ) information about broken toml file
+            // Consider embedding all defaults into code.
+            // Example: https://github.com/instrumentisto/medea/blob/medea-0.2.0/src/conf/mod.rs#L60-L102
+            .add_source(File::from_str(
+                include_str!("defaults.toml"),
+                FileFormat::Toml,
+            ))
+            .add_source(File::from(config_path).required(true))
+            .add_source(
+                Environment::with_prefix("DRAINER")
+                    .try_parsing(true)
+                    .separator("__")
+                    .list_separator(",")
+                    .with_list_parse_key("redis.cluster_urls"),
+            )
+            .build()?;
+
+        serde_path_to_error::deserialize(config).map_err(|error| {
+            logger::error!(%error, "Unable to deserialize application configuration");
+            eprintln!("Unable to deserialize application configuration: {error}");
+            errors::DrainerError::from(error.into_inner())
+        })
+    }
+}
diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs
index 33c304eae0..b0df588480 100644
--- a/crates/drainer/src/utils.rs
+++ b/crates/drainer/src/utils.rs
@@ -1,15 +1,17 @@
 use std::{collections::HashMap, sync::Arc};
 
-use error_stack::{IntoReport, ResultExt};
+use error_stack::IntoReport;
 use redis_interface as redis;
-use router::services::Store;
 
-use crate::errors;
+use crate::{
+    errors::{self, DrainerError},
+    services,
+};
 
 pub type StreamEntries = Vec<(String, HashMap<String, String>)>;
 pub type StreamReadResult = HashMap<String, StreamEntries>;
 
-pub async fn is_stream_available(stream_index: u8, store: Arc<Store>) -> bool {
+pub async fn is_stream_available(stream_index: u8, store: Arc<services::Store>) -> bool {
     let stream_key_flag = get_stream_key_flag(store.clone(), stream_index);
 
     match store
@@ -35,9 +37,8 @@ pub async fn read_from_stream(
     let entries = redis
         .stream_read_entries(stream_name, stream_id, Some(max_read_count))
         .await
-        .change_context(errors::DrainerError::StreamReadError(
-            stream_name.to_owned(),
-        ))?;
+        .map_err(DrainerError::from)
+        .into_report()?;
 
     Ok(entries)
 }
@@ -52,18 +53,16 @@ pub async fn trim_from_stream(
     let trim_result = redis
         .stream_trim_entries(stream_name, (trim_kind, trim_type, trim_id))
         .await
-        .change_context(errors::DrainerError::StreamTrimFailed(
-            stream_name.to_owned(),
-        ))?;
+        .map_err(DrainerError::from)
+        .into_report()?;
 
     // Since xtrim deletes entires below given id excluding the given id.
     // Hence, deleting the minimum entry id
     redis
         .stream_delete_entries(stream_name, minimum_entry_id)
         .await
-        .change_context(errors::DrainerError::StreamTrimFailed(
-            stream_name.to_owned(),
-        ))?;
+        .map_err(DrainerError::from)
+        .into_report()?;
 
     // adding 1 because we are deleting the given id too
     Ok(trim_result + 1)
@@ -76,9 +75,8 @@ pub async fn make_stream_available(
     redis
         .delete_key(stream_name_flag)
        .await
-        .change_context(errors::DrainerError::DeleteKeyFailed(
-            stream_name_flag.to_owned(),
-        ))
+        .map_err(DrainerError::from)
+        .into_report()
 }
 
 pub fn parse_stream_entries<'a>(
@@ -92,7 +90,11 @@ pub fn parse_stream_entries<'a>(
                 .last()
                 .map(|last_entry| (entries, last_entry.0.clone()))
         })
-        .ok_or_else(|| errors::DrainerError::NoStreamEntry(stream_name.to_owned()))
+        .ok_or_else(|| {
+            errors::DrainerError::RedisError(error_stack::report!(
+                redis::errors::RedisError::NotFound
+            ))
+        })
         .into_report()
 }
 
@@ -104,10 +106,10 @@ pub fn increment_stream_index(index: u8, total_streams: u8) -> u8 {
     }
 }
 
-pub(crate) fn get_stream_key_flag(store: Arc<Store>, stream_index: u8) -> String {
+pub(crate) fn get_stream_key_flag(store: Arc<services::Store>, stream_index: u8) -> String {
     format!("{}_in_use", get_drainer_stream_name(store, stream_index))
 }
 
-pub(crate) fn get_drainer_stream_name(store: Arc<Store>, stream_index: u8) -> String {
-    store.get_drainer_stream_name(format!("shard_{}", stream_index).as_str())
+pub(crate) fn get_drainer_stream_name(store: Arc<services::Store>, stream_index: u8) -> String {
+    store.drainer_stream(format!("shard_{}", stream_index).as_str())
 }
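
A minimal sketch of the error-conversion chain the utils.rs hunks above rely on, written as if it
sat inside crates/drainer so that `crate::errors` and `redis_interface` are in scope; the helper
name `to_drainer_result` is illustrative only and is not part of the committed patch.

    use error_stack::IntoReport;
    use redis_interface as redis;

    use crate::errors::{self, DrainerError};

    /// Convert a redis_interface failure into the drainer's own result type.
    /// `DrainerError::from` uses the new `From<error_stack::Report<RedisError>>`
    /// impl from errors.rs to absorb the redis report, and `into_report` then
    /// wraps the resulting `DrainerError` in a fresh `error_stack::Report`,
    /// which is exactly what the inline `.map_err(DrainerError::from).into_report()`
    /// calls above do at each call site.
    fn to_drainer_result<T>(
        result: Result<T, error_stack::Report<redis::errors::RedisError>>,
    ) -> errors::DrainerResult<T> {
        result.map_err(DrainerError::from).into_report()
    }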