diff --git a/Cargo.lock b/Cargo.lock index 31b4934e31..803f744a16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,7 +182,7 @@ dependencies = [ "futures-util", "mio", "num_cpus", - "socket2", + "socket2 0.4.9", "tokio", "tracing", ] @@ -263,8 +263,8 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2", - "time 0.3.23", + "socket2 0.4.9", + "time 0.3.22", "url", ] @@ -337,9 +337,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" dependencies = [ "memchr", ] @@ -402,7 +402,7 @@ dependencies = [ "serde", "serde_json", "strum 0.24.1", - "time 0.3.23", + "time 0.3.22", "url", "utoipa", ] @@ -493,15 +493,15 @@ dependencies = [ "polling", "rustix 0.37.23", "slab", - "socket2", + "socket2 0.4.9", "waker-fn", ] [[package]] name = "async-lock" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" dependencies = [ "event-listener", ] @@ -614,7 +614,7 @@ dependencies = [ "http", "hyper", "ring", - "time 0.3.23", + "time 0.3.22", "tokio", "tower", "tracing", @@ -834,7 +834,7 @@ dependencies = [ "percent-encoding", "regex", "sha2", - "time 0.3.23", + "time 0.3.22", "tracing", ] @@ -974,7 +974,7 @@ dependencies = [ "itoa", "num-integer", "ryu", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -1246,7 +1246,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -1389,7 +1389,7 @@ dependencies = [ "serde", "serde_json", "strum 0.25.0", - "time 0.3.23", + "time 0.3.22", "utoipa", ] @@ -1422,7 +1422,7 @@ dependencies = [ "signal-hook-tokio", "test-case", "thiserror", - "time 0.3.23", + "time 0.3.22", "tokio", ] @@ -1473,7 +1473,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" dependencies = [ "percent-encoding", - "time 0.3.23", + "time 0.3.22", "version_check", ] @@ -1734,7 +1734,7 @@ dependencies = [ "pq-sys", "r2d2", "serde_json", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -1767,7 +1767,7 @@ dependencies = [ "serde_json", "strum 0.24.1", "thiserror", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -1980,7 +1980,7 @@ dependencies = [ "mime", "serde", "serde_json", - "time 0.3.23", + "time 0.3.22", "tokio", "url", "webdriver", @@ -2470,7 +2470,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -2677,7 +2677,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -3476,9 +3476,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -4061,7 +4061,7 @@ dependencies = [ "test_utils", "thirtyfour", "thiserror", - "time 0.3.23", + "time 0.3.22", "tokio", "toml 0.7.6", "url", @@ -4101,7 +4101,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "strum 0.24.1", - "time 0.3.23", + "time 0.3.22", "tokio", "tracing", "tracing-actix-web", @@ -4471,7 +4471,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -4602,7 +4602,7 @@ dependencies = [ "num-bigint", "num-traits", "thiserror", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -4645,6 +4645,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "spin" version = "0.5.2" @@ -4658,8 +4668,13 @@ dependencies = [ "async-bb8-diesel", "async-trait", "bb8", + "crc32fast", "diesel", + "diesel_models", + "error-stack", "masking", + "redis_interface", + "tokio", ] [[package]] @@ -4839,7 +4854,7 @@ dependencies = [ "serde_urlencoded", "serial_test", "thirtyfour", - "time 0.3.23", + "time 0.3.22", "tokio", "toml 0.7.6", "uuid", @@ -4937,9 +4952,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.23" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" +checksum = "ea9e1b3cf1243ae005d9e74085d4d542f3125458f3a81af210d901dcd7411efd" dependencies = [ "itoa", "libc", @@ -4957,9 +4972,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.10" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" +checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b" dependencies = [ "time-core", ] @@ -4981,11 +4996,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "2d3ce25f50619af8b0aec2eb23deebe84249e19e2ddd393a6e16e3300a6dadfd" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -4994,7 +5008,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.3", "tokio-macros", "windows-sys", ] @@ -5207,7 +5221,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ "crossbeam-channel", - "time 0.3.23", + "time 0.3.22", "tracing-subscriber", ] @@ -5472,7 +5486,7 @@ dependencies = [ "git2", "rustc_version", "rustversion", - "time 0.3.23", + "time 0.3.22", ] [[package]] @@ -5629,7 +5643,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "time 0.3.23", + "time 0.3.22", "unicode-segmentation", "url", ] diff --git a/crates/router/src/connection.rs b/crates/router/src/connection.rs index 8b07d7bb5c..bf435c3067 100644 --- a/crates/router/src/connection.rs +++ b/crates/router/src/connection.rs @@ -1,37 +1,13 @@ -use async_bb8_diesel::{AsyncConnection, ConnectionError}; -use bb8::{CustomizeConnection, PooledConnection}; +use bb8::PooledConnection; use diesel::PgConnection; use error_stack::{IntoReport, ResultExt}; -#[cfg(feature = "kms")] -use external_services::kms; -#[cfg(feature = "kms")] -use external_services::kms::decrypt::KmsDecrypt; -#[cfg(not(feature = "kms"))] -use masking::PeekInterface; -use crate::{configs::settings::Database, errors}; +use crate::errors; pub type PgPool = bb8::Pool>; pub type PgPooledConn = async_bb8_diesel::Connection; -#[derive(Debug)] -struct TestTransaction; - -#[async_trait::async_trait] -impl CustomizeConnection for TestTransaction { - #[allow(clippy::unwrap_used)] - async fn on_acquire(&self, conn: &mut PgPooledConn) -> Result<(), ConnectionError> { - use diesel::Connection; - - conn.run(|conn| { - conn.begin_test_transaction().unwrap(); - Ok(()) - }) - .await - } -} - #[allow(clippy::expect_used)] pub async fn redis_connection( conf: &crate::configs::settings::Settings, @@ -41,41 +17,6 @@ pub async fn redis_connection( .expect("Failed to create Redis Connection Pool") } -#[allow(clippy::expect_used)] -pub async fn diesel_make_pg_pool( - database: &Database, - test_transaction: bool, - #[cfg(feature = "kms")] kms_client: &kms::KmsClient, -) -> PgPool { - #[cfg(feature = "kms")] - let password = database - .password - .clone() - .decrypt_inner(kms_client) - .await - .expect("Failed to KMS decrypt database password"); - - #[cfg(not(feature = "kms"))] - let password = &database.password.peek(); - - let database_url = format!( - "postgres://{}:{}@{}:{}/{}", - database.username, password, database.host, database.port, database.dbname - ); - let manager = async_bb8_diesel::ConnectionManager::::new(database_url); - let mut pool = bb8::Pool::builder() - .max_size(database.pool_size) - .connection_timeout(std::time::Duration::from_secs(database.connection_timeout)); - - if test_transaction { - pool = pool.connection_customizer(Box::new(TestTransaction)); - } - - pool.build(manager) - .await - .expect("Failed to create PostgreSQL connection pool") -} - pub async fn pg_connection_read( store: &crate::services::Store, ) -> errors::CustomResult< diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 46d8c51b09..8d5bb95212 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -11,7 +11,7 @@ use external_services::kms::{self, decrypt::KmsDecrypt}; #[cfg(not(feature = "kms"))] use masking::PeekInterface; use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue}; -use storage_impl::diesel as diesel_impl; +use storage_impl::{diesel as diesel_impl, DatabaseStore}; use tokio::sync::oneshot; pub use self::{api::*, encryption::*}; @@ -184,9 +184,9 @@ impl Store { #[cfg(not(feature = "olap"))] diesel_store: diesel_impl::store::Store::new( #[cfg(not(feature = "kms"))] - &config.master_database.clone().into(), + config.master_database.clone().into(), #[cfg(feature = "kms")] - &config + config .master_database .clone() .decrypt_inner(kms_client) @@ -197,24 +197,26 @@ impl Store { .await, #[cfg(feature = "olap")] diesel_store: diesel_impl::store::ReplicaStore::new( - #[cfg(not(feature = "kms"))] - &config.master_database.clone().into(), - #[cfg(feature = "kms")] - &config - .master_database - .clone() - .decrypt_inner(kms_client) - .await - .expect("Failed to decrypt master database"), - #[cfg(not(feature = "kms"))] - &config.replica_database.clone().into(), - #[cfg(feature = "kms")] - &config - .replica_database - .clone() - .decrypt_inner(kms_client) - .await - .expect("Failed to decrypt replica database"), + ( + #[cfg(not(feature = "kms"))] + config.master_database.clone().into(), + #[cfg(feature = "kms")] + config + .master_database + .clone() + .decrypt_inner(kms_client) + .await + .expect("Failed to decrypt master database"), + #[cfg(not(feature = "kms"))] + config.replica_database.clone().into(), + #[cfg(feature = "kms")] + config + .replica_database + .clone() + .decrypt_inner(kms_client) + .await + .expect("Failed to decrypt replica database"), + ), test_transaction, ) .await, diff --git a/crates/router_derive/src/macros/generate_schema.rs b/crates/router_derive/src/macros/generate_schema.rs index ecf9aa431e..2669106cec 100644 --- a/crates/router_derive/src/macros/generate_schema.rs +++ b/crates/router_derive/src/macros/generate_schema.rs @@ -56,7 +56,7 @@ pub fn polymorphic_macro_derive_inner( all_fields .entry(field_without_attributes.to_owned()) - .or_insert(vec![]) + .or_default() .push(attribute.to_owned().to_owned()); }); diff --git a/crates/storage_impl/Cargo.toml b/crates/storage_impl/Cargo.toml index 06fdd25dc8..bf0068c25d 100644 --- a/crates/storage_impl/Cargo.toml +++ b/crates/storage_impl/Cargo.toml @@ -12,9 +12,14 @@ license.workspace = true [dependencies] # First Party dependencies masking = { version = "0.1.0", path = "../masking" } +redis_interface = { version = "0.1.0", path = "../redis_interface" } +diesel_models = { version = "0.1.0", path = "../diesel_models" } # Third party crates bb8 = "0.8.1" diesel = { version = "2.1.0", default-features = false, features = ["postgres"] } async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "be3d9bce50051d8c0e0c06078e8066cc27db3001" } async-trait = "0.1.72" +crc32fast = "1.3.2" +error-stack = "0.3.1" +tokio = { version = "1.28.2", features = ["rt-multi-thread"] } \ No newline at end of file diff --git a/crates/storage_impl/src/diesel/store.rs b/crates/storage_impl/src/diesel/store.rs index 821d1e6715..90e40373e1 100644 --- a/crates/storage_impl/src/diesel/store.rs +++ b/crates/storage_impl/src/diesel/store.rs @@ -8,17 +8,35 @@ use crate::config::Database; pub type PgPool = bb8::Pool>; pub type PgPooledConn = async_bb8_diesel::Connection; +#[async_trait::async_trait] +pub trait DatabaseStore { + type Config; + async fn new(config: Self::Config, test_transaction: bool) -> Self; + fn get_write_pool(&self) -> PgPool; + fn get_read_pool(&self) -> PgPool; +} + #[derive(Clone)] pub struct Store { pub master_pool: PgPool, } -impl Store { - pub async fn new(config: &Database, test_transaction: bool) -> Self { +#[async_trait::async_trait] +impl DatabaseStore for Store { + type Config = Database; + async fn new(config: Database, test_transaction: bool) -> Self { Self { - master_pool: diesel_make_pg_pool(config, test_transaction).await, + master_pool: diesel_make_pg_pool(&config, test_transaction).await, } } + + fn get_write_pool(&self) -> PgPool { + self.master_pool.clone() + } + + fn get_read_pool(&self) -> PgPool { + self.master_pool.clone() + } } #[derive(Clone)] @@ -27,19 +45,26 @@ pub struct ReplicaStore { pub replica_pool: PgPool, } -impl ReplicaStore { - pub async fn new( - master_config: &Database, - replica_config: &Database, - test_transaction: bool, - ) -> Self { - let master_pool = diesel_make_pg_pool(master_config, test_transaction).await; - let replica_pool = diesel_make_pg_pool(replica_config, test_transaction).await; +#[async_trait::async_trait] +impl DatabaseStore for ReplicaStore { + type Config = (Database, Database); + async fn new(config: (Database, Database), test_transaction: bool) -> Self { + let (master_config, replica_config) = config; + let master_pool = diesel_make_pg_pool(&master_config, test_transaction).await; + let replica_pool = diesel_make_pg_pool(&replica_config, test_transaction).await; Self { master_pool, replica_pool, } } + + fn get_write_pool(&self) -> PgPool { + self.master_pool.clone() + } + + fn get_read_pool(&self) -> PgPool { + self.replica_pool.clone() + } } #[allow(clippy::expect_used)] diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 55e9d3d186..f24f184a70 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -1,2 +1,109 @@ +use error_stack::ResultExt; +use masking::StrongSecret; +use redis::CacheStore; pub mod config; pub mod diesel; +pub mod redis; + +pub use crate::diesel::store::DatabaseStore; + +#[allow(dead_code)] +pub struct RouterStore { + db_store: T, + cache_store: CacheStore, + master_encryption_key: StrongSecret>, +} + +impl RouterStore { + pub async fn new( + db_conf: T::Config, + cache_conf: &redis_interface::RedisSettings, + encryption_key: StrongSecret>, + cache_error_signal: tokio::sync::oneshot::Sender<()>, + inmemory_cache_stream: &str, + ) -> Self { + // TODO: create an error enum and return proper error here + let db_store = T::new(db_conf, false).await; + #[allow(clippy::expect_used)] + let cache_store = CacheStore::new(cache_conf) + .await + .expect("Failed to create cache store"); + cache_store.set_error_callback(cache_error_signal); + #[allow(clippy::expect_used)] + cache_store + .subscribe_to_channel(inmemory_cache_stream) + .await + .expect("Failed to subscribe to inmemory cache stream"); + Self { + db_store, + cache_store, + master_encryption_key: encryption_key, + } + } + pub async fn test_store( + db_conf: T::Config, + cache_conf: &redis_interface::RedisSettings, + encryption_key: StrongSecret>, + ) -> Self { + // TODO: create an error enum and return proper error here + let db_store = T::new(db_conf, true).await; + #[allow(clippy::expect_used)] + let cache_store = CacheStore::new(cache_conf) + .await + .expect("Failed to create cache store"); + Self { + db_store, + cache_store, + master_encryption_key: encryption_key, + } + } +} + +pub struct KVRouterStore { + router_store: RouterStore, + drainer_stream_name: String, + drainer_num_partitions: u8, +} + +impl KVRouterStore { + pub fn from_store( + store: RouterStore, + drainer_stream_name: String, + drainer_num_partitions: u8, + ) -> Self { + Self { + router_store: store, + drainer_stream_name, + drainer_num_partitions, + } + } + + pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { + format!("{{{}}}_{}", shard_key, self.drainer_stream_name) + } + + #[allow(dead_code)] + async fn push_to_drainer_stream( + &self, + redis_entry: diesel_models::kv::TypedSql, + partition_key: redis::kv_store::PartitionKey<'_>, + ) -> error_stack::Result<(), redis_interface::errors::RedisError> + where + R: crate::redis::kv_store::KvStorePartition, + { + let shard_key = R::shard_key(partition_key, self.drainer_num_partitions); + let stream_name = self.get_drainer_stream_name(&shard_key); + self.router_store + .cache_store + .redis_conn + .stream_append_entry( + &stream_name, + &redis_interface::RedisEntryId::AutoGeneratedID, + redis_entry + .to_field_value_pairs() + .change_context(redis_interface::errors::RedisError::JsonSerializationFailed)?, + ) + .await + .change_context(redis_interface::errors::RedisError::StreamAppendFailed) + } +} diff --git a/crates/storage_impl/src/redis.rs b/crates/storage_impl/src/redis.rs index e69de29bb2..c60926596b 100644 --- a/crates/storage_impl/src/redis.rs +++ b/crates/storage_impl/src/redis.rs @@ -0,0 +1,51 @@ +pub mod kv_store; + +use std::sync::Arc; + +use error_stack::{IntoReport, ResultExt}; +use redis_interface::PubsubInterface; + +pub struct CacheStore { + // Maybe expose the redis_conn via traits instead of the making the field public + pub(crate) redis_conn: Arc, +} + +impl CacheStore { + pub async fn new( + conf: &redis_interface::RedisSettings, + ) -> error_stack::Result { + Ok(Self { + redis_conn: Arc::new(redis_interface::RedisConnectionPool::new(conf).await?), + }) + } + + pub fn set_error_callback(&self, callback: tokio::sync::oneshot::Sender<()>) { + let redis_clone = self.redis_conn.clone(); + tokio::spawn(async move { + redis_clone.on_error(callback).await; + }); + } + + pub async fn subscribe_to_channel( + &self, + channel: &str, + ) -> error_stack::Result<(), redis_interface::errors::RedisError> { + self.redis_conn.subscriber.manage_subscriptions(); + + self.redis_conn + .subscriber + .subscribe::<(), _>(channel) + .await + .into_report() + .change_context(redis_interface::errors::RedisError::SubscribeError)?; + + // TODO: Handle on message failures + // let redis_clone = self.redis_conn.clone(); + // tokio::spawn(async move { + // if let Err(e) = redis_clone.on_message().await { + // logger::error!(pubsub_err=?e); + // } + // }); + Ok(()) + } +} diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs new file mode 100644 index 0000000000..61e2dafb6f --- /dev/null +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -0,0 +1,28 @@ +pub(crate) trait KvStorePartition { + fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 { + crc32fast::hash(key.to_string().as_bytes()) % u32::from(num_partitions) + } + + fn shard_key(key: PartitionKey<'_>, num_partitions: u8) -> String { + format!("shard_{}", Self::partition_number(key, num_partitions)) + } +} + +#[allow(unused)] +pub(crate) enum PartitionKey<'a> { + MerchantIdPaymentId { + merchant_id: &'a str, + payment_id: &'a str, + }, +} + +impl<'a> std::fmt::Display for PartitionKey<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + } => f.write_str(&format!("mid_{merchant_id}_pid_{payment_id}")), + } + } +}