diff --git a/Cargo.lock b/Cargo.lock
index e9205501b5..db913f6c00 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1140,6 +1140,21 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
 
+[[package]]
+name = "drainer"
+version = "0.1.0"
+dependencies = [
+ "error-stack",
+ "fred",
+ "redis_interface",
+ "router",
+ "serde_json",
+ "storage_models",
+ "structopt",
+ "thiserror",
+ "tokio",
+]
+
 [[package]]
 name = "dyn-clone"
 version = "1.0.9"
diff --git a/config/config.example.toml b/config/config.example.toml
index eea540ff83..1187de15f0 100644
--- a/config/config.example.toml
+++ b/config/config.example.toml
@@ -143,3 +143,4 @@ batch_size = 200 # Specifies the batch size the producer will push under a singl
 [drainer]
 stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer
 num_partitions = 64 # Specifies the number of partitions the stream will be divided into
+max_read_count = 100 # Specifies the maximum number of entries that will be read from the Redis stream in one call
\ No newline at end of file
diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml
new file mode 100644
index 0000000000..1ec8d37ad0
--- /dev/null
+++ b/crates/drainer/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "drainer"
+description = "App that reads Redis streams and executes queries in the database"
+version = "0.1.0"
+edition = "2021"
+rust-version = "1.64"
+readme = "README.md"
+license = "Apache-2.0"
+
+[dependencies]
+error-stack = "0.2.4"
+fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] }
+serde_json = "1.0.89"
+structopt = "0.3.26"
+thiserror = "1.0.37"
+tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread"] }
+# First Party Crates
+redis_interface = { version = "0.1.0", path = "../redis_interface" }
+router = { version = "0.2.0", path = "../router", features = ["kv_store"] }
+storage_models = { version = "0.1.0", path = "../storage_models", features = ["kv_store"] }
\ No newline at end of file
diff --git a/crates/drainer/README.md b/crates/drainer/README.md
new file mode 100644
index 0000000000..25e0320a75
--- /dev/null
+++ b/crates/drainer/README.md
@@ -0,0 +1,3 @@
+# Drainer
+
+App that reads Redis streams and executes queries in the database
\ No newline at end of file
diff --git a/crates/drainer/src/errors.rs b/crates/drainer/src/errors.rs
new file mode 100644
index 0000000000..463f6de342
--- /dev/null
+++ b/crates/drainer/src/errors.rs
@@ -0,0 +1,19 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum DrainerError {
+    #[error("Error parsing config: {0}")]
+    ConfigParsingError(String),
+    #[error("Error fetching length for stream: {0}")]
+    StreamGetLengthError(String),
+    #[error("Error reading from stream: {0}")]
+    StreamReadError(String),
+    #[error("Error trimming entries from stream: {0}")]
+    StreamTrimFailed(String),
+    #[error("No entries found in stream: {0}")]
+    NoStreamEntry(String),
+    #[error("Error making stream {0} available again")]
+    DeleteKeyFailed(String),
+}
+
+pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs
new file mode 100644
index 0000000000..2e881b813b
--- /dev/null
+++ b/crates/drainer/src/lib.rs
@@ -0,0 +1,107 @@
+pub mod errors;
+mod utils;
+use std::sync::Arc;
+
+use router::{connection::pg_connection, services::Store};
+use storage_models::kv;
+
+pub async fn start_drainer(
+    store: Arc<Store>,
+    number_of_streams: u8,
+    max_read_count: u64,
+) -> errors::DrainerResult<()> {
+    let mut stream_index: u8 = 0;
+
+    loop {
+        if utils::is_stream_available(stream_index, store.clone()).await {
+            tokio::spawn(drainer_handler(store.clone(), stream_index, max_read_count));
+        }
+        stream_index = utils::increment_stream_index(stream_index, number_of_streams);
+    }
+}
+
+async fn drainer_handler(
+    store: Arc<Store>,
+    stream_index: u8,
+    max_read_count: u64,
+) -> errors::DrainerResult<()> {
+    let stream_name = utils::get_drainer_stream(store.clone(), stream_index);
+    let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await;
+
+    if let Err(_e) = drainer_result {
+        // TODO: log the error
+    }
+
+    let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index);
+    // TODO: use the result for logging
+    utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await
+}
+
+async fn drainer(
+    store: Arc<Store>,
+    max_read_count: u64,
+    stream_name: &str,
+) -> errors::DrainerResult<()> {
+    let stream_read =
+        utils::read_from_stream(stream_name, max_read_count, store.redis_conn.as_ref()).await?; // propagates the read error
+
+    // `parse_stream_entries` returns an error if no entries are found; propagate it
+    let (entries, last_entry_id) = utils::parse_stream_entries(&stream_read, stream_name)?;
+
+    let read_count = entries.len();
+
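+    // Illustrative entry shape (hypothetical id shown): each entry is a
+    // (redis entry id, field map) pair whose `typed_sql` field holds the
+    // serialized `kv::TypedSql`, e.g.
+    //     ("1670864735657-0", {"typed_sql": "{\"db_op\":\"insert\",\"data\":{...}}"})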
+    // TODO: Handle errors when deserialization fails and when the DB errors out
+    for entry in entries {
+        let typed_sql = entry.1.get("typed_sql").map_or(String::new(), Clone::clone);
+        let result = serde_json::from_str::<kv::DBOperation>(&typed_sql);
+        let db_op = match result {
+            Ok(f) => f,
+            Err(_err) => continue, // TODO: handle the deserialization error
+        };
+
+        let conn = pg_connection(&store.master_pool).await;
+
+        match db_op {
+            // TODO: Handle errors
+            kv::DBOperation::Insert { insertable } => match insertable {
+                kv::Insertable::PaymentIntent(a) => {
+                    macro_util::handle_resp!(a.insert(&conn).await, "ins", "pi")
+                }
+                kv::Insertable::PaymentAttempt(a) => {
+                    macro_util::handle_resp!(a.insert(&conn).await, "ins", "pa")
+                }
+            },
+            kv::DBOperation::Update { updatable } => match updatable {
+                kv::Updateable::PaymentIntentUpdate(a) => {
+                    macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pi")
+                }
+                kv::Updateable::PaymentAttemptUpdate(a) => {
+                    macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pa")
+                }
+            },
+            kv::DBOperation::Delete => todo!(),
+        };
+    }
+
+    let entries_trimmed =
+        utils::trim_from_stream(stream_name, last_entry_id.as_str(), &store.redis_conn).await?;
+
+    if read_count != entries_trimmed {
+        // TODO: log that the trimmed count did not match the read count
+    }
+
+    Ok(())
+}
+
+mod macro_util {
+
+    macro_rules! handle_resp {
+        ($result:expr, $op_type:expr, $table:expr) => {
+            match $result {
+                Ok(aa) => println!("Ok|{}|{}|{:?}|", $op_type, $table, aa),
+                Err(err) => println!("Err|{}|{}|{:?}|", $op_type, $table, err),
+            }
+        };
+    }
+    pub(crate) use handle_resp;
+}
diff --git a/crates/drainer/src/main.rs b/crates/drainer/src/main.rs
new file mode 100644
index 0000000000..322a615011
--- /dev/null
+++ b/crates/drainer/src/main.rs
@@ -0,0 +1,20 @@
+use drainer::{errors::DrainerResult, start_drainer};
+use router::configs::settings;
+use structopt::StructOpt;
+
+#[tokio::main]
+async fn main() -> DrainerResult<()> {
+    // Get configuration
+    let cmd_line = settings::CmdLineConf::from_args();
+    let conf = settings::Settings::with_config_path(cmd_line.config_path).unwrap();
+
+    let store = router::services::Store::new(&conf, false).await;
+    let store = std::sync::Arc::new(store);
+
+    let number_of_drainers = conf.drainer.num_partitions;
+    let max_read_count = conf.drainer.max_read_count;
+
+    start_drainer(store, number_of_drainers, max_read_count).await?;
+
+    Ok(())
+}
diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs
new file mode 100644
index 0000000000..c931a1bbfd
--- /dev/null
+++ b/crates/drainer/src/utils.rs
@@ -0,0 +1,121 @@
+use std::{collections::HashMap, sync::Arc};
+
+use error_stack::{IntoReport, ResultExt};
+use fred::types as fred;
+use redis_interface as redis;
+use router::services::Store;
+
+use crate::errors;
+
+pub type StreamEntries = Vec<(String, HashMap<String, String>)>;
+pub type StreamReadResult = HashMap<String, StreamEntries>;
+
+pub async fn is_stream_available(stream_index: u8, store: Arc<Store>) -> bool {
+    let stream_key_flag = get_stream_key_flag(store.clone(), stream_index);
+
+    match store
+        .redis_conn
+        .set_key_if_not_exist(stream_key_flag.as_str(), true)
+        .await
+    {
+        Ok(resp) => resp == redis::types::SetnxReply::KeySet,
+        Err(_e) => {
+            // Add metrics or logs
+            false
+        }
+    }
+}
+
+pub async fn read_from_stream(
+    stream_name: &str,
+    max_read_count: u64,
+    redis: &redis::RedisConnectionPool,
+) -> errors::DrainerResult<StreamReadResult> {
+    let stream_key = fred::MultipleKeys::from(stream_name);
+    // the "0-0" id reads from the first entry onwards
+    let stream_id = "0-0";
+    let entries = redis
+        .stream_read_entries(stream_key, stream_id, Some(max_read_count))
+        .await
+        .change_context(errors::DrainerError::StreamReadError(
+            stream_name.to_owned(),
+        ))?;
+    Ok(entries)
+}
+
+pub async fn trim_from_stream(
+    stream_name: &str,
+    minimum_entry_id: &str,
+    redis: &redis::RedisConnectionPool,
+) -> errors::DrainerResult<usize> {
+    let trim_kind = fred::XCapKind::MinID;
+    let trim_type = fred::XCapTrim::Exact;
+    let trim_id = fred::StringOrNumber::String(minimum_entry_id.into());
+    let xcap = fred::XCap::try_from((trim_kind, trim_type, trim_id))
+        .into_report()
+        .change_context(errors::DrainerError::StreamTrimFailed(
+            stream_name.to_owned(),
+        ))?;
+
+    let trim_result = redis
+        .stream_trim_entries(stream_name, xcap)
+        .await
+        .change_context(errors::DrainerError::StreamTrimFailed(
+            stream_name.to_owned(),
+        ))?;
+
+    // Since XTRIM with `MinID` deletes entries whose IDs are strictly below the
+    // given id, the given id itself survives the trim.
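+    // For example (illustrative): with entries 1-1, 2-2 and 3-3 in the stream,
+    // trimming with minimum_entry_id = "2-2" removes only 1-1.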
+    // Hence, the minimum entry id is deleted explicitly below.
+    redis
+        .stream_delete_entries(stream_name, minimum_entry_id)
+        .await
+        .change_context(errors::DrainerError::StreamTrimFailed(
+            stream_name.to_owned(),
+        ))?;
+
+    // adding 1 because we deleted the given id as well
+    Ok(trim_result + 1)
+}
+
+pub async fn make_stream_available(
+    stream_name_flag: &str,
+    redis: &redis::RedisConnectionPool,
+) -> errors::DrainerResult<()> {
+    redis
+        .delete_key(stream_name_flag)
+        .await
+        .change_context(errors::DrainerError::DeleteKeyFailed(
+            stream_name_flag.to_owned(),
+        ))
+}
+
+pub fn parse_stream_entries<'a>(
+    read_result: &'a StreamReadResult,
+    stream_name: &str,
+) -> errors::DrainerResult<(&'a StreamEntries, String)> {
+    read_result
+        .get(stream_name)
+        .and_then(|entries| {
+            entries
+                .last()
+                .map(|last_entry| (entries, last_entry.0.clone()))
+        })
+        .ok_or_else(|| errors::DrainerError::NoStreamEntry(stream_name.to_owned()))
+        .into_report()
+}
+
+pub fn increment_stream_index(index: u8, total_streams: u8) -> u8 {
+    if index == total_streams - 1 {
+        0
+    } else {
+        index + 1
+    }
+}
+
+pub(crate) fn get_stream_key_flag(store: Arc<Store>, stream_index: u8) -> String {
+    format!("{}_in_use", get_drainer_stream(store, stream_index))
+}
+
+pub(crate) fn get_drainer_stream(store: Arc<Store>, stream_index: u8) -> String {
+    store.drainer_stream(format!("shard_{}", stream_index).as_str())
+}
diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs
index ef29c5d867..919a38e49d 100644
--- a/crates/redis_interface/src/commands.rs
+++ b/crates/redis_interface/src/commands.rs
@@ -17,7 +17,7 @@ use fred::{
     interfaces::{HashesInterface, KeysInterface, StreamsInterface},
     types::{
         Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
-        RedisMap, RedisValue, SetOptions, XReadResponse,
+        RedisKey, RedisMap, RedisValue, SetOptions, XCap, XReadResponse,
     },
 };
 use router_env::{tracing, tracing::instrument};
@@ -313,6 +313,23 @@ impl super::RedisConnectionPool {
             .change_context(errors::RedisError::StreamDeleteFailed)
     }
 
+    #[instrument(level = "DEBUG", skip(self))]
+    pub async fn stream_trim_entries<C>(
+        &self,
+        stream: &str,
+        xcap: C,
+    ) -> CustomResult<usize, errors::RedisError>
+    where
+        C: TryInto<XCap> + Debug,
+        C::Error: Into<fred::error::RedisError>,
+    {
+        self.pool
+            .xtrim(stream, xcap)
+            .await
+            .into_report()
+            .change_context(errors::RedisError::StreamTrimFailed)
+    }
+
     #[instrument(level = "DEBUG", skip(self))]
     pub async fn stream_acknowledge_entries<Ids>(
         &self,
@@ -330,19 +347,32 @@ impl super::RedisConnectionPool {
             .change_context(errors::RedisError::StreamAcknowledgeFailed)
     }
 
+    #[instrument(level = "DEBUG", skip(self))]
+    pub async fn stream_get_length<K>(&self, stream: K) -> CustomResult<usize, errors::RedisError>
+    where
+        K: Into<RedisKey> + Debug,
+    {
+        self.pool
+            .xlen(stream)
+            .await
+            .into_report()
+            .change_context(errors::RedisError::GetLengthFailed)
+    }
+
     #[instrument(level = "DEBUG", skip(self))]
     pub async fn stream_read_entries<K, Ids>(
         &self,
         streams: K,
         ids: Ids,
+        read_count: Option<u64>,
     ) -> CustomResult<XReadResponse<String, String, String, String>, errors::RedisError>
     where
         K: Into<MultipleKeys> + Debug,
         Ids: Into<MultipleIDs> + Debug,
     {
         self.pool
-            .xread(
-                Some(self.config.default_stream_read_count),
+            .xread_map(
+                Some(read_count.unwrap_or(self.config.default_stream_read_count)),
                 None,
                 streams,
                 ids,
diff --git a/crates/redis_interface/src/errors.rs b/crates/redis_interface/src/errors.rs
index 54db5663a5..43318ee06a 100644
--- a/crates/redis_interface/src/errors.rs
+++ b/crates/redis_interface/src/errors.rs
@@ -18,8 +18,12 @@ pub enum RedisError {
     StreamAppendFailed,
     #[error("Failed to read entries 
from Redis stream")] StreamReadFailed, + #[error("Failed to get stream length")] + GetLengthFailed, #[error("Failed to delete entries from Redis stream")] StreamDeleteFailed, + #[error("Failed to trim entries from Redis stream")] + StreamTrimFailed, #[error("Failed to acknowledge Redis stream entry")] StreamAcknowledgeFailed, #[error("Failed to create Redis consumer group")] diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index be6a728a86..61d70737da 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -75,7 +75,7 @@ masking = { version = "0.1.0", path = "../masking" } redis_interface = { version = "0.1.0", path = "../redis_interface" } router_derive = { version = "0.1.0", path = "../router_derive" } router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } -storage_models = { version = "0.1.0", path = "../storage_models" } +storage_models = { version = "0.1.0", path = "../storage_models", features = ["kv_store"] } [build-dependencies] router_env = { version = "0.1.0", path = "../router_env", default-features = false, features = ["vergen"] } diff --git a/crates/router/src/configs/defaults.toml b/crates/router/src/configs/defaults.toml index 59e24b38e8..c6680ebaac 100644 --- a/crates/router/src/configs/defaults.toml +++ b/crates/router/src/configs/defaults.toml @@ -53,8 +53,9 @@ batch_size = 200 [drainer] stream_name = "DRAINER_STREAM" -num_partitions = 4 +num_partitions = 64 +max_read_count = 100 [connectors.supported] -wallets = ["klarna","braintree"] -cards = ["stripe","adyen","authorizedotnet","checkout","braintree"] \ No newline at end of file +wallets = ["klarna", "braintree"] +cards = ["stripe", "adyen", "authorizedotnet", "checkout", "braintree"] diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 55568caf25..fa7fdefe7a 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -145,6 +145,7 @@ pub struct ProducerSettings { pub struct DrainerSettings { pub stream_name: String, pub num_partitions: u8, + pub max_read_count: u64, } impl Settings { diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index edf1c6ccac..e6cb06fc70 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -314,7 +314,7 @@ mod storage { connection::pg_connection, core::errors::{self, CustomResult}, services::Store, - types::storage::{enums, payment_attempt::*}, + types::storage::{enums, kv, payment_attempt::*}, utils::storage_partitioning::KvStorePartition, }; @@ -386,11 +386,11 @@ mod storage { )) .into_report(), Ok(HsetnxReply::KeySet) => { - let conn = pg_connection(&self.master_pool).await; - let query = payment_attempt - .insert_query(&conn) - .await - .change_context(errors::StorageError::KVError)?; + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::PaymentAttempt(payment_attempt), + }, + }; let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &created_attempt.merchant_id, @@ -402,7 +402,9 @@ mod storage { .stream_append_entry( &stream_name, &RedisEntryId::AutoGeneratedID, - query.to_field_value_pairs(), + redis_entry + .to_field_value_pairs() + .change_context(errors::StorageError::KVError)?, ) .await .change_context(errors::StorageError::KVError)?; @@ -444,11 +446,17 @@ mod storage { 
.map(|_| updated_attempt) .change_context(errors::StorageError::KVError)?; - let conn = pg_connection(&self.master_pool).await; - let query = this - .update_query(&conn, payment_attempt) - .await - .change_context(errors::StorageError::KVError)?; + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::PaymentAttemptUpdate( + kv::PaymentAttemptUpdateMems { + orig: this, + update_data: payment_attempt, + }, + ), + }, + }; + let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &updated_attempt.merchant_id, @@ -460,7 +468,9 @@ mod storage { .stream_append_entry( &stream_name, &RedisEntryId::AutoGeneratedID, - query.to_field_value_pairs(), + redis_entry + .to_field_value_pairs() + .change_context(errors::StorageError::KVError)?, ) .await .change_context(errors::StorageError::KVError)?; diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index c88419f8a7..494337714f 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -50,7 +50,7 @@ mod storage { services::Store, types::{ api, - storage::{enums, payment_intent::*}, + storage::{enums, kv, payment_intent::*}, }, utils::storage_partitioning::KvStorePartition, }; @@ -105,11 +105,11 @@ mod storage { )) .into_report(), Ok(HsetnxReply::KeySet) => { - let conn = pg_connection(&self.master_pool).await; - let query = new - .insert_query(&conn) - .await - .change_context(errors::StorageError::KVError)?; + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::PaymentIntent(new), + }, + }; let stream_name = self.drainer_stream(&PaymentIntent::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &created_intent.merchant_id, @@ -121,7 +121,9 @@ mod storage { .stream_append_entry( &stream_name, &RedisEntryId::AutoGeneratedID, - query.to_field_value_pairs(), + redis_entry + .to_field_value_pairs() + .change_context(errors::StorageError::KVError)?, ) .await .change_context(errors::StorageError::KVError)?; @@ -163,11 +165,17 @@ mod storage { .map(|_| updated_intent) .change_context(errors::StorageError::KVError)?; - let conn = pg_connection(&self.master_pool).await; - let query = this - .update_query(&conn, payment_intent) - .await - .change_context(errors::StorageError::KVError)?; + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::PaymentIntentUpdate( + kv::PaymentIntentUpdateMems { + orig: this, + update_data: payment_intent, + }, + ), + }, + }; + let stream_name = self.drainer_stream(&PaymentIntent::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &updated_intent.merchant_id, @@ -179,7 +187,9 @@ mod storage { .stream_append_entry( &stream_name, &RedisEntryId::AutoGeneratedID, - query.to_field_value_pairs(), + redis_entry + .to_field_value_pairs() + .change_context(errors::StorageError::KVError)?, ) .await .change_context(errors::StorageError::KVError)?; diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 085336b71c..97adc84e86 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -41,7 +41,7 @@ impl Store { #[cfg(feature = "kv_store")] pub fn drainer_stream(&self, shard_key: &str) -> String { - // "{shard_key}_stream_name" + // Example: {shard_5}_drainer_stream format!("{{{}}}_{}", shard_key, 
self.config.drainer_stream_name,) } } diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index 8f6a4fe4d9..74f3c3493c 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -18,6 +18,9 @@ mod query; pub mod refund; pub mod temp_card; +#[cfg(feature = "kv_store")] +pub mod kv; + pub use self::{ address::*, configs::*, connector_response::*, customers::*, events::*, locker_mock_up::*, mandate::*, merchant_account::*, merchant_connector_account::*, payment_attempt::*, diff --git a/crates/router/src/types/storage/kv.rs b/crates/router/src/types/storage/kv.rs new file mode 100644 index 0000000000..cf30f4d2f5 --- /dev/null +++ b/crates/router/src/types/storage/kv.rs @@ -0,0 +1,4 @@ +pub use storage_models::kv::{ + DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, TypedSql, + Updateable, +}; diff --git a/crates/router/src/types/storage/payment_intent.rs b/crates/router/src/types/storage/payment_intent.rs index 3a24b787f6..55739e9e87 100644 --- a/crates/router/src/types/storage/payment_intent.rs +++ b/crates/router/src/types/storage/payment_intent.rs @@ -38,7 +38,6 @@ impl PaymentIntentDbExt for PaymentIntent { //TODO: Replace this with Boxable Expression and pass it into generic filter // when https://github.com/rust-lang/rust/issues/52662 becomes stable - let mut filter = ::table() .filter(dsl::merchant_id.eq(merchant_id.to_owned())) .order_by(dsl::id) diff --git a/crates/storage_models/Cargo.toml b/crates/storage_models/Cargo.toml index 2d0d2309ea..7ff50dc3b2 100644 --- a/crates/storage_models/Cargo.toml +++ b/crates/storage_models/Cargo.toml @@ -5,6 +5,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["kv_store"] +kv_store = [] + [dependencies] async-bb8-diesel = { git = "https://github.com/juspay/async-bb8-diesel", rev = "412663e16802dbc58a1b98bfcbe78fa0090311eb" } async-trait = "0.1.57" @@ -23,4 +27,4 @@ time = { version = "0.3.14", features = ["serde", "serde-well-known", "std"] } common_utils = { version = "0.1.0", path = "../common_utils" } masking = { version = "0.1.0", path = "../masking" } router_derive = { version = "0.1.0", path = "../router_derive" } -router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } +router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } \ No newline at end of file diff --git a/crates/storage_models/src/errors.rs b/crates/storage_models/src/errors.rs index ded9155f3f..0a8422131a 100644 --- a/crates/storage_models/src/errors.rs +++ b/crates/storage_models/src/errors.rs @@ -8,7 +8,7 @@ pub enum DatabaseError { UniqueViolation, #[error("No fields were provided to be updated")] NoFieldsToUpdate, - #[error("An error occurred when generating raw SQL query")] + #[error("An error occurred when generating typed SQL query")] QueryGenerationFailed, // InsertFailed, #[error("An unknown error occurred")] diff --git a/crates/storage_models/src/kv.rs b/crates/storage_models/src/kv.rs new file mode 100644 index 0000000000..3a23a8323e --- /dev/null +++ b/crates/storage_models/src/kv.rs @@ -0,0 +1,61 @@ +use error_stack::{IntoReport, ResultExt}; +use serde::{Deserialize, Serialize}; + +use crate::{ + errors, + payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, + payment_intent::{PaymentIntent, PaymentIntentNew, 
PaymentIntentUpdate},
+};
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case", tag = "db_op", content = "data")]
+pub enum DBOperation {
+    Insert { insertable: Insertable },
+    Update { updatable: Updateable },
+    Delete,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TypedSql {
+    #[serde(flatten)]
+    pub op: DBOperation,
+}
+
+impl TypedSql {
+    pub fn to_field_value_pairs(
+        &self,
+    ) -> crate::CustomResult<Vec<(&str, String)>, errors::DatabaseError> {
+        Ok(vec![(
+            "typed_sql",
+            serde_json::to_string(self)
+                .into_report()
+                .change_context(errors::DatabaseError::QueryGenerationFailed)?,
+        )])
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case", tag = "table", content = "data")]
+pub enum Insertable {
+    PaymentIntent(PaymentIntentNew),
+    PaymentAttempt(PaymentAttemptNew),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case", tag = "table", content = "data")]
+pub enum Updateable {
+    PaymentIntentUpdate(PaymentIntentUpdateMems),
+    PaymentAttemptUpdate(PaymentAttemptUpdateMems),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct PaymentIntentUpdateMems {
+    pub orig: PaymentIntent,
+    pub update_data: PaymentIntentUpdate,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct PaymentAttemptUpdateMems {
+    pub orig: PaymentAttempt,
+    pub update_data: PaymentAttemptUpdate,
+}
diff --git a/crates/storage_models/src/lib.rs b/crates/storage_models/src/lib.rs
index 37c637fccc..ba3f4f8616 100644
--- a/crates/storage_models/src/lib.rs
+++ b/crates/storage_models/src/lib.rs
@@ -7,6 +7,8 @@ pub mod enums;
 pub mod ephemeral_key;
 pub mod errors;
 pub mod events;
+#[cfg(feature = "kv_store")]
+pub mod kv;
 pub mod locker_mock_up;
 pub mod mandate;
 pub mod merchant_account;
diff --git a/crates/storage_models/src/payment_attempt.rs b/crates/storage_models/src/payment_attempt.rs
index 7cd6ca9eb6..b7bc28d849 100644
--- a/crates/storage_models/src/payment_attempt.rs
+++ b/crates/storage_models/src/payment_attempt.rs
@@ -40,7 +40,9 @@ pub struct PaymentAttempt {
     pub error_code: Option<String>,
 }
 
-#[derive(Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay)]
+#[derive(
+    Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize,
+)]
 #[diesel(table_name = payment_attempt)]
 pub struct PaymentAttemptNew {
     pub payment_id: String,
@@ -76,7 +78,7 @@ pub struct PaymentAttemptNew {
     pub error_code: Option<String>,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum PaymentAttemptUpdate {
     Update {
         amount: i64,
diff --git a/crates/storage_models/src/payment_intent.rs b/crates/storage_models/src/payment_intent.rs
index e13bdebb36..3077c884f8 100644
--- a/crates/storage_models/src/payment_intent.rs
+++ b/crates/storage_models/src/payment_intent.rs
@@ -31,7 +31,17 @@ pub struct PaymentIntent {
     pub client_secret: Option<String>,
 }
 
-#[derive(Clone, Debug, Default, Eq, PartialEq, Insertable, router_derive::DebugAsDisplay)]
+#[derive(
+    Clone,
+    Debug,
+    Default,
+    Eq,
+    PartialEq,
+    Insertable,
+    router_derive::DebugAsDisplay,
+    Serialize,
+    Deserialize,
+)]
 #[diesel(table_name = payment_intent)]
 pub struct PaymentIntentNew {
     pub payment_id: String,
@@ -57,7 +67,7 @@ pub struct PaymentIntentNew {
     pub off_session: Option<bool>,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum PaymentIntentUpdate {
     ResponseUpdate {
         status: storage_enums::IntentStatus,
diff --git a/crates/storage_models/src/query/address.rs b/crates/storage_models/src/query/address.rs
index 
b472290b51..636ff1ec4e 100644 --- a/crates/storage_models/src/query/address.rs +++ b/crates/storage_models/src/query/address.rs @@ -1,7 +1,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use router_env::{tracing, tracing::instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ address::{Address, AddressNew, AddressUpdate, AddressUpdateInternal}, errors, @@ -12,7 +12,7 @@ use crate::{ impl AddressNew { #[instrument(skip(conn))] pub async fn insert(self, conn: &PgPooledConn) -> CustomResult { - generics::generic_insert::<_, _, Address, _>(conn, self, ExecuteQuery::new()).await + generics::generic_insert(conn, self).await } } @@ -23,11 +23,10 @@ impl Address { address_id: String, address: AddressUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_by_id::<::Table, _, _, _>( conn, address_id.clone(), AddressUpdateInternal::from(address), - ExecuteQuery::new(), ) .await { @@ -53,10 +52,9 @@ impl Address { conn: &PgPooledConn, address_id: &str, ) -> CustomResult { - generics::generic_delete::<::Table, _, _>( + generics::generic_delete::<::Table, _>( conn, dsl::address_id.eq(address_id.to_owned()), - ExecuteQuery::::new(), ) .await } @@ -67,13 +65,12 @@ impl Address { merchant_id: &str, address: AddressUpdate, ) -> CustomResult, errors::DatabaseError> { - generics::generic_update_with_results::<::Table, _, _, Self, _>( + generics::generic_update_with_results::<::Table, _, _, _>( conn, dsl::merchant_id .eq(merchant_id.to_owned()) .and(dsl::customer_id.eq(customer_id.to_owned())), AddressUpdateInternal::from(address), - ExecuteQuery::new(), ) .await } diff --git a/crates/storage_models/src/query/configs.rs b/crates/storage_models/src/query/configs.rs index d86e4f08de..05fdefbc6a 100644 --- a/crates/storage_models/src/query/configs.rs +++ b/crates/storage_models/src/query/configs.rs @@ -1,7 +1,7 @@ use diesel::associations::HasTable; use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ configs::{Config, ConfigNew, ConfigUpdate, ConfigUpdateInternal}, errors, CustomResult, PgPooledConn, @@ -10,7 +10,7 @@ use crate::{ impl ConfigNew { #[instrument(skip(conn))] pub async fn insert(self, conn: &PgPooledConn) -> CustomResult { - generics::generic_insert::<_, _, Config, _>(conn, self, ExecuteQuery::new()).await + generics::generic_insert(conn, self).await } } @@ -29,11 +29,10 @@ impl Config { key: &str, config_update: ConfigUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_by_id::<::Table, _, _, _>( conn, key.to_owned(), ConfigUpdateInternal::from(config_update), - ExecuteQuery::new(), ) .await { diff --git a/crates/storage_models/src/query/connector_response.rs b/crates/storage_models/src/query/connector_response.rs index 38febb82fa..6a390f0c84 100644 --- a/crates/storage_models/src/query/connector_response.rs +++ b/crates/storage_models/src/query/connector_response.rs @@ -1,7 +1,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use router_env::{tracing, tracing::instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ connector_response::{ ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate, @@ -18,8 +18,7 @@ impl ConnectorResponseNew { self, conn: &PgPooledConn, ) -> CustomResult { - generics::generic_insert::<_, _, 
ConnectorResponse, _>(conn, self, ExecuteQuery::new()) - .await + generics::generic_insert(conn, self).await } } @@ -30,11 +29,10 @@ impl ConnectorResponse { conn: &PgPooledConn, connector_response: ConnectorResponseUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_by_id::<::Table, _, _, _>( conn, self.id, ConnectorResponseUpdateInternal::from(connector_response), - ExecuteQuery::new(), ) .await { diff --git a/crates/storage_models/src/query/customers.rs b/crates/storage_models/src/query/customers.rs index b458aa2ad5..294f73b695 100644 --- a/crates/storage_models/src/query/customers.rs +++ b/crates/storage_models/src/query/customers.rs @@ -1,7 +1,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use router_env::{tracing, tracing::instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ customers::{Customer, CustomerNew, CustomerUpdate, CustomerUpdateInternal}, errors, @@ -15,7 +15,7 @@ impl CustomerNew { self, conn: &PgPooledConn, ) -> CustomResult { - generics::generic_insert::<_, _, Customer, _>(conn, self, ExecuteQuery::new()).await + generics::generic_insert(conn, self).await } } @@ -27,11 +27,10 @@ impl Customer { merchant_id: String, customer: CustomerUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_by_id::<::Table, _, _, _>( conn, (customer_id.clone(), merchant_id.clone()), CustomerUpdateInternal::from(customer), - ExecuteQuery::new(), ) .await { @@ -55,12 +54,11 @@ impl Customer { customer_id: &str, merchant_id: &str, ) -> CustomResult { - generics::generic_delete::<::Table, _, _>( + generics::generic_delete::<::Table, _>( conn, dsl::customer_id .eq(customer_id.to_owned()) .and(dsl::merchant_id.eq(merchant_id.to_owned())), - ExecuteQuery::::new(), ) .await } diff --git a/crates/storage_models/src/query/events.rs b/crates/storage_models/src/query/events.rs index 341d029d2d..2a53dcc316 100644 --- a/crates/storage_models/src/query/events.rs +++ b/crates/storage_models/src/query/events.rs @@ -1,6 +1,6 @@ use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ errors, events::{Event, EventNew}, @@ -10,6 +10,6 @@ use crate::{ impl EventNew { #[instrument(skip(conn))] pub async fn insert(self, conn: &PgPooledConn) -> CustomResult { - generics::generic_insert::<_, _, Event, _>(conn, self, ExecuteQuery::new()).await + generics::generic_insert(conn, self).await } } diff --git a/crates/storage_models/src/query/generics.rs b/crates/storage_models/src/query/generics.rs index df72cf3d7b..bd2591e0c1 100644 --- a/crates/storage_models/src/query/generics.rs +++ b/crates/storage_models/src/query/generics.rs @@ -1,7 +1,6 @@ -use std::{fmt::Debug, marker::PhantomData}; +use std::fmt::Debug; use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionError}; -use async_trait::async_trait; use diesel::{ associations::HasTable, debug_query, @@ -24,402 +23,44 @@ use router_env::{logger, tracing, tracing::instrument}; use crate::{errors, CustomResult, PgPooledConn}; -#[derive(Debug)] -pub struct RawSqlQuery { - pub sql: String, - // The inner `Vec` can be considered to be byte array - pub binds: Vec>>, -} - -impl RawSqlQuery { - pub fn to_field_value_pairs(&self) -> Vec<(&str, String)> { - vec![ - ("sql", self.sql.clone()), - ( - "binds", - serde_json::to_string( - &self - .binds - .iter() - .map(|bytes| 
bytes.as_ref().map(hex::encode)) - .collect::>(), - ) - .unwrap(), - ), - ] - } -} - -pub struct ExecuteQuery(PhantomData); - -impl ExecuteQuery { - pub fn new() -> Self { - Self(PhantomData) - } -} - -impl Default for ExecuteQuery { - fn default() -> Self { - Self::new() - } -} - -pub struct RawQuery; - -#[async_trait] -pub trait QueryExecutionMode -where - Q: QueryFragment + Send + 'static, -{ - type InsertOutput; - type UpdateOutput; - type UpdateWithResultsOutput; - type UpdateByIdOutput; - type DeleteOutput; - type DeleteWithResultsOutput; - type DeleteOneWithResultOutput; - - async fn insert( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult - where - Q: AsQuery + QueryFragment + RunQueryDsl; - - async fn update( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult - where - Q: QueryId; - - async fn update_with_results( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult; - - async fn update_by_id( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult - where - Q: Clone; - - async fn delete( - &self, - conn: &PgPooledConn, - query: Q, - ) -> CustomResult - where - Q: QueryId; - - async fn delete_with_results( - &self, - conn: &PgPooledConn, - query: Q, - ) -> CustomResult; - - async fn delete_one_with_result( - &self, - conn: &PgPooledConn, - query: Q, - ) -> CustomResult; -} - -#[async_trait] -impl QueryExecutionMode for ExecuteQuery -where - Q: LoadQuery<'static, PgConnection, R> + QueryFragment + Send + Sync + 'static, - R: Send + Sync + 'static, -{ - type InsertOutput = R; - type UpdateOutput = usize; - type UpdateWithResultsOutput = Vec; - type UpdateByIdOutput = R; - type DeleteOutput = bool; - type DeleteWithResultsOutput = Vec; - type DeleteOneWithResultOutput = R; - - async fn insert( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult - where - Q: AsQuery + QueryFragment + RunQueryDsl, - { - match query.get_result_async(conn).await { - Ok(value) => Ok(value), - Err(error) => match error { - ConnectionError::Query(DieselError::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Err(report!(error)).change_context(errors::DatabaseError::UniqueViolation), - _ => Err(report!(error)).change_context(errors::DatabaseError::Others), - } - .attach_printable_lazy(|| format!("Error while inserting {}", debug_values)), - } - } - - async fn update( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult - where - Q: QueryId, - { - query - .execute_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::Others) - .attach_printable_lazy(|| format!("Error while updating {}", debug_values)) - } - - async fn update_with_results( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult { - query - .get_results_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::Others) - .attach_printable_lazy(|| format!("Error while updating {}", debug_values)) - } - - async fn update_by_id( - &self, - conn: &PgPooledConn, - query: Q, - debug_values: String, - ) -> CustomResult - where - Q: Clone, - { - // Cloning query for calling `debug_query` later - match query.to_owned().get_result_async(conn).await { - Ok(result) => { - logger::debug!(query = %debug_query::(&query).to_string()); - Ok(result) - } - Err(error) => match error { - // Failed to generate query, no fields were provided to be 
updated - ConnectionError::Query(DieselError::QueryBuilderError(_)) => { - Err(report!(error)).change_context(errors::DatabaseError::NoFieldsToUpdate) - } - ConnectionError::Query(DieselError::NotFound) => { - Err(report!(error)).change_context(errors::DatabaseError::NotFound) - } - _ => Err(report!(error)).change_context(errors::DatabaseError::Others), - } - .attach_printable_lazy(|| format!("Error while updating by ID {}", debug_values)), - } - } - - async fn delete( - &self, - conn: &PgPooledConn, - query: Q, - ) -> CustomResult - where - Q: QueryId, - { - query - .execute_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::Others) - .attach_printable("Error while deleting") - .and_then(|result| match result { - n if n > 0 => { - logger::debug!("{n} records deleted"); - Ok(true) - } - 0 => { - Err(report!(errors::DatabaseError::NotFound) - .attach_printable("No records deleted")) - } - _ => Ok(true), // n is usize, rustc requires this for exhaustive check - }) - } - - async fn delete_with_results( - &self, - conn: &PgPooledConn, - query: Q, - ) -> CustomResult { - query - .get_results_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::Others) - .attach_printable("Error while deleting") - } - - async fn delete_one_with_result( - &self, - conn: &PgPooledConn, - query: Q, - ) -> CustomResult { - match query.get_result_async(conn).await { - Ok(value) => Ok(value), - Err(error) => match error { - ConnectionError::Query(DieselError::NotFound) => { - Err(report!(error)).change_context(errors::DatabaseError::NotFound) - } - _ => Err(report!(error)).change_context(errors::DatabaseError::Others), - } - .attach_printable("Error while deleting"), - } - } -} - -#[async_trait] -impl QueryExecutionMode for RawQuery -where - Q: QueryFragment + Send + 'static, -{ - type InsertOutput = RawSqlQuery; - type UpdateOutput = RawSqlQuery; - type UpdateWithResultsOutput = RawSqlQuery; - type UpdateByIdOutput = RawSqlQuery; - type DeleteOutput = RawSqlQuery; - type DeleteWithResultsOutput = RawSqlQuery; - type DeleteOneWithResultOutput = RawSqlQuery; - - async fn insert( - &self, - _conn: &PgPooledConn, - query: Q, - _debug_values: String, - ) -> CustomResult - where - Q: AsQuery + QueryFragment + RunQueryDsl, - { - generate_raw_query(query) - } - - async fn update( - &self, - _conn: &PgPooledConn, - query: Q, - _debug_values: String, - ) -> CustomResult - where - Q: QueryId, - { - generate_raw_query(query) - } - - async fn update_with_results( - &self, - _conn: &PgPooledConn, - query: Q, - _debug_values: String, - ) -> CustomResult { - generate_raw_query(query) - } - - async fn update_by_id( - &self, - _conn: &PgPooledConn, - query: Q, - _debug_values: String, - ) -> CustomResult - where - Q: Clone, - { - generate_raw_query(query) - } - - async fn delete( - &self, - _conn: &PgPooledConn, - query: Q, - ) -> CustomResult - where - Q: QueryId, - { - generate_raw_query(query) - } - - async fn delete_with_results( - &self, - _conn: &PgPooledConn, - query: Q, - ) -> CustomResult { - generate_raw_query(query) - } - - async fn delete_one_with_result( - &self, - _conn: &PgPooledConn, - query: Q, - ) -> CustomResult { - generate_raw_query(query) - } -} - -pub fn generate_raw_query(query: Q) -> CustomResult -where - Q: QueryFragment, -{ - let raw_query = diesel::query_builder::raw_query(&query) - .into_report() - .change_context(errors::DatabaseError::QueryGenerationFailed)?; - - Ok(RawSqlQuery { - sql: raw_query.raw_sql, - binds: raw_query.raw_binds, - }) -} - 
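+// Usage sketch (illustrative): `let address: Address = generic_insert(&conn, address_new).await?;`
+// A unique-constraint violation surfaces as `errors::DatabaseError::UniqueViolation`.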
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_insert<T, V, R, Q>(
+pub(super) async fn generic_insert<T, V, R>(
     conn: &PgPooledConn,
     values: V,
-    execution_mode: Q,
-) -> CustomResult<Q::InsertOutput, errors::DatabaseError>
+) -> CustomResult<R, errors::DatabaseError>
 where
     T: HasTable<Table = T> + Table + 'static,
     V: Debug + Insertable<T>,
-    <T as QuerySource>::FromClause: QueryFragment<Pg> + Send,
+    <T as QuerySource>::FromClause: QueryFragment<Pg>,
     <V as Insertable<T>>::Values: CanInsertInSingleQuery<Pg> + QueryFragment<Pg> + 'static,
     InsertStatement<T, <V as Insertable<T>>::Values>:
-        AsQuery + LoadQuery<'static, PgConnection, R> + Clone + Send,
+        AsQuery + LoadQuery<'static, PgConnection, R> + Send,
     R: Send + 'static,
-
-    Q: QueryExecutionMode<InsertStatement<T, <V as Insertable<T>>::Values>>,
 {
     let debug_values = format!("{:?}", values);
 
     let query = diesel::insert_into(<T as HasTable>::table()).values(values);
     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
 
-    execution_mode.insert(conn, query, debug_values).await
+    match query.get_result_async(conn).await.into_report() {
+        Ok(value) => Ok(value),
+        Err(err) => match err.current_context() {
+            ConnectionError::Query(DieselError::DatabaseError(
+                diesel::result::DatabaseErrorKind::UniqueViolation,
+                _,
+            )) => Err(err).change_context(errors::DatabaseError::UniqueViolation),
+            _ => Err(err).change_context(errors::DatabaseError::Others),
+        },
+    }
+    .attach_printable_lazy(|| format!("Error while inserting {}", debug_values))
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_update<T, V, P, Q>(
+pub(super) async fn generic_update<T, V, P>(
     conn: &PgPooledConn,
     predicate: P,
     values: V,
-    execution_mode: Q,
-) -> CustomResult<Q::UpdateOutput, errors::DatabaseError>
+) -> CustomResult<usize, errors::DatabaseError>
 where
     T: FilterDsl<P> + HasTable<Table = T> + Table + 'static,
     V: AsChangeset<Target = <<T as FilterDsl<P>>::Output as HasTable>::Table> + Debug,
@@ -429,30 +70,26 @@ where
         <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
         <V as AsChangeset>::Changeset,
     >: AsQuery + QueryFragment<Pg> + QueryId + Send + 'static,
-
-    Q: QueryExecutionMode<
-        UpdateStatement<
-            <<T as FilterDsl<P>>::Output as HasTable>::Table,
-            <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
-            <V as AsChangeset>::Changeset,
-        >,
-    >,
 {
     let debug_values = format!("{:?}", values);
 
     let query = diesel::update(<T as HasTable>::table().filter(predicate)).set(values);
     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
 
-    execution_mode.update(conn, query, debug_values).await
+    query
+        .execute_async(conn)
+        .await
+        .into_report()
+        .change_context(errors::DatabaseError::Others)
+        .attach_printable_lazy(|| format!("Error while updating {}", debug_values))
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_update_with_results<T, V, P, R, Q>(
+pub(super) async fn generic_update_with_results<T, V, P, R>(
     conn: &PgPooledConn,
     predicate: P,
     values: V,
-    execution_mode: Q,
-) -> CustomResult<Q::UpdateWithResultsOutput, errors::DatabaseError>
+) -> CustomResult<Vec<R>, errors::DatabaseError>
 where
     T: FilterDsl<P> + HasTable<Table = T> + Table + 'static,
     V: AsChangeset<Target = <<T as FilterDsl<P>>::Output as HasTable>::Table> + Debug + 'static,
@@ -463,32 +100,26 @@ where
         <V as AsChangeset>::Changeset,
     >: AsQuery + LoadQuery<'static, PgConnection, R> + QueryFragment<Pg> + Send,
     R: Send + 'static,
-
-    Q: QueryExecutionMode<
-        UpdateStatement<
-            <<T as FilterDsl<P>>::Output as HasTable>::Table,
-            <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
-            <V as AsChangeset>::Changeset,
-        >,
-    >,
 {
     let debug_values = format!("{:?}", values);
 
     let query = diesel::update(<T as HasTable>::table().filter(predicate)).set(values);
     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
 
-    execution_mode
-        .update_with_results(conn, query, debug_values)
+    query
+        .get_results_async(conn)
         .await
+        .into_report()
+        .change_context(errors::DatabaseError::Others)
+        .attach_printable_lazy(|| format!("Error while updating {}", debug_values))
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_update_by_id<T, V, Pk, R, Q>(
+pub(super) async fn generic_update_by_id<T, V, Pk, R>(
     conn: &PgPooledConn,
     id: Pk,
     values: V,
-    execution_mode: Q,
-) -> CustomResult<Q::UpdateByIdOutput, errors::DatabaseError>
+) -> CustomResult<R, errors::DatabaseError>
 where
     T: FindDsl<Pk> + HasTable<Table = T> + LimitDsl + Table + 'static,
     V: AsChangeset<Target = <<T as FindDsl<Pk>>::Output as HasTable>::Table> + Debug,
@@ -509,28 +140,33 @@
     <<T as FindDsl<Pk>>::Output as IntoUpdateTarget>::WhereClause: Clone,
     <V as AsChangeset>::Changeset: Clone,
     <<<T as FindDsl<Pk>>::Output as HasTable>::Table as QuerySource>::FromClause: Clone,
-
-    Q: QueryExecutionMode<
-        UpdateStatement<
-            <<T as FindDsl<Pk>>::Output as HasTable>::Table,
-            <<T as FindDsl<Pk>>::Output as IntoUpdateTarget>::WhereClause,
-            <V as AsChangeset>::Changeset,
-        >,
-    >,
 {
     let debug_values = format!("{:?}", values);
 
     let query = diesel::update(<T as HasTable>::table().find(id.to_owned())).set(values);
 
-    execution_mode.update_by_id(conn, query, debug_values).await
+    match query.to_owned().get_result_async(conn).await {
+        Ok(result) => {
+            logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
+            Ok(result)
+        }
+        Err(ConnectionError::Query(DieselError::QueryBuilderError(_))) => {
+            generic_find_by_id_core::<T, _, _>(conn, id).await
+        }
+        Err(ConnectionError::Query(DieselError::NotFound)) => {
+            Err(report!(errors::DatabaseError::NotFound))
+                .attach_printable_lazy(|| format!("Error while updating by ID {}", debug_values))
+        }
+        _ => Err(report!(errors::DatabaseError::Others))
+            .attach_printable_lazy(|| format!("Error while updating by ID {}", debug_values)),
+    }
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_delete<T, P, Q>(
+pub(super) async fn generic_delete<T, P>(
     conn: &PgPooledConn,
     predicate: P,
-    execution_mode: Q,
-) -> CustomResult<Q::DeleteOutput, errors::DatabaseError>
+) -> CustomResult<bool, errors::DatabaseError>
 where
     T: FilterDsl<P> + HasTable<Table = T> + Table + 'static,
     <T as FilterDsl<P>>::Output: IntoUpdateTarget,
@@ -538,55 +174,33 @@ where
     DeleteStatement<
         <<T as FilterDsl<P>>::Output as HasTable>::Table,
         <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
     >: AsQuery + QueryFragment<Pg> + QueryId + Send + 'static,
-
-    Q: QueryExecutionMode<
-        DeleteStatement<
-            <<T as FilterDsl<P>>::Output as HasTable>::Table,
-            <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
-        >,
-    >,
 {
     let query = diesel::delete(<T as HasTable>::table().filter(predicate));
     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
 
-    execution_mode.delete(conn, query).await
-}
-
-#[allow(dead_code)]
-#[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_delete_with_results<T, P, R, Q>(
-    conn: &PgPooledConn,
-    predicate: P,
-    execution_mode: Q,
-) -> CustomResult<Q::DeleteWithResultsOutput, errors::DatabaseError>
-where
-    T: FilterDsl<P> + HasTable<Table = T> + Table + 'static,
-    <T as FilterDsl<P>>::Output: IntoUpdateTarget,
-    DeleteStatement<
-        <<T as FilterDsl<P>>::Output as HasTable>::Table,
-        <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
-    >: AsQuery + LoadQuery<'static, PgConnection, R> + QueryFragment<Pg> + Send + 'static,
-    R: Send + 'static,
-
-    Q: QueryExecutionMode<
-        DeleteStatement<
-            <<T as FilterDsl<P>>::Output as HasTable>::Table,
-            <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
-        >,
-    >,
-{
-    let query = diesel::delete(<T as HasTable>::table().filter(predicate));
-    logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
-
-    execution_mode.delete_with_results(conn, query).await
+    query
+        .execute_async(conn)
+        .await
+        .into_report()
+        .change_context(errors::DatabaseError::Others)
+        .attach_printable_lazy(|| "Error while deleting")
+        .and_then(|result| match result {
+            n if n > 0 => {
+                logger::debug!("{n} records deleted");
+                Ok(true)
+            }
+            0 => {
+                Err(report!(errors::DatabaseError::NotFound).attach_printable("No records deleted"))
+            }
+            _ => Ok(true), // n is usize, rustc requires this for exhaustive check
+        })
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_delete_one_with_result<T, P, R, Q>(
+pub(super) async fn generic_delete_one_with_result<T, P, R>(
     conn: &PgPooledConn,
     predicate: P,
-    execution_mode: Q,
-) -> CustomResult<Q::DeleteOneWithResultOutput, errors::DatabaseError>
+) -> CustomResult<R, errors::DatabaseError>
 where
     T: FilterDsl<P> + HasTable<Table = T> + Table + 'static,
     <T as FilterDsl<P>>::Output: IntoUpdateTarget,
@@ -595,18 +209,22 @@ where
     DeleteStatement<
         <<T as FilterDsl<P>>::Output as HasTable>::Table,
         <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
     >: AsQuery + LoadQuery<'static, PgConnection, R> + QueryFragment<Pg> + Send + 'static,
     R: Send + Clone + 'static,
-
-    Q: QueryExecutionMode<
-        DeleteStatement<
-            <<T as FilterDsl<P>>::Output as HasTable>::Table,
-            <<T as FilterDsl<P>>::Output as IntoUpdateTarget>::WhereClause,
-        >,
-    >,
 {
     let query = diesel::delete(<T as HasTable>::table().filter(predicate));
     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
 
-    execution_mode.delete_one_with_result(conn, query).await
+    query
+        .get_results_async(conn)
+        .await
+        .into_report()
+        .change_context(errors::DatabaseError::Others)
+        .attach_printable_lazy(|| "Error while deleting")
+        .and_then(|result| {
+            result.first().cloned().ok_or_else(|| {
+                report!(errors::DatabaseError::NotFound)
+                    .attach_printable("Object to be deleted does not exist")
+            })
+        })
 }
 
 #[instrument(level = "DEBUG", skip_all)]
@@ -638,7 +256,7 @@ where
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_find_by_id<T, Pk, R>(
+pub(super) async fn generic_find_by_id<T, Pk, R>(
     conn: &PgPooledConn,
     id: Pk,
 ) -> CustomResult<R, errors::DatabaseError>
@@ -654,7 +272,7 @@ where
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_find_by_id_optional<T, Pk, R>(
+pub(super) async fn generic_find_by_id_optional<T, Pk, R>(
     conn: &PgPooledConn,
     id: Pk,
 ) -> CustomResult<Option<R>, errors::DatabaseError>
@@ -699,7 +317,7 @@ where
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_find_one<T, P, R>(
+pub(super) async fn generic_find_one<T, P, R>(
     conn: &PgPooledConn,
     predicate: P,
 ) -> CustomResult<R, errors::DatabaseError>
@@ -713,7 +331,7 @@ where
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_find_one_optional<T, P, R>(
+pub(super) async fn generic_find_one_optional<T, P, R>(
     conn: &PgPooledConn,
     predicate: P,
 ) -> CustomResult<Option<R>, errors::DatabaseError>
@@ -727,7 +345,7 @@ where
 }
 
 #[instrument(level = "DEBUG", skip_all)]
-pub async fn generic_filter<T, P, R>(
+pub(super) async fn generic_filter<T, P, R>(
     conn: &PgPooledConn,
     predicate: P,
     limit: Option<i64>,
@@ -759,7 +377,7 @@ where
     .attach_printable_lazy(|| "Error filtering records by predicate")
 }
 
-pub fn to_optional<T>(
+fn to_optional<T>(
     arg: CustomResult<T, errors::DatabaseError>,
 ) -> CustomResult<Option<T>, errors::DatabaseError> {
     match arg {
diff --git a/crates/storage_models/src/query/locker_mock_up.rs b/crates/storage_models/src/query/locker_mock_up.rs
index a634110bca..296c3a5695 100644
--- a/crates/storage_models/src/query/locker_mock_up.rs
+++ b/crates/storage_models/src/query/locker_mock_up.rs
@@ -1,7 +1,7 @@
 use diesel::{associations::HasTable, ExpressionMethods};
 use router_env::{tracing, tracing::instrument};
 
-use super::generics::{self, ExecuteQuery};
+use super::generics;
 use crate::{
     errors,
     locker_mock_up::{LockerMockUp, LockerMockUpNew},
@@ -15,7 +15,7 @@ impl LockerMockUpNew {
         self,
         conn: &PgPooledConn,
     ) -> CustomResult<LockerMockUp, errors::DatabaseError> {
-        generics::generic_insert::<_, _, LockerMockUp, _>(conn, self, ExecuteQuery::new()).await
+        generics::generic_insert(conn, self).await
     }
 }
 
@@ -37,10 +37,9 @@ impl LockerMockUp {
         conn: &PgPooledConn,
         card_id: &str,
     ) -> CustomResult<Self, errors::DatabaseError> {
-        generics::generic_delete_one_with_result::<<Self as HasTable>::Table, _, Self, _>(
+        generics::generic_delete_one_with_result::<<Self as HasTable>::Table, _, _>(
             conn,
             dsl::card_id.eq(card_id.to_owned()),
-            ExecuteQuery::new(),
         )
         .await
     }
diff --git a/crates/storage_models/src/query/mandate.rs b/crates/storage_models/src/query/mandate.rs
index 868e5644bd..e0c49b82d2 100644
--- a/crates/storage_models/src/query/mandate.rs
+++ b/crates/storage_models/src/query/mandate.rs
@@ -2,13 +2,13 @@
 use diesel::{associations::HasTable, BoolExpressionMethods, 
ExpressionMethods}; use error_stack::report; use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{errors, mandate::*, schema::mandate::dsl, CustomResult, PgPooledConn}; impl MandateNew { #[instrument(skip(conn))] pub async fn insert(self, conn: &PgPooledConn) -> CustomResult { - generics::generic_insert::<_, _, Mandate, _>(conn, self, ExecuteQuery::new()).await + generics::generic_insert(conn, self).await } } @@ -48,13 +48,12 @@ impl Mandate { mandate_id: &str, mandate: MandateUpdate, ) -> CustomResult { - generics::generic_update_with_results::<::Table, _, _, Self, _>( + generics::generic_update_with_results::<::Table, _, _, _>( conn, dsl::merchant_id .eq(merchant_id.to_owned()) .and(dsl::mandate_id.eq(mandate_id.to_owned())), MandateUpdateInternal::from(mandate), - ExecuteQuery::new(), ) .await? .first() diff --git a/crates/storage_models/src/query/merchant_account.rs b/crates/storage_models/src/query/merchant_account.rs index 9aa3727076..13dd241838 100644 --- a/crates/storage_models/src/query/merchant_account.rs +++ b/crates/storage_models/src/query/merchant_account.rs @@ -1,7 +1,7 @@ use diesel::{associations::HasTable, ExpressionMethods}; use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ errors, merchant_account::{ @@ -17,7 +17,7 @@ impl MerchantAccountNew { self, conn: &PgPooledConn, ) -> CustomResult { - generics::generic_insert::<_, _, MerchantAccount, _>(conn, self, ExecuteQuery::new()).await + generics::generic_insert(conn, self).await } } @@ -28,11 +28,10 @@ impl MerchantAccount { conn: &PgPooledConn, merchant_account: MerchantAccountUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_by_id::<::Table, _, _, _>( conn, self.id, MerchantAccountUpdateInternal::from(merchant_account), - ExecuteQuery::new(), ) .await { @@ -48,10 +47,9 @@ impl MerchantAccount { conn: &PgPooledConn, merchant_id: &str, ) -> CustomResult { - generics::generic_delete::<::Table, _, _>( + generics::generic_delete::<::Table, _>( conn, dsl::merchant_id.eq(merchant_id.to_owned()), - ExecuteQuery::::new(), ) .await } diff --git a/crates/storage_models/src/query/merchant_connector_account.rs b/crates/storage_models/src/query/merchant_connector_account.rs index cb58d62df4..5293da6262 100644 --- a/crates/storage_models/src/query/merchant_connector_account.rs +++ b/crates/storage_models/src/query/merchant_connector_account.rs @@ -1,7 +1,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery}; +use super::generics; use crate::{ errors, merchant_connector_account::{ @@ -18,12 +18,7 @@ impl MerchantConnectorAccountNew { self, conn: &PgPooledConn, ) -> CustomResult { - generics::generic_insert::<_, _, MerchantConnectorAccount, _>( - conn, - self, - ExecuteQuery::new(), - ) - .await + generics::generic_insert(conn, self).await } } @@ -34,11 +29,10 @@ impl MerchantConnectorAccount { conn: &PgPooledConn, merchant_connector_account: MerchantConnectorAccountUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_by_id::<::Table, _, _, _>( conn, self.id, MerchantConnectorAccountUpdateInternal::from(merchant_connector_account), - ExecuteQuery::new(), ) .await { @@ -55,12 +49,11 @@ impl MerchantConnectorAccount { merchant_id: 
&str, merchant_connector_id: &i32, ) -> CustomResult { - generics::generic_delete::<::Table, _, _>( + generics::generic_delete::<::Table, _>( conn, dsl::merchant_id .eq(merchant_id.to_owned()) .and(dsl::merchant_connector_id.eq(merchant_connector_id.to_owned())), - ExecuteQuery::::new(), ) .await } diff --git a/crates/storage_models/src/query/payment_attempt.rs b/crates/storage_models/src/query/payment_attempt.rs index 51ae4446ab..c3ab39b2e4 100644 --- a/crates/storage_models/src/query/payment_attempt.rs +++ b/crates/storage_models/src/query/payment_attempt.rs @@ -2,7 +2,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use error_stack::IntoReport; use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery}; +use super::generics; use crate::{ enums, errors, payment_attempt::{ @@ -18,15 +18,7 @@ impl PaymentAttemptNew { self, conn: &PgPooledConn, ) -> CustomResult { - generics::generic_insert::<_, _, PaymentAttempt, _>(conn, self, ExecuteQuery::new()).await - } - - #[instrument(skip(conn))] - pub async fn insert_query( - self, - conn: &PgPooledConn, - ) -> CustomResult { - generics::generic_insert::<_, _, PaymentAttempt, _>(conn, self, RawQuery).await + generics::generic_insert(conn, self).await } } @@ -37,11 +29,12 @@ impl PaymentAttempt { conn: &PgPooledConn, payment_attempt: PaymentAttemptUpdate, ) -> CustomResult { - match generics::generic_update_by_id::<::Table, _, _, Self, _>( + match generics::generic_update_with_results::<::Table, _, _, _>( conn, - self.id, + dsl::payment_id + .eq(self.payment_id.to_owned()) + .and(dsl::merchant_id.eq(self.merchant_id.to_owned())), PaymentAttemptUpdateInternal::from(payment_attempt), - ExecuteQuery::new(), ) .await { @@ -49,25 +42,12 @@ impl PaymentAttempt { errors::DatabaseError::NoFieldsToUpdate => Ok(self), _ => Err(error), }, - result => result, + Ok(mut payment_attempts) => payment_attempts + .pop() + .ok_or(error_stack::report!(errors::DatabaseError::NotFound)), } } - #[instrument(skip(conn))] - pub async fn update_query( - self, - conn: &PgPooledConn, - payment_attempt: PaymentAttemptUpdate, - ) -> CustomResult { - generics::generic_update_by_id::<::Table, _, _, Self, _>( - conn, - self.id, - PaymentAttemptUpdateInternal::from(payment_attempt), - RawQuery, - ) - .await - } - #[instrument(skip(conn))] pub async fn find_by_payment_id_merchant_id( conn: &PgPooledConn, diff --git a/crates/storage_models/src/query/payment_intent.rs b/crates/storage_models/src/query/payment_intent.rs index 4d0db3e8b2..28451f4fff 100644 --- a/crates/storage_models/src/query/payment_intent.rs +++ b/crates/storage_models/src/query/payment_intent.rs @@ -1,7 +1,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use router_env::tracing::{self, instrument}; -use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery}; +use super::generics; use crate::{ errors, payment_intent::{ @@ -17,15 +17,7 @@ impl PaymentIntentNew { self, conn: &PgPooledConn, ) -> CustomResult { - generics::generic_insert::<_, _, PaymentIntent, _>(conn, self, ExecuteQuery::new()).await - } - - #[instrument(skip(conn))] - pub async fn insert_query( - self, - conn: &PgPooledConn, - ) -> CustomResult { - generics::generic_insert::<_, _, PaymentIntent, _>(conn, self, RawQuery).await + generics::generic_insert(conn, self).await } } @@ -36,11 +28,12 @@ impl PaymentIntent { conn: &PgPooledConn, payment_intent: PaymentIntentUpdate, ) -> CustomResult { - match 
diff --git a/crates/storage_models/src/query/payment_intent.rs b/crates/storage_models/src/query/payment_intent.rs
index 4d0db3e8b2..28451f4fff 100644
--- a/crates/storage_models/src/query/payment_intent.rs
+++ b/crates/storage_models/src/query/payment_intent.rs
@@ -1,7 +1,7 @@
 use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
 use router_env::tracing::{self, instrument};
 
-use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery};
+use super::generics;
 use crate::{
     errors,
     payment_intent::{
@@ -17,15 +17,7 @@ impl PaymentIntentNew {
         self,
         conn: &PgPooledConn,
     ) -> CustomResult<PaymentIntent, errors::DatabaseError> {
-        generics::generic_insert::<_, _, PaymentIntent, _>(conn, self, ExecuteQuery::new()).await
-    }
-
-    #[instrument(skip(conn))]
-    pub async fn insert_query(
-        self,
-        conn: &PgPooledConn,
-    ) -> CustomResult<RawSqlQuery, errors::DatabaseError> {
-        generics::generic_insert::<_, _, PaymentIntent, _>(conn, self, RawQuery).await
+        generics::generic_insert(conn, self).await
     }
 }
 
@@ -36,11 +28,12 @@ impl PaymentIntent {
         conn: &PgPooledConn,
         payment_intent: PaymentIntentUpdate,
     ) -> CustomResult<Self, errors::DatabaseError> {
-        match generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, Self, _>(
+        match generics::generic_update_with_results::<<Self as HasTable>::Table, _, _, _>(
             conn,
-            self.id,
+            dsl::payment_id
+                .eq(self.payment_id.to_owned())
+                .and(dsl::merchant_id.eq(self.merchant_id.to_owned())),
             PaymentIntentUpdateInternal::from(payment_intent),
-            ExecuteQuery::new(),
         )
         .await
         {
@@ -48,25 +41,12 @@ impl PaymentIntent {
                 errors::DatabaseError::NoFieldsToUpdate => Ok(self),
                 _ => Err(error),
             },
-            result => result,
+            Ok(mut payment_intents) => payment_intents
+                .pop()
+                .ok_or(error_stack::report!(errors::DatabaseError::NotFound)),
         }
     }
 
-    #[instrument(skip(conn))]
-    pub async fn update_query(
-        self,
-        conn: &PgPooledConn,
-        payment_intent: PaymentIntentUpdate,
-    ) -> CustomResult<RawSqlQuery, errors::DatabaseError> {
-        generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, Self, _>(
-            conn,
-            self.id,
-            PaymentIntentUpdateInternal::from(payment_intent),
-            RawQuery,
-        )
-        .await
-    }
-
     #[instrument(skip(conn))]
     pub async fn find_by_payment_id_merchant_id(
         conn: &PgPooledConn,
diff --git a/crates/storage_models/src/query/payment_method.rs b/crates/storage_models/src/query/payment_method.rs
index f14be34421..e425324ddf 100644
--- a/crates/storage_models/src/query/payment_method.rs
+++ b/crates/storage_models/src/query/payment_method.rs
@@ -1,8 +1,7 @@
 use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
-use error_stack::ResultExt;
 use router_env::tracing::{self, instrument};
 
-use super::generics::{self, ExecuteQuery};
+use super::generics;
 use crate::{
     errors,
     payment_method::{PaymentMethod, PaymentMethodNew},
@@ -16,7 +15,7 @@ impl PaymentMethodNew {
         self,
         conn: &PgPooledConn,
     ) -> CustomResult<PaymentMethod, errors::DatabaseError> {
-        generics::generic_insert::<_, _, PaymentMethod, _>(conn, self, ExecuteQuery::new()).await
+        generics::generic_insert(conn, self).await
     }
 }
 
@@ -26,15 +25,11 @@ impl PaymentMethod {
         conn: &PgPooledConn,
         payment_method_id: String,
     ) -> CustomResult<Self, errors::DatabaseError> {
-        let result =
-            generics::generic_delete_one_with_result::<<Self as HasTable>::Table, _, Self, _>(
-                conn,
-                dsl::payment_method_id.eq(payment_method_id),
-                ExecuteQuery::new(),
-            )
-            .await
-            .attach_printable("Error while deleting by payment method ID")?;
-        Ok(result)
+        generics::generic_delete_one_with_result::<<Self as HasTable>::Table, _, Self>(
+            conn,
+            dsl::payment_method_id.eq(payment_method_id),
+        )
+        .await
     }
 
     #[instrument(skip(conn))]
@@ -43,17 +38,13 @@ impl PaymentMethod {
         merchant_id: &str,
         payment_method_id: &str,
     ) -> CustomResult<Self, errors::DatabaseError> {
-        let result =
-            generics::generic_delete_one_with_result::<<Self as HasTable>::Table, _, Self, _>(
-                conn,
-                dsl::merchant_id
-                    .eq(merchant_id.to_owned())
-                    .and(dsl::payment_method_id.eq(payment_method_id.to_owned())),
-                ExecuteQuery::new(),
-            )
-            .await?;
-
-        Ok(result)
+        generics::generic_delete_one_with_result::<<Self as HasTable>::Table, _, Self>(
+            conn,
+            dsl::merchant_id
+                .eq(merchant_id.to_owned())
+                .and(dsl::payment_method_id.eq(payment_method_id.to_owned())),
+        )
+        .await
     }
 
     #[instrument(skip(conn))]
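Note: one side effect of the payment_method.rs change is that the `attach_printable` context string was dropped along with the `ResultExt` import, so errors on this path no longer carry the "Error while deleting by payment method ID" annotation. A caller that still wants it can re-attach the message with error_stack; a sketch (the method name is inferred from the removed context string, and `conn` and `payment_method_id` are assumed to be in scope):

    use error_stack::ResultExt;

    let deleted_method = PaymentMethod::delete_by_payment_method_id(conn, payment_method_id)
        .await
        // Re-adds the context that used to be attached inside the query layer.
        .attach_printable("Error while deleting by payment method ID")?;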
diff --git a/crates/storage_models/src/query/process_tracker.rs b/crates/storage_models/src/query/process_tracker.rs
index b4e8901f88..7fe2a05c52 100644
--- a/crates/storage_models/src/query/process_tracker.rs
+++ b/crates/storage_models/src/query/process_tracker.rs
@@ -2,7 +2,7 @@ use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
 use router_env::tracing::{self, instrument};
 use time::PrimitiveDateTime;
 
-use super::generics::{self, ExecuteQuery};
+use super::generics;
 use crate::{
     enums, errors,
     process_tracker::{
@@ -18,7 +18,7 @@ impl ProcessTrackerNew {
         self,
         conn: &PgPooledConn,
     ) -> CustomResult<ProcessTracker, errors::DatabaseError> {
-        generics::generic_insert::<_, _, ProcessTracker, _>(conn, self, ExecuteQuery::new()).await
+        generics::generic_insert(conn, self).await
     }
 }
 
@@ -29,11 +29,10 @@ impl ProcessTracker {
         conn: &PgPooledConn,
         process: ProcessTrackerUpdate,
     ) -> CustomResult<Self, errors::DatabaseError> {
-        match generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, Self, _>(
+        match generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, _>(
             conn,
             self.id.clone(),
             ProcessTrackerUpdateInternal::from(process),
-            ExecuteQuery::new(),
         )
         .await
         {
@@ -51,11 +50,10 @@ impl ProcessTracker {
         task_ids: Vec<String>,
         task_update: ProcessTrackerUpdate,
     ) -> CustomResult<usize, errors::DatabaseError> {
-        generics::generic_update::<<Self as HasTable>::Table, _, _, _>(
+        generics::generic_update::<<Self as HasTable>::Table, _, _>(
             conn,
             dsl::id.eq_any(task_ids),
             ProcessTrackerUpdateInternal::from(task_update),
-            ExecuteQuery::<Self>::new(),
         )
         .await
     }
@@ -118,7 +116,7 @@ impl ProcessTracker {
         ids: Vec<String>,
         schedule_time: PrimitiveDateTime,
     ) -> CustomResult<usize, errors::DatabaseError> {
-        generics::generic_update::<<Self as HasTable>::Table, _, _, _>(
+        generics::generic_update::<<Self as HasTable>::Table, _, _>(
             conn,
             dsl::status
                 .eq(enums::ProcessTrackerStatus::ProcessStarted)
@@ -127,7 +125,6 @@ impl ProcessTracker {
                 dsl::status.eq(enums::ProcessTrackerStatus::Processing),
                 dsl::schedule_time.eq(schedule_time),
             ),
-            ExecuteQuery::<Self>::new(),
         )
         .await
     }
diff --git a/crates/storage_models/src/query/refund.rs b/crates/storage_models/src/query/refund.rs
index 371dd972a4..f1630a022a 100644
--- a/crates/storage_models/src/query/refund.rs
+++ b/crates/storage_models/src/query/refund.rs
@@ -1,7 +1,7 @@
 use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
 use router_env::{tracing, tracing::instrument};
 
-use super::generics::{self, ExecuteQuery};
+use super::generics;
 use crate::{
     errors,
     refund::{Refund, RefundNew, RefundUpdate, RefundUpdateInternal},
@@ -14,7 +14,7 @@ use crate::{
 
 impl RefundNew {
     #[instrument(skip(conn))]
     pub async fn insert(self, conn: &PgPooledConn) -> CustomResult<Refund, errors::DatabaseError> {
-        generics::generic_insert::<_, _, Refund, _>(conn, self, ExecuteQuery::new()).await
+        generics::generic_insert(conn, self).await
     }
 }
 
@@ -25,11 +25,10 @@ impl Refund {
         conn: &PgPooledConn,
         refund: RefundUpdate,
     ) -> CustomResult<Self, errors::DatabaseError> {
-        match generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, Self, _>(
+        match generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, _>(
             conn,
             self.id,
             RefundUpdateInternal::from(refund),
-            ExecuteQuery::new(),
         )
         .await
         {
diff --git a/crates/storage_models/src/query/temp_card.rs b/crates/storage_models/src/query/temp_card.rs
index caca43d31e..9aaa52e2cb 100644
--- a/crates/storage_models/src/query/temp_card.rs
+++ b/crates/storage_models/src/query/temp_card.rs
@@ -1,7 +1,7 @@
 use diesel::{associations::HasTable, ExpressionMethods};
 use router_env::tracing::{self, instrument};
 
-use super::generics::{self, ExecuteQuery};
+use super::generics;
 use crate::{
     errors,
     schema::temp_card::dsl,
@@ -15,7 +15,7 @@ impl TempCardNew {
         self,
         conn: &PgPooledConn,
     ) -> CustomResult<TempCard, errors::DatabaseError> {
-        generics::generic_insert::<_, _, TempCard, _>(conn, self, ExecuteQuery::new()).await
+        generics::generic_insert(conn, self).await
     }
 }
 
@@ -25,7 +25,7 @@ impl TempCard {
         self,
         conn: &PgPooledConn,
     ) -> CustomResult<TempCard, errors::DatabaseError> {
-        generics::generic_insert::<_, _, TempCard, _>(conn, self, ExecuteQuery::new()).await
+        generics::generic_insert(conn, self).await
     }
 
     #[instrument(skip(conn))]
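Note: a closing observation on the updated generics. `insert` calls infer everything, but `generic_update` and `generic_delete` still name the table explicitly, because a predicate such as `dsl::id.eq_any(task_ids)` pins down columns, not the table the statement should target. The call shape that remains, taken from the process_tracker hunks with `Self` spelled out (the `usize` row count is an assumption about `generic_update`'s return type, and `conn`, `task_ids`, and `task_update` are assumed to be bound):

    // Presumed post-refactor call shape: only the table parameter survives in
    // the turbofish; the predicate and changeset types are inferred.
    let rows_updated: usize = generics::generic_update::<<ProcessTracker as HasTable>::Table, _, _>(
        conn,
        dsl::id.eq_any(task_ids),
        ProcessTrackerUpdateInternal::from(task_update),
    )
    .await?;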