From 88d65a62fc81f217ade71b2d4903d3bbe85e5c94 Mon Sep 17 00:00:00 2001 From: Sampras Lopes Date: Mon, 21 Aug 2023 19:23:03 +0530 Subject: [PATCH] feat(storage_impl): split payment intent interface implementation (#1946) --- Cargo.lock | 20 + config/development.toml | 4 +- crates/api_models/src/payments.rs | 9 +- crates/data_models/Cargo.toml | 29 + crates/data_models/README.md | 3 + crates/data_models/src/errors.rs | 32 + crates/data_models/src/lib.rs | 25 + crates/data_models/src/mandates.rs | 19 + crates/data_models/src/payments.rs | 2 + .../src/payments/payment_attempt.rs | 276 ++++++ .../src/payments/payment_intent.rs | 424 +++++++++ crates/router/Cargo.toml | 7 +- crates/router/src/connector/zen.rs | 11 +- crates/router/src/core/admin.rs | 21 +- crates/router/src/core/errors.rs | 64 +- crates/router/src/core/errors/utils.rs | 36 + crates/router/src/core/payments.rs | 13 +- crates/router/src/core/payments/helpers.rs | 54 +- crates/router/src/core/payments/operations.rs | 8 +- .../operations/payment_complete_authorize.rs | 2 +- .../payments/operations/payment_confirm.rs | 2 +- .../payments/operations/payment_create.rs | 2 +- .../operations/payment_method_validate.rs | 2 +- .../payments/operations/payment_session.rs | 2 +- .../core/payments/operations/payment_start.rs | 2 +- .../payments/operations/payment_status.rs | 18 +- .../payments/operations/payment_update.rs | 2 +- crates/router/src/db.rs | 3 +- crates/router/src/db/payment_attempt.rs | 25 +- crates/router/src/db/payment_intent.rs | 428 +-------- .../src/types/domain/merchant_account.rs | 16 +- crates/router/src/types/storage.rs | 9 +- crates/router/src/types/storage/enums.rs | 1 + .../src/types/storage/payment_intent.rs | 191 ---- crates/router/src/utils/db_utils.rs | 6 +- crates/storage_impl/Cargo.toml | 9 +- crates/storage_impl/src/database/store.rs | 4 +- crates/storage_impl/src/lib.rs | 56 ++ crates/storage_impl/src/metrics.rs | 6 + crates/storage_impl/src/payments.rs | 3 + .../src/payments/payment_attempt.rs | 156 +++ .../src/payments/payment_intent.rs | 898 ++++++++++++++++++ crates/storage_impl/src/utils.rs | 70 ++ 43 files changed, 2280 insertions(+), 690 deletions(-) create mode 100644 crates/data_models/Cargo.toml create mode 100644 crates/data_models/README.md create mode 100644 crates/data_models/src/errors.rs create mode 100644 crates/data_models/src/lib.rs create mode 100644 crates/data_models/src/mandates.rs create mode 100644 crates/data_models/src/payments.rs create mode 100644 crates/data_models/src/payments/payment_attempt.rs create mode 100644 crates/data_models/src/payments/payment_intent.rs delete mode 100644 crates/router/src/types/storage/payment_intent.rs create mode 100644 crates/storage_impl/src/metrics.rs create mode 100644 crates/storage_impl/src/payments/payment_attempt.rs create mode 100644 crates/storage_impl/src/payments/payment_intent.rs create mode 100644 crates/storage_impl/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index e8f0d09863..26538c4f86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1668,6 +1668,22 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data_models" +version = "0.1.0" +dependencies = [ + "api_models", + "async-trait", + "common_enums", + "common_utils", + "error-stack", + "serde", + "serde_json", + "strum 0.25.0", + "thiserror", + "time 0.3.22", +] + [[package]] name = "deadpool" version = "0.9.5" @@ -4014,6 +4030,7 @@ dependencies = [ "clap", "common_utils", "config", + "data_models", "derive_deref", "diesel", "diesel_models", @@ -4663,15 +4680,18 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" name = "storage_impl" version = "0.1.0" dependencies = [ + "api_models", "async-bb8-diesel", "async-trait", "bb8", "common_utils", "crc32fast", + "data_models", "diesel", "diesel_models", "dyn-clone", "error-stack", + "futures", "masking", "moka", "once_cell", diff --git a/config/development.toml b/config/development.toml index 64502d6e40..1d72dc11da 100644 --- a/config/development.toml +++ b/config/development.toml @@ -22,8 +22,8 @@ pool_size = 5 connection_timeout = 10 [replica_database] -username = "replica_user" -password = "replica_pass" +username = "db_user" +password = "db_pass" host = "localhost" port = 5432 dbname = "hyperswitch_db" diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index 1afe76e4ec..9efa70da0a 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -13,7 +13,10 @@ use url::Url; use utoipa::ToSchema; use crate::{ - admin, disputes, enums as api_enums, ephemeral_key::EphemeralKeyCreateResponse, refunds, + admin, disputes, + enums::{self as api_enums}, + ephemeral_key::EphemeralKeyCreateResponse, + refunds, }; #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -1933,12 +1936,12 @@ pub struct PaymentListFilterConstraints { /// The identifier for payment pub payment_id: Option, /// The starting point within a list of objects, limit on number of object will be some constant for join query - pub offset: Option, + pub offset: Option, /// The time range for which objects are needed. TimeRange has two fields start_time and end_time from which objects can be filtered as per required scenarios (created_at, time less than, greater than etc). #[serde(flatten)] pub time_range: Option, /// The list of connectors to filter payments list - pub connector: Option>, + pub connector: Option>, /// The list of currencies to filter payments list pub currency: Option>, /// The list of payment statuses to filter payments list diff --git a/crates/data_models/Cargo.toml b/crates/data_models/Cargo.toml new file mode 100644 index 0000000000..e0b47bf129 --- /dev/null +++ b/crates/data_models/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "data_models" +description = "Represents the data/domain models used by the business layer" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +readme = "README.md" +license.workspace = true + +[features] +default = ["olap", "oltp"] +oltp = [] +olap = [] + +[dependencies] +# First party deps +api_models = { version = "0.1.0", path = "../api_models" } +common_enums = { version = "0.1.0", path = "../common_enums" } +common_utils = { version = "0.1.0", path = "../common_utils" } + + +# Third party deps +async-trait = "0.1.68" +error-stack = "0.3.1" +serde = { version = "1.0.163", features = ["derive"] } +serde_json = "1.0.96" +strum = { version = "0.25", features = [ "derive" ] } +thiserror = "1.0.40" +time = { version = "0.3.21", features = ["serde", "serde-well-known", "std"] } \ No newline at end of file diff --git a/crates/data_models/README.md b/crates/data_models/README.md new file mode 100644 index 0000000000..0c1c517055 --- /dev/null +++ b/crates/data_models/README.md @@ -0,0 +1,3 @@ +# Data models + +Represents the data/domain models used by the business/domain layer \ No newline at end of file diff --git a/crates/data_models/src/errors.rs b/crates/data_models/src/errors.rs new file mode 100644 index 0000000000..263538847e --- /dev/null +++ b/crates/data_models/src/errors.rs @@ -0,0 +1,32 @@ +#[derive(Debug, thiserror::Error)] +pub enum StorageError { + // TODO: deprecate this error type to use a domain error instead + #[error("DatabaseError: {0:?}")] + DatabaseError(String), + #[error("ValueNotFound: {0}")] + ValueNotFound(String), + #[error("DuplicateValue: {entity} already exists {key:?}")] + DuplicateValue { + entity: &'static str, + key: Option, + }, + #[error("Timed out while trying to connect to the database")] + DatabaseConnectionError, + #[error("KV error")] + KVError, + #[error("Serialization failure")] + SerializationFailed, + #[error("MockDb error")] + MockDbError, + #[error("Customer with this id is Redacted")] + CustomerRedacted, + #[error("Deserialization failure")] + DeserializationFailed, + #[error("Error while encrypting data")] + EncryptionError, + #[error("Error while decrypting data from database")] + DecryptionError, + // TODO: deprecate this error type to use a domain error instead + #[error("RedisError: {0:?}")] + RedisError(String), +} diff --git a/crates/data_models/src/lib.rs b/crates/data_models/src/lib.rs new file mode 100644 index 0000000000..a4376be88f --- /dev/null +++ b/crates/data_models/src/lib.rs @@ -0,0 +1,25 @@ +pub mod errors; +pub mod mandates; +pub mod payments; + +// TODO: This decision about using KV mode or not, +// should be taken at a top level rather than pushing it down to individual functions via an enum. +#[derive( + Clone, + Copy, + Debug, + Default, + Eq, + PartialEq, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::EnumString, +)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum MerchantStorageScheme { + #[default] + PostgresOnly, + RedisKv, +} diff --git a/crates/data_models/src/mandates.rs b/crates/data_models/src/mandates.rs new file mode 100644 index 0000000000..6ea40a286e --- /dev/null +++ b/crates/data_models/src/mandates.rs @@ -0,0 +1,19 @@ +use common_enums::Currency; +use common_utils::pii; +use time::PrimitiveDateTime; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MandateDataType { + SingleUse(MandateAmountData), + MultiUse(Option), +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct MandateAmountData { + pub amount: i64, + pub currency: Currency, + pub start_date: Option, + pub end_date: Option, + pub metadata: Option, +} diff --git a/crates/data_models/src/payments.rs b/crates/data_models/src/payments.rs new file mode 100644 index 0000000000..5e8e103301 --- /dev/null +++ b/crates/data_models/src/payments.rs @@ -0,0 +1,2 @@ +pub mod payment_attempt; +pub mod payment_intent; diff --git a/crates/data_models/src/payments/payment_attempt.rs b/crates/data_models/src/payments/payment_attempt.rs new file mode 100644 index 0000000000..d6b9d48f03 --- /dev/null +++ b/crates/data_models/src/payments/payment_attempt.rs @@ -0,0 +1,276 @@ +use common_enums as storage_enums; +use serde::{Deserialize, Serialize}; +use time::PrimitiveDateTime; + +use super::payment_intent::PaymentIntent; +use crate::{errors, mandates::MandateDataType, MerchantStorageScheme}; + +#[async_trait::async_trait] +pub trait PaymentAttemptInterface { + async fn insert_payment_attempt( + &self, + payment_attempt: PaymentAttemptNew, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn update_payment_attempt_with_attempt_id( + &self, + this: PaymentAttempt, + payment_attempt: PaymentAttemptUpdate, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id( + &self, + connector_transaction_id: &str, + payment_id: &str, + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id( + &self, + payment_id: &str, + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_attempt_by_merchant_id_connector_txn_id( + &self, + merchant_id: &str, + connector_txn_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_attempt_by_payment_id_merchant_id_attempt_id( + &self, + payment_id: &str, + merchant_id: &str, + attempt_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_attempt_by_attempt_id_merchant_id( + &self, + attempt_id: &str, + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_attempt_by_preprocessing_id_merchant_id( + &self, + preprocessing_id: &str, + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_attempts_by_merchant_id_payment_id( + &self, + merchant_id: &str, + payment_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, errors::StorageError>; + + async fn get_filters_for_payments( + &self, + pi: &[PaymentIntent], + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct PaymentAttempt { + pub id: i32, + pub payment_id: String, + pub merchant_id: String, + pub attempt_id: String, + pub status: storage_enums::AttemptStatus, + pub amount: i64, + pub currency: Option, + pub save_to_locker: Option, + pub connector: Option, + pub error_message: Option, + pub offer_amount: Option, + pub surcharge_amount: Option, + pub tax_amount: Option, + pub payment_method_id: Option, + pub payment_method: Option, + pub connector_transaction_id: Option, + pub capture_method: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub capture_on: Option, + pub confirm: bool, + pub authentication_type: Option, + #[serde(with = "common_utils::custom_serde::iso8601")] + pub created_at: PrimitiveDateTime, + #[serde(with = "common_utils::custom_serde::iso8601")] + pub modified_at: PrimitiveDateTime, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub last_synced: Option, + pub cancellation_reason: Option, + pub amount_to_capture: Option, + pub mandate_id: Option, + pub browser_info: Option, + pub error_code: Option, + pub payment_token: Option, + pub connector_metadata: Option, + pub payment_experience: Option, + pub payment_method_type: Option, + pub payment_method_data: Option, + pub business_sub_label: Option, + pub straight_through_algorithm: Option, + pub preprocessing_step_id: Option, + // providing a location to store mandate details intermediately for transaction + pub mandate_details: Option, + pub error_reason: Option, + pub multiple_capture_count: Option, + // reference to the payment at connector side + pub connector_response_reference_id: Option, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct PaymentListFilters { + pub connector: Vec, + pub currency: Vec, + pub status: Vec, + pub payment_method: Vec, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct PaymentAttemptNew { + pub payment_id: String, + pub merchant_id: String, + pub attempt_id: String, + pub status: storage_enums::AttemptStatus, + pub amount: i64, + pub currency: Option, + // pub auto_capture: Option, + pub save_to_locker: Option, + pub connector: Option, + pub error_message: Option, + pub offer_amount: Option, + pub surcharge_amount: Option, + pub tax_amount: Option, + pub payment_method_id: Option, + pub payment_method: Option, + pub capture_method: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub capture_on: Option, + pub confirm: bool, + pub authentication_type: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub created_at: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub modified_at: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub last_synced: Option, + pub cancellation_reason: Option, + pub amount_to_capture: Option, + pub mandate_id: Option, + pub browser_info: Option, + pub payment_token: Option, + pub error_code: Option, + pub connector_metadata: Option, + pub payment_experience: Option, + pub payment_method_type: Option, + pub payment_method_data: Option, + pub business_sub_label: Option, + pub straight_through_algorithm: Option, + pub preprocessing_step_id: Option, + pub mandate_details: Option, + pub error_reason: Option, + pub connector_response_reference_id: Option, + pub multiple_capture_count: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PaymentAttemptUpdate { + Update { + amount: i64, + currency: storage_enums::Currency, + status: storage_enums::AttemptStatus, + authentication_type: Option, + payment_method: Option, + payment_token: Option, + payment_method_data: Option, + payment_method_type: Option, + payment_experience: Option, + business_sub_label: Option, + amount_to_capture: Option, + capture_method: Option, + }, + UpdateTrackers { + payment_token: Option, + connector: Option, + straight_through_algorithm: Option, + }, + AuthenticationTypeUpdate { + authentication_type: storage_enums::AuthenticationType, + }, + ConfirmUpdate { + amount: i64, + currency: storage_enums::Currency, + status: storage_enums::AttemptStatus, + authentication_type: Option, + payment_method: Option, + browser_info: Option, + connector: Option, + payment_token: Option, + payment_method_data: Option, + payment_method_type: Option, + payment_experience: Option, + business_sub_label: Option, + straight_through_algorithm: Option, + }, + VoidUpdate { + status: storage_enums::AttemptStatus, + cancellation_reason: Option, + }, + ResponseUpdate { + status: storage_enums::AttemptStatus, + connector: Option, + connector_transaction_id: Option, + authentication_type: Option, + payment_method_id: Option>, + mandate_id: Option, + connector_metadata: Option, + payment_token: Option, + error_code: Option>, + error_message: Option>, + error_reason: Option>, + connector_response_reference_id: Option, + }, + UnresolvedResponseUpdate { + status: storage_enums::AttemptStatus, + connector: Option, + connector_transaction_id: Option, + payment_method_id: Option>, + error_code: Option>, + error_message: Option>, + error_reason: Option>, + connector_response_reference_id: Option, + }, + StatusUpdate { + status: storage_enums::AttemptStatus, + }, + ErrorUpdate { + connector: Option, + status: storage_enums::AttemptStatus, + error_code: Option>, + error_message: Option>, + error_reason: Option>, + }, + MultipleCaptureUpdate { + status: Option, + multiple_capture_count: Option, + }, + PreprocessingUpdate { + status: storage_enums::AttemptStatus, + payment_method_id: Option>, + connector_metadata: Option, + preprocessing_step_id: Option, + connector_transaction_id: Option, + connector_response_reference_id: Option, + }, +} diff --git a/crates/data_models/src/payments/payment_intent.rs b/crates/data_models/src/payments/payment_intent.rs new file mode 100644 index 0000000000..cb31c8139e --- /dev/null +++ b/crates/data_models/src/payments/payment_intent.rs @@ -0,0 +1,424 @@ +use common_enums as storage_enums; +use common_utils::pii; +use serde::{Deserialize, Serialize}; +use time::PrimitiveDateTime; + +use crate::{errors, MerchantStorageScheme}; + +#[async_trait::async_trait] +pub trait PaymentIntentInterface { + async fn update_payment_intent( + &self, + this: PaymentIntent, + payment_intent: PaymentIntentUpdate, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn insert_payment_intent( + &self, + new: PaymentIntentNew, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + async fn find_payment_intent_by_payment_id_merchant_id( + &self, + payment_id: &str, + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result; + + #[cfg(feature = "olap")] + async fn filter_payment_intent_by_constraints( + &self, + merchant_id: &str, + filters: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, errors::StorageError>; + + #[cfg(feature = "olap")] + async fn filter_payment_intents_by_time_range_constraints( + &self, + merchant_id: &str, + time_range: &api_models::payments::TimeRange, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, errors::StorageError>; + + #[cfg(feature = "olap")] + async fn get_filtered_payment_intents_attempt( + &self, + merchant_id: &str, + constraints: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result< + Vec<(PaymentIntent, super::payment_attempt::PaymentAttempt)>, + errors::StorageError, + >; +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct PaymentIntent { + pub id: i32, + pub payment_id: String, + pub merchant_id: String, + pub status: storage_enums::IntentStatus, + pub amount: i64, + pub currency: Option, + pub amount_captured: Option, + pub customer_id: Option, + pub description: Option, + pub return_url: Option, + pub metadata: Option, + pub connector_id: Option, + pub shipping_address_id: Option, + pub billing_address_id: Option, + pub statement_descriptor_name: Option, + pub statement_descriptor_suffix: Option, + #[serde(with = "common_utils::custom_serde::iso8601")] + pub created_at: PrimitiveDateTime, + #[serde(with = "common_utils::custom_serde::iso8601")] + pub modified_at: PrimitiveDateTime, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub last_synced: Option, + pub setup_future_usage: Option, + pub off_session: Option, + pub client_secret: Option, + pub active_attempt_id: String, + pub business_country: storage_enums::CountryAlpha2, + pub business_label: String, + pub order_details: Option>, + pub allowed_payment_method_types: Option, + pub connector_metadata: Option, + pub feature_metadata: Option, + pub attempt_count: i16, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct PaymentIntentNew { + pub payment_id: String, + pub merchant_id: String, + pub status: storage_enums::IntentStatus, + pub amount: i64, + pub currency: Option, + pub amount_captured: Option, + pub customer_id: Option, + pub description: Option, + pub return_url: Option, + pub metadata: Option, + pub connector_id: Option, + pub shipping_address_id: Option, + pub billing_address_id: Option, + pub statement_descriptor_name: Option, + pub statement_descriptor_suffix: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub created_at: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub modified_at: Option, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub last_synced: Option, + pub setup_future_usage: Option, + pub off_session: Option, + pub client_secret: Option, + pub active_attempt_id: String, + pub business_country: storage_enums::CountryAlpha2, + pub business_label: String, + pub order_details: Option>, + pub allowed_payment_method_types: Option, + pub connector_metadata: Option, + pub feature_metadata: Option, + pub attempt_count: i16, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PaymentIntentUpdate { + ResponseUpdate { + status: storage_enums::IntentStatus, + amount_captured: Option, + return_url: Option, + }, + MetadataUpdate { + metadata: pii::SecretSerdeValue, + }, + ReturnUrlUpdate { + return_url: Option, + status: Option, + customer_id: Option, + shipping_address_id: Option, + billing_address_id: Option, + }, + MerchantStatusUpdate { + status: storage_enums::IntentStatus, + shipping_address_id: Option, + billing_address_id: Option, + }, + PGStatusUpdate { + status: storage_enums::IntentStatus, + }, + Update { + amount: i64, + currency: storage_enums::Currency, + setup_future_usage: Option, + status: storage_enums::IntentStatus, + customer_id: Option, + shipping_address_id: Option, + billing_address_id: Option, + return_url: Option, + business_country: Option, + business_label: Option, + description: Option, + statement_descriptor_name: Option, + statement_descriptor_suffix: Option, + order_details: Option>, + metadata: Option, + }, + PaymentAttemptAndAttemptCountUpdate { + active_attempt_id: String, + attempt_count: i16, + }, + StatusAndAttemptUpdate { + status: storage_enums::IntentStatus, + active_attempt_id: String, + attempt_count: i16, + }, +} + +#[derive(Clone, Debug, Default)] +pub struct PaymentIntentUpdateInternal { + pub amount: Option, + pub currency: Option, + pub status: Option, + pub amount_captured: Option, + pub customer_id: Option, + pub return_url: Option, + pub setup_future_usage: Option, + pub off_session: Option, + pub metadata: Option, + pub billing_address_id: Option, + pub shipping_address_id: Option, + pub modified_at: Option, + pub active_attempt_id: Option, + pub business_country: Option, + pub business_label: Option, + pub description: Option, + pub statement_descriptor_name: Option, + pub statement_descriptor_suffix: Option, + pub order_details: Option>, + pub attempt_count: Option, +} + +impl PaymentIntentUpdate { + pub fn apply_changeset(self, source: PaymentIntent) -> PaymentIntent { + let internal_update: PaymentIntentUpdateInternal = self.into(); + PaymentIntent { + amount: internal_update.amount.unwrap_or(source.amount), + currency: internal_update.currency.or(source.currency), + status: internal_update.status.unwrap_or(source.status), + amount_captured: internal_update.amount_captured.or(source.amount_captured), + customer_id: internal_update.customer_id.or(source.customer_id), + return_url: internal_update.return_url.or(source.return_url), + setup_future_usage: internal_update + .setup_future_usage + .or(source.setup_future_usage), + off_session: internal_update.off_session.or(source.off_session), + metadata: internal_update.metadata.or(source.metadata), + billing_address_id: internal_update + .billing_address_id + .or(source.billing_address_id), + shipping_address_id: internal_update + .shipping_address_id + .or(source.shipping_address_id), + modified_at: common_utils::date_time::now(), + order_details: internal_update.order_details.or(source.order_details), + ..source + } + } +} + +impl From for PaymentIntentUpdateInternal { + fn from(payment_intent_update: PaymentIntentUpdate) -> Self { + match payment_intent_update { + PaymentIntentUpdate::Update { + amount, + currency, + setup_future_usage, + status, + customer_id, + shipping_address_id, + billing_address_id, + return_url, + business_country, + business_label, + description, + statement_descriptor_name, + statement_descriptor_suffix, + order_details, + metadata, + } => Self { + amount: Some(amount), + currency: Some(currency), + status: Some(status), + setup_future_usage, + customer_id, + shipping_address_id, + billing_address_id, + modified_at: Some(common_utils::date_time::now()), + return_url, + business_country, + business_label, + description, + statement_descriptor_name, + statement_descriptor_suffix, + order_details, + metadata, + ..Default::default() + }, + PaymentIntentUpdate::MetadataUpdate { metadata } => Self { + metadata: Some(metadata), + modified_at: Some(common_utils::date_time::now()), + ..Default::default() + }, + PaymentIntentUpdate::ReturnUrlUpdate { + return_url, + status, + customer_id, + shipping_address_id, + billing_address_id, + } => Self { + return_url, + status, + customer_id, + shipping_address_id, + billing_address_id, + modified_at: Some(common_utils::date_time::now()), + ..Default::default() + }, + PaymentIntentUpdate::PGStatusUpdate { status } => Self { + status: Some(status), + modified_at: Some(common_utils::date_time::now()), + ..Default::default() + }, + PaymentIntentUpdate::MerchantStatusUpdate { + status, + shipping_address_id, + billing_address_id, + } => Self { + status: Some(status), + shipping_address_id, + billing_address_id, + modified_at: Some(common_utils::date_time::now()), + ..Default::default() + }, + PaymentIntentUpdate::ResponseUpdate { + // amount, + // currency, + status, + amount_captured, + // customer_id, + return_url, + } => Self { + // amount, + // currency: Some(currency), + status: Some(status), + amount_captured, + // customer_id, + return_url, + modified_at: Some(common_utils::date_time::now()), + ..Default::default() + }, + PaymentIntentUpdate::PaymentAttemptAndAttemptCountUpdate { + active_attempt_id, + attempt_count, + } => Self { + active_attempt_id: Some(active_attempt_id), + attempt_count: Some(attempt_count), + ..Default::default() + }, + PaymentIntentUpdate::StatusAndAttemptUpdate { + status, + active_attempt_id, + attempt_count, + } => Self { + status: Some(status), + active_attempt_id: Some(active_attempt_id), + attempt_count: Some(attempt_count), + ..Default::default() + }, + } + } +} + +pub enum PaymentIntentFetchConstraints { + Single { + payment_intent_id: String, + }, + List { + offset: Option, + starting_at: Option, + ending_at: Option, + connector: Option>, + currency: Option>, + status: Option>, + payment_methods: Option>, + customer_id: Option, + starting_after_id: Option, + ending_before_id: Option, + limit: Option, + }, +} + +impl From for PaymentIntentFetchConstraints { + fn from(value: api_models::payments::PaymentListConstraints) -> Self { + Self::List { + offset: None, + starting_at: value.created_gte.or(value.created_gt).or(value.created), + ending_at: value.created_lte.or(value.created_lt).or(value.created), + connector: None, + currency: None, + status: None, + payment_methods: None, + customer_id: value.customer_id, + starting_after_id: value.starting_after, + ending_before_id: value.ending_before, + limit: None, + } + } +} + +impl From for PaymentIntentFetchConstraints { + fn from(value: api_models::payments::TimeRange) -> Self { + Self::List { + offset: None, + starting_at: Some(value.start_time), + ending_at: value.end_time, + connector: None, + currency: None, + status: None, + payment_methods: None, + customer_id: None, + starting_after_id: None, + ending_before_id: None, + limit: None, + } + } +} + +impl From for PaymentIntentFetchConstraints { + fn from(value: api_models::payments::PaymentListFilterConstraints) -> Self { + if let Some(payment_intent_id) = value.payment_id { + Self::Single { payment_intent_id } + } else { + Self::List { + offset: value.offset, + starting_at: value.time_range.map(|t| t.start_time), + ending_at: value.time_range.and_then(|t| t.end_time), + connector: value.connector, + currency: value.currency, + status: value.status, + payment_methods: value.payment_methods, + customer_id: None, + starting_after_id: None, + ending_before_id: None, + limit: None, + } + } + } +} diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index b9f374efca..f0af3ad7f7 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -16,8 +16,8 @@ email = ["external_services/email", "dep:aws-config"] basilisk = ["kms"] stripe = ["dep:serde_qs"] release = ["kms", "stripe", "basilisk", "s3", "email"] -olap = [] -oltp = [] +olap = ["data_models/olap", "storage_impl/olap"] +oltp = ["data_models/oltp", "storage_impl/oltp"] kv_store = [] accounts_cache = [] openapi = ["olap", "oltp", "payouts"] @@ -94,7 +94,8 @@ redis_interface = { version = "0.1.0", path = "../redis_interface" } router_derive = { version = "0.1.0", path = "../router_derive" } router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] } -storage_impl = { version = "0.1.0", path = "../storage_impl"} +data_models = { version = "0.1.0", path = "../data_models", default-features = false } +storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false } [target.'cfg(not(target_os = "windows"))'.dependencies] signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } diff --git a/crates/router/src/connector/zen.rs b/crates/router/src/connector/zen.rs index 2cce8cf838..679d2eff8c 100644 --- a/crates/router/src/connector/zen.rs +++ b/crates/router/src/connector/zen.rs @@ -546,10 +546,13 @@ impl api::IncomingWebhook for Zen { .body .parse_struct("ZenWebhookBody") .change_context(errors::ConnectorError::WebhookSignatureNotFound)?; - let msg = webhook_body.merchant_transaction_id - + &webhook_body.currency - + &webhook_body.amount - + &webhook_body.status.to_string().to_uppercase(); + let msg = format!( + "{}{}{}{}", + webhook_body.merchant_transaction_id, + webhook_body.currency, + webhook_body.amount, + webhook_body.status.to_string().to_uppercase() + ); Ok(msg.into_bytes()) } diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index 843c500615..d0d460036b 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -4,7 +4,7 @@ use common_utils::{ date_time, ext_traits::{Encode, ValueExt}, }; -use diesel_models::enums; +use data_models::MerchantStorageScheme; use error_stack::{report, FutureExt, ResultExt}; use masking::{PeekInterface, Secret}; use uuid::Uuid; @@ -141,7 +141,7 @@ pub async fn create_merchant_account( publishable_key, locker_id: req.locker_id, metadata: req.metadata, - storage_scheme: diesel_models::enums::MerchantStorageScheme::PostgresOnly, + storage_scheme: MerchantStorageScheme::PostgresOnly, primary_business_details, created_at: date_time::now(), modified_at: date_time::now(), @@ -742,23 +742,24 @@ pub async fn kv_for_merchant( .to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?; let updated_merchant_account = match (enable, merchant_account.storage_scheme) { - (true, enums::MerchantStorageScheme::RedisKv) - | (false, enums::MerchantStorageScheme::PostgresOnly) => Ok(merchant_account), - (true, enums::MerchantStorageScheme::PostgresOnly) => { + (true, MerchantStorageScheme::RedisKv) | (false, MerchantStorageScheme::PostgresOnly) => { + Ok(merchant_account) + } + (true, MerchantStorageScheme::PostgresOnly) => { db.update_merchant( merchant_account, storage::MerchantAccountUpdate::StorageSchemeUpdate { - storage_scheme: enums::MerchantStorageScheme::RedisKv, + storage_scheme: MerchantStorageScheme::RedisKv, }, &key_store, ) .await } - (false, enums::MerchantStorageScheme::RedisKv) => { + (false, MerchantStorageScheme::RedisKv) => { db.update_merchant( merchant_account, storage::MerchantAccountUpdate::StorageSchemeUpdate { - storage_scheme: enums::MerchantStorageScheme::PostgresOnly, + storage_scheme: MerchantStorageScheme::PostgresOnly, }, &key_store, ) @@ -772,7 +773,7 @@ pub async fn kv_for_merchant( })?; let kv_status = matches!( updated_merchant_account.storage_scheme, - enums::MerchantStorageScheme::RedisKv + MerchantStorageScheme::RedisKv ); Ok(service_api::ApplicationResponse::Json( @@ -800,7 +801,7 @@ pub async fn check_merchant_account_kv_status( let kv_status = matches!( merchant_account.storage_scheme, - enums::MerchantStorageScheme::RedisKv + MerchantStorageScheme::RedisKv ); Ok(service_api::ApplicationResponse::Json( diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index c0c06f8316..a2644878ba 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -6,10 +6,11 @@ pub mod utils; use std::fmt::Display; use actix_web::{body::BoxBody, http::StatusCode, ResponseError}; +use common_utils::errors::ErrorSwitch; pub use common_utils::errors::{CustomResult, ParsingError, ValidationError}; use config::ConfigError; +pub use data_models::errors::StorageError as DataStorageError; use diesel_models::errors as storage_errors; -use error_stack; pub use redis_interface::errors::RedisError; use router_env::opentelemetry::metrics::MetricsError; @@ -80,6 +81,67 @@ pub enum StorageError { RedisError(error_stack::Report), } +impl ErrorSwitch for StorageError { + fn switch(&self) -> DataStorageError { + self.into() + } +} + +#[allow(clippy::from_over_into)] +impl Into for &StorageError { + fn into(self) -> DataStorageError { + match self { + StorageError::DatabaseError(i) => match i.current_context() { + storage_errors::DatabaseError::DatabaseConnectionError => { + DataStorageError::DatabaseConnectionError + } + // TODO: Update this error type to encompass & propagate the missing type (instead of generic `db value not found`) + storage_errors::DatabaseError::NotFound => { + DataStorageError::ValueNotFound(String::from("db value not found")) + } + // TODO: Update this error type to encompass & propagate the duplicate type (instead of generic `db value not found`) + storage_errors::DatabaseError::UniqueViolation => { + DataStorageError::DuplicateValue { + entity: "db entity", + key: None, + } + } + storage_errors::DatabaseError::NoFieldsToUpdate => { + DataStorageError::DatabaseError("No fields to update".to_string()) + } + storage_errors::DatabaseError::QueryGenerationFailed => { + DataStorageError::DatabaseError("Query generation failed".to_string()) + } + storage_errors::DatabaseError::Others => { + DataStorageError::DatabaseError("Unknown database error".to_string()) + } + }, + StorageError::ValueNotFound(i) => DataStorageError::ValueNotFound(i.clone()), + StorageError::DuplicateValue { entity, key } => DataStorageError::DuplicateValue { + entity, + key: key.clone(), + }, + StorageError::DatabaseConnectionError => DataStorageError::DatabaseConnectionError, + StorageError::KVError => DataStorageError::KVError, + StorageError::SerializationFailed => DataStorageError::SerializationFailed, + StorageError::MockDbError => DataStorageError::MockDbError, + StorageError::CustomerRedacted => DataStorageError::CustomerRedacted, + StorageError::DeserializationFailed => DataStorageError::DeserializationFailed, + StorageError::EncryptionError => DataStorageError::EncryptionError, + StorageError::DecryptionError => DataStorageError::DecryptionError, + StorageError::RedisError(i) => match i.current_context() { + // TODO: Update this error type to encompass & propagate the missing type (instead of generic `redis value not found`) + RedisError::NotFound => { + DataStorageError::ValueNotFound("redis value not found".to_string()) + } + RedisError::JsonSerializationFailed => DataStorageError::SerializationFailed, + RedisError::JsonDeserializationFailed => DataStorageError::DeserializationFailed, + i => DataStorageError::RedisError(format!("{:?}", i)), + }, + } + } +} + impl From> for StorageError { fn from(err: error_stack::Report) -> Self { Self::RedisError(err) diff --git a/crates/router/src/core/errors/utils.rs b/crates/router/src/core/errors/utils.rs index c18330182d..4f50655e71 100644 --- a/crates/router/src/core/errors/utils.rs +++ b/crates/router/src/core/errors/utils.rs @@ -10,6 +10,41 @@ pub trait StorageErrorExt { fn to_duplicate_response(self, duplicate_response: E) -> error_stack::Result; } +impl StorageErrorExt + for error_stack::Result +{ + #[track_caller] + fn to_not_found_response( + self, + not_found_response: errors::ApiErrorResponse, + ) -> error_stack::Result { + self.map_err(|err| { + let new_err = match err.current_context() { + data_models::errors::StorageError::ValueNotFound(_) => not_found_response, + data_models::errors::StorageError::CustomerRedacted => { + errors::ApiErrorResponse::CustomerRedacted + } + _ => errors::ApiErrorResponse::InternalServerError, + }; + err.change_context(new_err) + }) + } + + #[track_caller] + fn to_duplicate_response( + self, + duplicate_response: errors::ApiErrorResponse, + ) -> error_stack::Result { + self.map_err(|err| { + let new_err = match err.current_context() { + data_models::errors::StorageError::DuplicateValue { .. } => duplicate_response, + _ => errors::ApiErrorResponse::InternalServerError, + }; + err.change_context(new_err) + }) + } +} + impl StorageErrorExt for error_stack::Result { @@ -31,6 +66,7 @@ impl StorageErrorExt }) } + #[track_caller] fn to_duplicate_response( self, duplicate_response: errors::ApiErrorResponse, diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 845cdbce5e..d8617f4231 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -1406,16 +1406,21 @@ pub async fn apply_filters_on_payments( merchant: domain::MerchantAccount, constraints: api::PaymentListFilterConstraints, ) -> RouterResponse { + use storage_impl::DataModelExt; + use crate::types::transformers::ForeignFrom; let list: Vec<(storage::PaymentIntent, storage::PaymentAttempt)> = db - .apply_filters_on_payments_list( + .get_filtered_payment_intents_attempt( &merchant.merchant_id, - &constraints, + &constraints.clone().into(), merchant.storage_scheme, ) .await - .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?; + .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)? + .into_iter() + .map(|(pi, pa)| (pi, pa.to_storage_model())) + .collect(); let data: Vec = list.into_iter().map(ForeignFrom::foreign_from).collect(); @@ -1447,7 +1452,7 @@ pub async fn get_filters_for_payments( let filters = db .get_filters_for_payments( - &pi, + pi.as_slice(), &merchant.merchant_id, // since OLAP doesn't have KV. Force to get the data from PSQL. storage_enums::MerchantStorageScheme::PostgresOnly, diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 649e883058..98b63dc3a1 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -5,7 +5,8 @@ use common_utils::{ ext_traits::{AsyncExt, ByteSliceExt, ValueExt}, fp_utils, generate_id, pii, }; -use diesel_models::{enums, payment_attempt, payment_intent}; +use data_models::payments::payment_intent::PaymentIntent; +use diesel_models::{enums, payment_attempt::PaymentAttempt}; // TODO : Evaluate all the helper functions () use error_stack::{report, IntoReport, ResultExt}; use josekit::jwe; @@ -76,7 +77,7 @@ pub fn create_identity_from_certificate_and_key( pub fn filter_mca_based_on_business_details( merchant_connector_accounts: Vec, - payment_intent: Option<&diesel_models::payment_intent::PaymentIntent>, + payment_intent: Option<&PaymentIntent>, ) -> Vec { if let Some(payment_intent) = payment_intent { merchant_connector_accounts @@ -631,8 +632,8 @@ pub fn validate_customer_id_mandatory_cases( pub fn create_startpay_url( server: &Server, - payment_attempt: &storage::PaymentAttempt, - payment_intent: &storage::PaymentIntent, + payment_attempt: &PaymentAttempt, + payment_intent: &PaymentIntent, ) -> String { format!( "{}/payments/redirect/{}/{}/{}", @@ -645,7 +646,7 @@ pub fn create_startpay_url( pub fn create_redirect_url( router_base_url: &String, - payment_attempt: &storage::PaymentAttempt, + payment_attempt: &PaymentAttempt, connector_name: &String, creds_identifier: Option<&str>, ) -> String { @@ -668,7 +669,7 @@ pub fn create_webhook_url( } pub fn create_complete_authorize_url( router_base_url: &String, - payment_attempt: &storage::PaymentAttempt, + payment_attempt: &PaymentAttempt, connector_name: &String, ) -> String { format!( @@ -773,7 +774,7 @@ pub fn payment_intent_status_fsm( pub async fn add_domain_task_to_pt( operation: &Op, state: &AppState, - payment_attempt: &storage::PaymentAttempt, + payment_attempt: &PaymentAttempt, requeue: bool, schedule_time: Option, ) -> CustomResult<(), errors::ApiErrorResponse> @@ -1346,8 +1347,8 @@ pub async fn make_pm_data<'a, F: Clone, R>( pub async fn store_in_vault_and_generate_ppmt( state: &AppState, payment_method_data: &api_models::payments::PaymentMethodData, - payment_intent: &payment_intent::PaymentIntent, - payment_attempt: &payment_attempt::PaymentAttempt, + payment_intent: &PaymentIntent, + payment_attempt: &PaymentAttempt, payment_method: enums::PaymentMethod, ) -> RouterResult { let router_token = vault::Vault::store_payment_method_data_in_locker( @@ -1664,9 +1665,13 @@ pub(super) async fn filter_by_constraints( constraints: &api::PaymentListConstraints, merchant_id: &str, storage_scheme: storage_enums::MerchantStorageScheme, -) -> CustomResult, errors::StorageError> { +) -> CustomResult, errors::DataStorageError> { let result = db - .filter_payment_intent_by_constraints(merchant_id, constraints, storage_scheme) + .filter_payment_intent_by_constraints( + merchant_id, + &constraints.clone().into(), + storage_scheme, + ) .await?; Ok(result) } @@ -1964,7 +1969,7 @@ pub fn generate_mandate( // A function to manually authenticate the client secret with intent fulfillment time pub(crate) fn authenticate_client_secret( request_client_secret: Option<&String>, - payment_intent: &payment_intent::PaymentIntent, + payment_intent: &PaymentIntent, merchant_intent_fulfillment_time: Option, ) -> Result<(), errors::ApiErrorResponse> { match (request_client_secret, &payment_intent.client_secret) { @@ -2034,7 +2039,7 @@ pub async fn verify_payment_intent_time_and_client_secret( db: &dyn StorageInterface, merchant_account: &domain::MerchantAccount, client_secret: Option, -) -> error_stack::Result, errors::ApiErrorResponse> { +) -> error_stack::Result, errors::ApiErrorResponse> { client_secret .async_map(|cs| async move { let payment_id = get_payment_id_from_client_secret(&cs)?; @@ -2164,11 +2169,12 @@ pub(crate) fn get_payment_id_from_client_secret(cs: &str) -> RouterResult( } pub fn get_attempt_type( - payment_intent: &storage::PaymentIntent, - payment_attempt: &storage::PaymentAttempt, + payment_intent: &PaymentIntent, + payment_attempt: &PaymentAttempt, request: &api::PaymentsRequest, action: &str, ) -> RouterResult { @@ -2567,7 +2573,7 @@ impl AttemptType { #[inline(always)] fn make_new_payment_attempt( payment_method_data: &Option, - old_payment_attempt: storage::PaymentAttempt, + old_payment_attempt: PaymentAttempt, new_attempt_count: i16, ) -> storage::PaymentAttemptNew { let created_at @ modified_at @ last_synced = Some(common_utils::date_time::now()); @@ -2634,11 +2640,11 @@ impl AttemptType { pub async fn modify_payment_intent_and_payment_attempt( &self, request: &api::PaymentsRequest, - fetched_payment_intent: storage::PaymentIntent, - fetched_payment_attempt: storage::PaymentAttempt, + fetched_payment_intent: PaymentIntent, + fetched_payment_attempt: PaymentAttempt, db: &dyn StorageInterface, storage_scheme: storage::enums::MerchantStorageScheme, - ) -> RouterResult<(storage::PaymentIntent, storage::PaymentAttempt)> { + ) -> RouterResult<(PaymentIntent, PaymentAttempt)> { match self { Self::SameOld => Ok((fetched_payment_intent, fetched_payment_attempt)), Self::New => { @@ -2680,7 +2686,7 @@ impl AttemptType { pub async fn get_connector_response( &self, - payment_attempt: &storage::PaymentAttempt, + payment_attempt: &PaymentAttempt, db: &dyn StorageInterface, storage_scheme: storage::enums::MerchantStorageScheme, ) -> RouterResult { @@ -2911,7 +2917,7 @@ pub async fn get_additional_payment_data( } pub fn validate_customer_access( - payment_intent: &storage::PaymentIntent, + payment_intent: &PaymentIntent, auth_flow: services::AuthFlow, request: &api::PaymentsRequest, ) -> Result<(), errors::ApiErrorResponse> { diff --git a/crates/router/src/core/payments/operations.rs b/crates/router/src/core/payments/operations.rs index 1ba878be27..c697693c45 100644 --- a/crates/router/src/core/payments/operations.rs +++ b/crates/router/src/core/payments/operations.rs @@ -134,7 +134,7 @@ pub trait Domain: Send + Sync { merchant_account: &domain::MerchantAccount, state: &AppState, request: &R, - payment_intent: &storage::payment_intent::PaymentIntent, + payment_intent: &storage::PaymentIntent, mechant_key_store: &domain::MerchantKeyStore, ) -> CustomResult; } @@ -208,7 +208,7 @@ where _merchant_account: &domain::MerchantAccount, state: &AppState, _request: &api::PaymentsRetrieveRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, None).await @@ -278,7 +278,7 @@ where _merchant_account: &domain::MerchantAccount, state: &AppState, _request: &api::PaymentsCaptureRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, None).await @@ -336,7 +336,7 @@ where _merchant_account: &domain::MerchantAccount, state: &AppState, _request: &api::PaymentsCancelRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, None).await diff --git a/crates/router/src/core/payments/operations/payment_complete_authorize.rs b/crates/router/src/core/payments/operations/payment_complete_authorize.rs index bab7c2cfc0..f0e2700859 100644 --- a/crates/router/src/core/payments/operations/payment_complete_authorize.rs +++ b/crates/router/src/core/payments/operations/payment_complete_authorize.rs @@ -312,7 +312,7 @@ impl Domain for CompleteAuthorize { _merchant_account: &domain::MerchantAccount, state: &AppState, request: &api::PaymentsRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _key_store: &domain::MerchantKeyStore, ) -> CustomResult { // Use a new connector in the confirm call or use the same one which was passed when diff --git a/crates/router/src/core/payments/operations/payment_confirm.rs b/crates/router/src/core/payments/operations/payment_confirm.rs index 3349e9170f..9469cea970 100644 --- a/crates/router/src/core/payments/operations/payment_confirm.rs +++ b/crates/router/src/core/payments/operations/payment_confirm.rs @@ -362,7 +362,7 @@ impl Domain for PaymentConfirm { _merchant_account: &domain::MerchantAccount, state: &AppState, request: &api::PaymentsRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _key_store: &domain::MerchantKeyStore, ) -> CustomResult { // Use a new connector in the confirm call or use the same one which was passed when diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index 810f526150..da0d08c07c 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -334,7 +334,7 @@ impl Domain for PaymentCreate { _merchant_account: &domain::MerchantAccount, state: &AppState, request: &api::PaymentsRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, request.routing.clone()).await diff --git a/crates/router/src/core/payments/operations/payment_method_validate.rs b/crates/router/src/core/payments/operations/payment_method_validate.rs index d0860fd27e..dedacba5f3 100644 --- a/crates/router/src/core/payments/operations/payment_method_validate.rs +++ b/crates/router/src/core/payments/operations/payment_method_validate.rs @@ -293,7 +293,7 @@ where _merchant_account: &domain::MerchantAccount, state: &AppState, _request: &api::VerifyRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _mechant_key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, None).await diff --git a/crates/router/src/core/payments/operations/payment_session.rs b/crates/router/src/core/payments/operations/payment_session.rs index f795e4b439..4fecc586dd 100644 --- a/crates/router/src/core/payments/operations/payment_session.rs +++ b/crates/router/src/core/payments/operations/payment_session.rs @@ -309,7 +309,7 @@ where merchant_account: &domain::MerchantAccount, state: &AppState, request: &api::PaymentsSessionRequest, - payment_intent: &storage::payment_intent::PaymentIntent, + payment_intent: &storage::PaymentIntent, key_store: &domain::MerchantKeyStore, ) -> RouterResult { let db = &state.store; diff --git a/crates/router/src/core/payments/operations/payment_start.rs b/crates/router/src/core/payments/operations/payment_start.rs index 03eb5294b1..9dab74ee19 100644 --- a/crates/router/src/core/payments/operations/payment_start.rs +++ b/crates/router/src/core/payments/operations/payment_start.rs @@ -275,7 +275,7 @@ where _merchant_account: &domain::MerchantAccount, state: &AppState, _request: &api::PaymentsStartRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _mechant_key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, None).await diff --git a/crates/router/src/core/payments/operations/payment_status.rs b/crates/router/src/core/payments/operations/payment_status.rs index bbb9b72765..30a2ac24c4 100644 --- a/crates/router/src/core/payments/operations/payment_status.rs +++ b/crates/router/src/core/payments/operations/payment_status.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use api_models::enums::CancelTransaction; use async_trait::async_trait; -use common_utils::ext_traits::AsyncExt; +use common_utils::{errors::ReportSwitchExt, ext_traits::AsyncExt}; use error_stack::ResultExt; use router_derive::PaymentOperation; use router_env::{instrument, tracing}; @@ -106,7 +106,7 @@ impl Domain for PaymentStatus { _merchant_account: &domain::MerchantAccount, state: &AppState, request: &api::PaymentsRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, request.routing.clone()).await @@ -411,7 +411,8 @@ pub async fn get_payment_intent_payment_attempt( pi.active_attempt_id.as_str(), storage_scheme, ) - .await?; + .await + .switch()?; } api_models::payments::PaymentIdType::ConnectorTransactionId(ref id) => { pa = db @@ -420,7 +421,8 @@ pub async fn get_payment_intent_payment_attempt( id, storage_scheme, ) - .await?; + .await + .switch()?; pi = db .find_payment_intent_by_payment_id_merchant_id( pa.payment_id.as_str(), @@ -432,7 +434,8 @@ pub async fn get_payment_intent_payment_attempt( api_models::payments::PaymentIdType::PaymentAttemptId(ref id) => { pa = db .find_payment_attempt_by_attempt_id_merchant_id(id, merchant_id, storage_scheme) - .await?; + .await + .switch()?; pi = db .find_payment_intent_by_payment_id_merchant_id( pa.payment_id.as_str(), @@ -448,7 +451,8 @@ pub async fn get_payment_intent_payment_attempt( merchant_id, storage_scheme, ) - .await?; + .await + .switch()?; pi = db .find_payment_intent_by_payment_id_merchant_id( @@ -459,7 +463,7 @@ pub async fn get_payment_intent_payment_attempt( .await?; } } - Ok((pi, pa)) + error_stack::Result::<_, errors::DataStorageError>::Ok((pi, pa)) })() .await .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index 61b9afce8b..0935fd587b 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -403,7 +403,7 @@ impl Domain for PaymentUpdate { _merchant_account: &domain::MerchantAccount, state: &AppState, request: &api::PaymentsRequest, - _payment_intent: &storage::payment_intent::PaymentIntent, + _payment_intent: &storage::PaymentIntent, _key_store: &domain::MerchantKeyStore, ) -> CustomResult { helpers::get_connector_default(state, request.routing.clone()).await diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 69b62f84ab..15001a54fa 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -29,6 +29,7 @@ pub mod reverse_lookup; use std::sync::Arc; +use data_models::payments::payment_intent::PaymentIntentInterface; use futures::lock::Mutex; use masking::PeekInterface; use storage_impl::redis::kv_store::RedisConnInterface; @@ -64,7 +65,7 @@ pub trait StorageInterface: + merchant_connector_account::ConnectorAccessToken + merchant_connector_account::MerchantConnectorAccountInterface + payment_attempt::PaymentAttemptInterface - + payment_intent::PaymentIntentInterface + + PaymentIntentInterface + payment_method::PaymentMethodInterface + payout_attempt::PayoutAttemptInterface + payouts::PayoutsInterface diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index 52180cd15e..6fe4250504 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -72,7 +72,7 @@ pub trait PaymentAttemptInterface { async fn get_filters_for_payments( &self, - pi: &[diesel_models::payment_intent::PaymentIntent], + pi: &[types::PaymentIntent], merchant_id: &str, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; @@ -81,6 +81,7 @@ pub trait PaymentAttemptInterface { #[cfg(not(feature = "kv_store"))] mod storage { use error_stack::IntoReport; + use storage_impl::DataModelExt; use super::PaymentAttemptInterface; use crate::{ @@ -193,13 +194,18 @@ mod storage { async fn get_filters_for_payments( &self, - pi: &[diesel_models::payment_intent::PaymentIntent], + pi: &[data_models::payments::payment_intent::PaymentIntent], merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_read(self).await?; - PaymentAttempt::get_filters_for_payments(&conn, pi, merchant_id) + let intents = pi + .iter() + .cloned() + .map(|pi| pi.to_storage_model()) + .collect::>(); + PaymentAttempt::get_filters_for_payments(&conn, intents.as_slice(), merchant_id) .await .map_err(Into::into) .into_report() @@ -267,7 +273,7 @@ impl PaymentAttemptInterface for MockDb { async fn get_filters_for_payments( &self, - _pi: &[diesel_models::payment_intent::PaymentIntent], + _pi: &[data_models::payments::payment_intent::PaymentIntent], _merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult @@ -430,7 +436,7 @@ mod storage { use diesel_models::reverse_lookup::ReverseLookup; use error_stack::{IntoReport, ResultExt}; use redis_interface::HsetnxReply; - use storage_impl::redis::kv_store::RedisConnInterface; + use storage_impl::{redis::kv_store::RedisConnInterface, DataModelExt}; use super::PaymentAttemptInterface; use crate::{ @@ -898,13 +904,18 @@ mod storage { async fn get_filters_for_payments( &self, - pi: &[diesel_models::payment_intent::PaymentIntent], + pi: &[data_models::payments::payment_intent::PaymentIntent], merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_read(self).await?; - PaymentAttempt::get_filters_for_payments(&conn, pi, merchant_id) + let intents = pi + .iter() + .cloned() + .map(|pi| pi.to_storage_model()) + .collect::>(); + PaymentAttempt::get_filters_for_payments(&conn, intents.as_slice(), merchant_id) .await .map_err(Into::into) .into_report() diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index 0d807ce85d..64ab05cf84 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -1,3 +1,11 @@ +use data_models::payments::payment_intent::{ + PaymentIntent, PaymentIntentInterface, PaymentIntentNew, +}; +#[cfg(feature = "olap")] +use data_models::payments::{ + payment_attempt::PaymentAttempt, payment_intent::PaymentIntentFetchConstraints, +}; + use super::MockDb; #[cfg(feature = "olap")] use crate::types::api; @@ -6,406 +14,17 @@ use crate::{ types::storage::{self as types, enums}, }; -#[async_trait::async_trait] -pub trait PaymentIntentInterface { - async fn update_payment_intent( - &self, - this: types::PaymentIntent, - payment_intent: types::PaymentIntentUpdate, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; - - async fn insert_payment_intent( - &self, - new: types::PaymentIntentNew, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; - - async fn find_payment_intent_by_payment_id_merchant_id( - &self, - payment_id: &str, - merchant_id: &str, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(feature = "olap")] - async fn filter_payment_intent_by_constraints( - &self, - merchant_id: &str, - pc: &api::PaymentListConstraints, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(feature = "olap")] - async fn filter_payment_intents_by_time_range_constraints( - &self, - merchant_id: &str, - time_range: &api::TimeRange, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(feature = "olap")] - async fn apply_filters_on_payments_list( - &self, - merchant_id: &str, - constraints: &api::PaymentListFilterConstraints, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; -} - -#[cfg(feature = "kv_store")] -mod storage { - use common_utils::date_time; - use error_stack::{IntoReport, ResultExt}; - use redis_interface::HsetnxReply; - use storage_impl::redis::kv_store::RedisConnInterface; - - use super::PaymentIntentInterface; - #[cfg(feature = "olap")] - use crate::types::api; - use crate::{ - connection, - core::errors::{self, CustomResult}, - services::Store, - types::storage::{enums, kv, payment_intent::*}, - utils::{self, db_utils, storage_partitioning}, - }; - - #[async_trait::async_trait] - impl PaymentIntentInterface for Store { - async fn insert_payment_intent( - &self, - new: PaymentIntentNew, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - match storage_scheme { - enums::MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - new.insert(&conn).await.map_err(Into::into).into_report() - } - - enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", new.merchant_id, new.payment_id); - let created_intent = PaymentIntent { - id: 0i32, - payment_id: new.payment_id.clone(), - merchant_id: new.merchant_id.clone(), - status: new.status, - amount: new.amount, - currency: new.currency, - amount_captured: new.amount_captured, - customer_id: new.customer_id.clone(), - description: new.description.clone(), - return_url: new.return_url.clone(), - metadata: new.metadata.clone(), - connector_id: new.connector_id.clone(), - shipping_address_id: new.shipping_address_id.clone(), - billing_address_id: new.billing_address_id.clone(), - statement_descriptor_name: new.statement_descriptor_name.clone(), - statement_descriptor_suffix: new.statement_descriptor_suffix.clone(), - created_at: new.created_at.unwrap_or_else(date_time::now), - modified_at: new.created_at.unwrap_or_else(date_time::now), - last_synced: new.last_synced, - setup_future_usage: new.setup_future_usage, - off_session: new.off_session, - client_secret: new.client_secret.clone(), - business_country: new.business_country, - business_label: new.business_label.clone(), - active_attempt_id: new.active_attempt_id.to_owned(), - order_details: new.order_details.clone(), - allowed_payment_method_types: new.allowed_payment_method_types.clone(), - connector_metadata: new.connector_metadata.clone(), - feature_metadata: new.feature_metadata.clone(), - attempt_count: new.attempt_count, - }; - - match self - .get_redis_conn() - .map_err(Into::::into)? - .serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent) - .await - { - Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { - entity: "payment_intent", - key: Some(key), - }) - .into_report(), - Ok(HsetnxReply::KeySet) => { - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Insert { - insertable: kv::Insertable::PaymentIntent(new), - }, - }; - self.push_to_drainer_stream::( - redis_entry, - storage_partitioning::PartitionKey::MerchantIdPaymentId { - merchant_id: &created_intent.merchant_id, - payment_id: &created_intent.payment_id, - }, - ) - .await - .change_context(errors::StorageError::KVError)?; - Ok(created_intent) - } - Err(error) => Err(error.change_context(errors::StorageError::KVError)), - } - } - } - } - - async fn update_payment_intent( - &self, - this: PaymentIntent, - payment_intent: PaymentIntentUpdate, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - match storage_scheme { - enums::MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - this.update(&conn, payment_intent) - .await - .map_err(Into::into) - .into_report() - } - - enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.merchant_id, this.payment_id); - - let updated_intent = payment_intent.clone().apply_changeset(this.clone()); - // Check for database presence as well Maybe use a read replica here ? - - let redis_value = - utils::Encode::::encode_to_string_of_json(&updated_intent) - .change_context(errors::StorageError::SerializationFailed)?; - - let updated_intent = self - .get_redis_conn() - .map_err(Into::::into)? - .set_hash_fields(&key, ("pi", &redis_value)) - .await - .map(|_| updated_intent) - .change_context(errors::StorageError::KVError)?; - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Update { - updatable: kv::Updateable::PaymentIntentUpdate( - kv::PaymentIntentUpdateMems { - orig: this, - update_data: payment_intent, - }, - ), - }, - }; - - self.push_to_drainer_stream::( - redis_entry, - storage_partitioning::PartitionKey::MerchantIdPaymentId { - merchant_id: &updated_intent.merchant_id, - payment_id: &updated_intent.payment_id, - }, - ) - .await - .change_context(errors::StorageError::KVError)?; - Ok(updated_intent) - } - } - } - - async fn find_payment_intent_by_payment_id_merchant_id( - &self, - payment_id: &str, - merchant_id: &str, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let database_call = || async { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) - .await - .map_err(Into::into) - .into_report() - }; - match storage_scheme { - enums::MerchantStorageScheme::PostgresOnly => database_call().await, - - enums::MerchantStorageScheme::RedisKv => { - let key = format!("{merchant_id}_{payment_id}"); - db_utils::try_redis_get_else_try_database_get( - self.get_redis_conn() - .map_err(Into::::into)? - .get_hash_field_and_deserialize(&key, "pi", "PaymentIntent"), - database_call, - ) - .await - } - } - } - - #[cfg(feature = "olap")] - async fn filter_payment_intent_by_constraints( - &self, - merchant_id: &str, - pc: &api::PaymentListConstraints, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - match storage_scheme { - enums::MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::filter_by_constraints(&conn, merchant_id, pc) - .await - .map_err(Into::into) - .into_report() - } - - enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()), - } - } - #[cfg(feature = "olap")] - async fn filter_payment_intents_by_time_range_constraints( - &self, - merchant_id: &str, - time_range: &api::TimeRange, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - match storage_scheme { - enums::MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::filter_by_time_constraints(&conn, merchant_id, time_range) - .await - .map_err(Into::into) - .into_report() - } - - enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()), - } - } - - #[cfg(feature = "olap")] - async fn apply_filters_on_payments_list( - &self, - merchant_id: &str, - constraints: &api::PaymentListFilterConstraints, - storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - match storage_scheme { - enums::MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::apply_filters_on_payments(&conn, merchant_id, constraints) - .await - .map_err(Into::into) - .into_report() - } - - enums::MerchantStorageScheme::RedisKv => Err(errors::StorageError::KVError.into()), - } - } - } -} - -#[cfg(not(feature = "kv_store"))] -mod storage { - use error_stack::IntoReport; - - use super::PaymentIntentInterface; - #[cfg(feature = "olap")] - use crate::types::api; - use crate::{ - connection, - core::errors::{self, CustomResult}, - services::Store, - types::storage::{enums, payment_intent::*}, - }; - - #[async_trait::async_trait] - impl PaymentIntentInterface for Store { - async fn insert_payment_intent( - &self, - new: PaymentIntentNew, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - new.insert(&conn).await.map_err(Into::into).into_report() - } - - async fn update_payment_intent( - &self, - this: PaymentIntent, - payment_intent: PaymentIntentUpdate, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - this.update(&conn, payment_intent) - .await - .map_err(Into::into) - .into_report() - } - - async fn find_payment_intent_by_payment_id_merchant_id( - &self, - payment_id: &str, - merchant_id: &str, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) - .await - .map_err(Into::into) - .into_report() - } - - #[cfg(feature = "olap")] - async fn filter_payment_intent_by_constraints( - &self, - merchant_id: &str, - pc: &api::PaymentListConstraints, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::filter_by_constraints(&conn, merchant_id, pc) - .await - .map_err(Into::into) - .into_report() - } - #[cfg(feature = "olap")] - async fn filter_payment_intents_by_time_range_constraints( - &self, - merchant_id: &str, - time_range: &api::TimeRange, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::filter_by_time_constraints(&conn, merchant_id, time_range) - .await - .map_err(Into::into) - .into_report() - } - - #[cfg(feature = "olap")] - async fn apply_filters_on_payments_list( - &self, - merchant_id: &str, - constraints: &api::PaymentListFilterConstraints, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - PaymentIntent::apply_filters_on_payments(&conn, merchant_id, constraints) - .await - .map_err(Into::into) - .into_report() - } - } -} - #[async_trait::async_trait] impl PaymentIntentInterface for MockDb { #[cfg(feature = "olap")] async fn filter_payment_intent_by_constraints( &self, _merchant_id: &str, - _pc: &api::PaymentListConstraints, + _filters: &PaymentIntentFetchConstraints, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::DataStorageError> { // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? + Err(errors::DataStorageError::MockDbError)? } #[cfg(feature = "olap")] async fn filter_payment_intents_by_time_range_constraints( @@ -413,31 +32,30 @@ impl PaymentIntentInterface for MockDb { _merchant_id: &str, _time_range: &api::TimeRange, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::DataStorageError> { // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? + Err(errors::DataStorageError::MockDbError)? } #[cfg(feature = "olap")] - async fn apply_filters_on_payments_list( + async fn get_filtered_payment_intents_attempt( &self, _merchant_id: &str, - _constraints: &api::PaymentListFilterConstraints, + _constraints: &PaymentIntentFetchConstraints, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> - { + ) -> error_stack::Result, errors::DataStorageError> { // [#172]: Implement function for `MockDb` - Err(errors::StorageError::MockDbError)? + Err(errors::DataStorageError::MockDbError)? } #[allow(clippy::panic)] async fn insert_payment_intent( &self, - new: types::PaymentIntentNew, + new: PaymentIntentNew, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let mut payment_intents = self.payment_intents.lock().await; let time = common_utils::date_time::now(); - let payment_intent = types::PaymentIntent { + let payment_intent = PaymentIntent { #[allow(clippy::as_conversions)] id: payment_intents.len() as i32, payment_id: new.payment_id, @@ -478,10 +96,10 @@ impl PaymentIntentInterface for MockDb { #[allow(clippy::unwrap_used)] async fn update_payment_intent( &self, - this: types::PaymentIntent, + this: PaymentIntent, update: types::PaymentIntentUpdate, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let mut payment_intents = self.payment_intents.lock().await; let payment_intent = payment_intents .iter_mut() @@ -498,7 +116,7 @@ impl PaymentIntentInterface for MockDb { payment_id: &str, merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let payment_intents = self.payment_intents.lock().await; Ok(payment_intents diff --git a/crates/router/src/types/domain/merchant_account.rs b/crates/router/src/types/domain/merchant_account.rs index f90f8da1f7..30710218e7 100644 --- a/crates/router/src/types/domain/merchant_account.rs +++ b/crates/router/src/types/domain/merchant_account.rs @@ -2,11 +2,11 @@ use common_utils::{ crypto::{OptionalEncryptableName, OptionalEncryptableValue}, date_time, pii, }; -use diesel_models::{ - encryption::Encryption, enums, merchant_account::MerchantAccountUpdateInternal, -}; +use data_models::MerchantStorageScheme; +use diesel_models::{encryption::Encryption, merchant_account::MerchantAccountUpdateInternal}; use error_stack::ResultExt; use masking::{PeekInterface, Secret}; +use storage_impl::DataModelExt; use crate::{ errors::{CustomResult, ValidationError}, @@ -27,7 +27,7 @@ pub struct MerchantAccount { pub sub_merchants_enabled: Option, pub parent_merchant_id: Option, pub publishable_key: Option, - pub storage_scheme: enums::MerchantStorageScheme, + pub storage_scheme: MerchantStorageScheme, pub locker_id: Option, pub metadata: Option, pub routing_algorithm: Option, @@ -64,7 +64,7 @@ pub enum MerchantAccountUpdate { payout_routing_algorithm: Option, }, StorageSchemeUpdate { - storage_scheme: enums::MerchantStorageScheme, + storage_scheme: MerchantStorageScheme, }, ReconUpdate { is_recon_enabled: bool, @@ -114,7 +114,7 @@ impl From for MerchantAccountUpdateInternal { ..Default::default() }, MerchantAccountUpdate::StorageSchemeUpdate { storage_scheme } => Self { - storage_scheme: Some(storage_scheme), + storage_scheme: Some(storage_scheme.to_storage_model()), modified_at: Some(date_time::now()), ..Default::default() }, @@ -146,7 +146,7 @@ impl super::behaviour::Conversion for MerchantAccount { sub_merchants_enabled: self.sub_merchants_enabled, parent_merchant_id: self.parent_merchant_id, publishable_key: self.publishable_key, - storage_scheme: self.storage_scheme, + storage_scheme: self.storage_scheme.to_storage_model(), locker_id: self.locker_id, metadata: self.metadata, routing_algorithm: self.routing_algorithm, @@ -188,7 +188,7 @@ impl super::behaviour::Conversion for MerchantAccount { sub_merchants_enabled: item.sub_merchants_enabled, parent_merchant_id: item.parent_merchant_id, publishable_key: item.publishable_key, - storage_scheme: item.storage_scheme, + storage_scheme: MerchantStorageScheme::from_storage_model(item.storage_scheme), locker_id: item.locker_id, metadata: item.metadata, routing_algorithm: item.routing_algorithm, diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index d9ecdaaa19..d9a5b26f12 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -19,7 +19,6 @@ pub mod merchant_account; pub mod merchant_connector_account; pub mod merchant_key_store; pub mod payment_attempt; -pub mod payment_intent; pub mod payment_method; pub mod payout_attempt; pub mod payouts; @@ -28,10 +27,14 @@ mod query; pub mod refund; pub mod reverse_lookup; +pub use data_models::payments::payment_intent::{ + PaymentIntent, PaymentIntentNew, PaymentIntentUpdate, +}; + pub use self::{ address::*, api_keys::*, capture::*, cards_info::*, configs::*, connector_response::*, customers::*, dispute::*, ephemeral_key::*, events::*, file::*, locker_mock_up::*, mandate::*, merchant_account::*, merchant_connector_account::*, merchant_key_store::*, payment_attempt::*, - payment_intent::*, payment_method::*, payout_attempt::*, payouts::*, process_tracker::*, - refund::*, reverse_lookup::*, + payment_method::*, payout_attempt::*, payouts::*, process_tracker::*, refund::*, + reverse_lookup::*, }; diff --git a/crates/router/src/types/storage/enums.rs b/crates/router/src/types/storage/enums.rs index 9c7d02cdce..472fd94710 100644 --- a/crates/router/src/types/storage/enums.rs +++ b/crates/router/src/types/storage/enums.rs @@ -1 +1,2 @@ +pub use data_models::MerchantStorageScheme; pub use diesel_models::enums::*; diff --git a/crates/router/src/types/storage/payment_intent.rs b/crates/router/src/types/storage/payment_intent.rs deleted file mode 100644 index a2752b5499..0000000000 --- a/crates/router/src/types/storage/payment_intent.rs +++ /dev/null @@ -1,191 +0,0 @@ -use async_bb8_diesel::AsyncRunQueryDsl; -use diesel::{associations::HasTable, debug_query, pg::Pg, ExpressionMethods, JoinOnDsl, QueryDsl}; -use diesel_models::query::generics::db_metrics; -pub use diesel_models::{ - errors, - payment_attempt::PaymentAttempt, - payment_intent::{ - PaymentIntent, PaymentIntentNew, PaymentIntentUpdate, PaymentIntentUpdateInternal, - }, - schema::{ - payment_attempt::{self, dsl as dsl1}, - payment_intent::dsl, - }, -}; -use error_stack::{IntoReport, ResultExt}; -use router_env::{instrument, tracing}; - -use crate::{connection::PgPooledConn, core::errors::CustomResult, types::api}; - -const JOIN_LIMIT: i64 = 20; - -#[async_trait::async_trait] -pub trait PaymentIntentDbExt: Sized { - async fn filter_by_constraints( - conn: &PgPooledConn, - merchant_id: &str, - pc: &api::PaymentListConstraints, - ) -> CustomResult, errors::DatabaseError>; - - async fn filter_by_time_constraints( - conn: &PgPooledConn, - merchant_id: &str, - pc: &api::TimeRange, - ) -> CustomResult, errors::DatabaseError>; - - async fn apply_filters_on_payments( - conn: &PgPooledConn, - merchant_id: &str, - constraints: &api::PaymentListFilterConstraints, - ) -> CustomResult, errors::DatabaseError>; -} - -#[async_trait::async_trait] -impl PaymentIntentDbExt for PaymentIntent { - #[instrument(skip(conn))] - async fn filter_by_constraints( - conn: &PgPooledConn, - merchant_id: &str, - pc: &api::PaymentListConstraints, - ) -> CustomResult, errors::DatabaseError> { - let customer_id = &pc.customer_id; - let starting_after = &pc.starting_after; - let ending_before = &pc.ending_before; - - //[#350]: Replace this with Boxable Expression and pass it into generic filter - // when https://github.com/rust-lang/rust/issues/52662 becomes stable - let mut filter = ::table() - .filter(dsl::merchant_id.eq(merchant_id.to_owned())) - .order(dsl::created_at.desc()) - .into_boxed(); - - if let Some(customer_id) = customer_id { - filter = filter.filter(dsl::customer_id.eq(customer_id.to_owned())); - } - if let Some(created) = pc.created { - filter = filter.filter(dsl::created_at.eq(created)); - } - if let Some(created_lt) = pc.created_lt { - filter = filter.filter(dsl::created_at.lt(created_lt)); - } - if let Some(created_gt) = pc.created_gt { - filter = filter.filter(dsl::created_at.gt(created_gt)); - } - if let Some(created_lte) = pc.created_lte { - filter = filter.filter(dsl::created_at.le(created_lte)); - } - if let Some(created_gte) = pc.created_gte { - filter = filter.filter(dsl::created_at.gt(created_gte)); - } - if let Some(starting_after) = starting_after { - let id = Self::find_by_payment_id_merchant_id(conn, starting_after, merchant_id) - .await? - .id; - filter = filter.filter(dsl::id.gt(id)); - } - if let Some(ending_before) = ending_before { - let id = Self::find_by_payment_id_merchant_id(conn, ending_before, merchant_id) - .await? - .id; - filter = filter.filter(dsl::id.lt(id.to_owned())); - } - - filter = filter.limit(pc.limit); - - crate::logger::debug!(query = %debug_query::(&filter).to_string()); - - db_metrics::track_database_call::<::Table, _, _>( - filter.get_results_async(conn), - db_metrics::DatabaseOperation::Filter, - ) - .await - .into_report() - .change_context(errors::DatabaseError::NotFound) - .attach_printable_lazy(|| "Error filtering records by predicate") - } - - #[instrument(skip(conn))] - async fn filter_by_time_constraints( - conn: &PgPooledConn, - merchant_id: &str, - time_range: &api::TimeRange, - ) -> CustomResult, errors::DatabaseError> { - let start_time = time_range.start_time; - let end_time = time_range - .end_time - .unwrap_or_else(common_utils::date_time::now); - - //[#350]: Replace this with Boxable Expression and pass it into generic filter - // when https://github.com/rust-lang/rust/issues/52662 becomes stable - let mut filter = ::table() - .filter(dsl::merchant_id.eq(merchant_id.to_owned())) - .order(dsl::modified_at.desc()) - .into_boxed(); - - filter = filter.filter(dsl::created_at.ge(start_time)); - - filter = filter.filter(dsl::created_at.le(end_time)); - - crate::logger::debug!(query = %debug_query::(&filter).to_string()); - filter - .get_results_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::Others) - .attach_printable("Error filtering records by time range") - } - - #[instrument(skip(conn))] - async fn apply_filters_on_payments( - conn: &PgPooledConn, - merchant_id: &str, - constraints: &api::PaymentListFilterConstraints, - ) -> CustomResult, errors::DatabaseError> { - let offset = constraints.offset.unwrap_or_default(); - let mut filter = Self::table() - .inner_join(payment_attempt::table.on(dsl1::attempt_id.eq(dsl::active_attempt_id))) - .filter(dsl::merchant_id.eq(merchant_id.to_owned())) - .order(dsl::created_at.desc()) - .into_boxed(); - - match &constraints.payment_id { - Some(payment_id) => { - filter = filter.filter(dsl::payment_id.eq(payment_id.to_owned())); - } - None => { - filter = filter.limit(JOIN_LIMIT).offset(offset); - } - }; - - if let Some(time_range) = constraints.time_range { - filter = filter.filter(dsl::created_at.ge(time_range.start_time)); - - if let Some(end_time) = time_range.end_time { - filter = filter.filter(dsl::created_at.le(end_time)); - } - } - - if let Some(connector) = constraints.connector.clone() { - filter = filter.filter(dsl1::connector.eq_any(connector)); - } - - if let Some(filter_currency) = constraints.currency.clone() { - filter = filter.filter(dsl::currency.eq_any(filter_currency)); - } - - if let Some(status) = constraints.status.clone() { - filter = filter.filter(dsl::status.eq_any(status)); - } - if let Some(payment_method) = constraints.payment_methods.clone() { - filter = filter.filter(dsl1::payment_method.eq_any(payment_method)); - } - - crate::logger::debug!(filter = %debug_query::(&filter).to_string()); - filter - .get_results_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::Others) - .attach_printable("Error filtering payment records") - } -} diff --git a/crates/router/src/utils/db_utils.rs b/crates/router/src/utils/db_utils.rs index b02402d7fe..c169de293d 100644 --- a/crates/router/src/utils/db_utils.rs +++ b/crates/router/src/utils/db_utils.rs @@ -26,11 +26,11 @@ pub fn generate_hscan_pattern_for_attempt(sk: &str) -> String { pub async fn try_redis_get_else_try_database_get( redis_fut: RFut, database_call_closure: F, -) -> errors::CustomResult +) -> error_stack::Result where F: FnOnce() -> DFut, - RFut: futures::Future>, - DFut: futures::Future>, + RFut: futures::Future>, + DFut: futures::Future>, { let redis_output = redis_fut.await; match redis_output { diff --git a/crates/storage_impl/Cargo.toml b/crates/storage_impl/Cargo.toml index 5597bc1af5..5daf1690a7 100644 --- a/crates/storage_impl/Cargo.toml +++ b/crates/storage_impl/Cargo.toml @@ -8,11 +8,17 @@ readme = "README.md" license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["olap", "oltp"] +oltp = ["data_models/oltp"] +olap = ["data_models/olap"] [dependencies] # First Party dependencies common_utils = { version = "0.1.0", path = "../common_utils" } +api_models = { version = "0.1.0", path = "../api_models" } diesel_models = { version = "0.1.0", path = "../diesel_models" } +data_models = { version = "0.1.0", path = "../data_models", default-features = false } masking = { version = "0.1.0", path = "../masking" } redis_interface = { version = "0.1.0", path = "../redis_interface" } router_env = { version = "0.1.0", path = "../router_env" } @@ -22,9 +28,10 @@ async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", async-trait = "0.1.72" bb8 = "0.8.1" crc32fast = "1.3.2" +futures = "0.3.28" diesel = { version = "2.1.0", default-features = false, features = ["postgres"] } dyn-clone = "1.0.12" error-stack = "0.3.1" moka = { version = "0.11.3", features = ["future"] } once_cell = "1.18.0" -tokio = { version = "1.28.2", features = ["rt-multi-thread"] } +tokio = { version = "1.28.2", features = ["rt-multi-thread"] } \ No newline at end of file diff --git a/crates/storage_impl/src/database/store.rs b/crates/storage_impl/src/database/store.rs index a80a3b1fdf..d2e18475ea 100644 --- a/crates/storage_impl/src/database/store.rs +++ b/crates/storage_impl/src/database/store.rs @@ -9,8 +9,8 @@ pub type PgPool = bb8::Pool>; pub type PgPooledConn = async_bb8_diesel::Connection; #[async_trait::async_trait] -pub trait DatabaseStore: Clone + Send { - type Config; +pub trait DatabaseStore: Clone + Send + Sync { + type Config: Send; async fn new(config: Self::Config, test_transaction: bool) -> Self; fn get_master_pool(&self) -> &PgPool; fn get_replica_pool(&self) -> &PgPool; diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index ef3970f1f9..88b8173564 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -5,9 +5,11 @@ use masking::StrongSecret; use redis::{kv_store::RedisConnInterface, RedisStore}; pub mod config; pub mod database; +pub mod metrics; pub mod payments; pub mod redis; pub mod refund; +mod utils; use database::store::PgPool; use redis_interface::errors::RedisError; @@ -193,3 +195,57 @@ impl KVRouterStore { .change_context(RedisError::StreamAppendFailed) } } + +// TODO: This should not be used beyond this crate +// Remove the pub modified once StorageScheme usage is completed +pub trait DataModelExt { + type StorageModel; + fn to_storage_model(self) -> Self::StorageModel; + fn from_storage_model(storage_model: Self::StorageModel) -> Self; +} + +impl DataModelExt for data_models::MerchantStorageScheme { + type StorageModel = diesel_models::enums::MerchantStorageScheme; + + fn to_storage_model(self) -> Self::StorageModel { + match self { + Self::PostgresOnly => diesel_models::enums::MerchantStorageScheme::PostgresOnly, + Self::RedisKv => diesel_models::enums::MerchantStorageScheme::RedisKv, + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + match storage_model { + diesel_models::enums::MerchantStorageScheme::PostgresOnly => Self::PostgresOnly, + diesel_models::enums::MerchantStorageScheme::RedisKv => Self::RedisKv, + } + } +} + +pub(crate) fn diesel_error_to_data_error( + diesel_error: &diesel_models::errors::DatabaseError, +) -> data_models::errors::StorageError { + match diesel_error { + diesel_models::errors::DatabaseError::DatabaseConnectionError => { + data_models::errors::StorageError::DatabaseConnectionError + } + diesel_models::errors::DatabaseError::NotFound => { + data_models::errors::StorageError::ValueNotFound("Value not found".to_string()) + } + diesel_models::errors::DatabaseError::UniqueViolation => { + data_models::errors::StorageError::DuplicateValue { + entity: "entity ", + key: None, + } + } + diesel_models::errors::DatabaseError::NoFieldsToUpdate => { + data_models::errors::StorageError::DatabaseError("No fields to update".to_string()) + } + diesel_models::errors::DatabaseError::QueryGenerationFailed => { + data_models::errors::StorageError::DatabaseError("Query generation failed".to_string()) + } + diesel_models::errors::DatabaseError::Others => { + data_models::errors::StorageError::DatabaseError("Others".to_string()) + } + } +} diff --git a/crates/storage_impl/src/metrics.rs b/crates/storage_impl/src/metrics.rs new file mode 100644 index 0000000000..ee9b77bbd5 --- /dev/null +++ b/crates/storage_impl/src/metrics.rs @@ -0,0 +1,6 @@ +use router_env::{counter_metric, global_meter, metrics_context}; + +metrics_context!(CONTEXT); +global_meter!(GLOBAL_METER, "ROUTER_API"); + +counter_metric!(KV_MISS, GLOBAL_METER); // No. of KV misses diff --git a/crates/storage_impl/src/payments.rs b/crates/storage_impl/src/payments.rs index edf5b191e1..f4196e1eb2 100644 --- a/crates/storage_impl/src/payments.rs +++ b/crates/storage_impl/src/payments.rs @@ -1,3 +1,6 @@ +pub mod payment_attempt; +pub mod payment_intent; + use diesel_models::{payment_attempt::PaymentAttempt, payment_intent::PaymentIntent}; use crate::redis::kv_store::KvStorePartition; diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs new file mode 100644 index 0000000000..218d87cdfa --- /dev/null +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -0,0 +1,156 @@ +use data_models::{ + mandates::{MandateAmountData, MandateDataType}, + payments::payment_attempt::PaymentAttempt, +}; +use diesel_models::{ + enums::{MandateAmountData as DieselMandateAmountData, MandateDataType as DieselMandateType}, + payment_attempt::PaymentAttempt as DieselPaymentAttempt, +}; + +use crate::DataModelExt; + +impl DataModelExt for MandateAmountData { + type StorageModel = DieselMandateAmountData; + + fn to_storage_model(self) -> Self::StorageModel { + DieselMandateAmountData { + amount: self.amount, + currency: self.currency, + start_date: self.start_date, + end_date: self.end_date, + metadata: self.metadata, + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + Self { + amount: storage_model.amount, + currency: storage_model.currency, + start_date: storage_model.start_date, + end_date: storage_model.end_date, + metadata: storage_model.metadata, + } + } +} + +impl DataModelExt for MandateDataType { + type StorageModel = DieselMandateType; + + fn to_storage_model(self) -> Self::StorageModel { + match self { + Self::SingleUse(data) => DieselMandateType::SingleUse(data.to_storage_model()), + Self::MultiUse(None) => DieselMandateType::MultiUse(None), + Self::MultiUse(Some(data)) => { + DieselMandateType::MultiUse(Some(data.to_storage_model())) + } + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + match storage_model { + DieselMandateType::SingleUse(data) => { + Self::SingleUse(MandateAmountData::from_storage_model(data)) + } + DieselMandateType::MultiUse(Some(data)) => { + Self::MultiUse(Some(MandateAmountData::from_storage_model(data))) + } + DieselMandateType::MultiUse(None) => Self::MultiUse(None), + } + } +} + +impl DataModelExt for PaymentAttempt { + type StorageModel = DieselPaymentAttempt; + + fn to_storage_model(self) -> Self::StorageModel { + DieselPaymentAttempt { + id: self.id, + payment_id: self.payment_id, + merchant_id: self.merchant_id, + attempt_id: self.attempt_id, + status: self.status, + amount: self.amount, + currency: self.currency, + save_to_locker: self.save_to_locker, + connector: self.connector, + error_message: self.error_message, + offer_amount: self.offer_amount, + surcharge_amount: self.surcharge_amount, + tax_amount: self.tax_amount, + payment_method_id: self.payment_method_id, + payment_method: self.payment_method, + connector_transaction_id: self.connector_transaction_id, + capture_method: self.capture_method, + capture_on: self.capture_on, + confirm: self.confirm, + authentication_type: self.authentication_type, + created_at: self.created_at, + modified_at: self.modified_at, + last_synced: self.last_synced, + cancellation_reason: self.cancellation_reason, + amount_to_capture: self.amount_to_capture, + mandate_id: self.mandate_id, + browser_info: self.browser_info, + error_code: self.error_code, + payment_token: self.payment_token, + connector_metadata: self.connector_metadata, + payment_experience: self.payment_experience, + payment_method_type: self.payment_method_type, + payment_method_data: self.payment_method_data, + business_sub_label: self.business_sub_label, + straight_through_algorithm: self.straight_through_algorithm, + preprocessing_step_id: self.preprocessing_step_id, + mandate_details: self.mandate_details.map(|md| md.to_storage_model()), + error_reason: self.error_reason, + multiple_capture_count: self.multiple_capture_count, + connector_response_reference_id: self.connector_response_reference_id, + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + Self { + id: storage_model.id, + payment_id: storage_model.payment_id, + merchant_id: storage_model.merchant_id, + attempt_id: storage_model.attempt_id, + status: storage_model.status, + amount: storage_model.amount, + currency: storage_model.currency, + save_to_locker: storage_model.save_to_locker, + connector: storage_model.connector, + error_message: storage_model.error_message, + offer_amount: storage_model.offer_amount, + surcharge_amount: storage_model.surcharge_amount, + tax_amount: storage_model.tax_amount, + payment_method_id: storage_model.payment_method_id, + payment_method: storage_model.payment_method, + connector_transaction_id: storage_model.connector_transaction_id, + capture_method: storage_model.capture_method, + capture_on: storage_model.capture_on, + confirm: storage_model.confirm, + authentication_type: storage_model.authentication_type, + created_at: storage_model.created_at, + modified_at: storage_model.modified_at, + last_synced: storage_model.last_synced, + cancellation_reason: storage_model.cancellation_reason, + amount_to_capture: storage_model.amount_to_capture, + mandate_id: storage_model.mandate_id, + browser_info: storage_model.browser_info, + error_code: storage_model.error_code, + payment_token: storage_model.payment_token, + connector_metadata: storage_model.connector_metadata, + payment_experience: storage_model.payment_experience, + payment_method_type: storage_model.payment_method_type, + payment_method_data: storage_model.payment_method_data, + business_sub_label: storage_model.business_sub_label, + straight_through_algorithm: storage_model.straight_through_algorithm, + preprocessing_step_id: storage_model.preprocessing_step_id, + mandate_details: storage_model + .mandate_details + .map(MandateDataType::from_storage_model), + error_reason: storage_model.error_reason, + multiple_capture_count: storage_model.multiple_capture_count, + connector_response_reference_id: storage_model.connector_response_reference_id, + } + } +} diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs new file mode 100644 index 0000000000..3150707271 --- /dev/null +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -0,0 +1,898 @@ +#[cfg(feature = "olap")] +use async_bb8_diesel::AsyncRunQueryDsl; +use common_utils::{date_time, ext_traits::Encode}; +#[cfg(feature = "olap")] +use data_models::payments::{ + payment_attempt::PaymentAttempt, payment_intent::PaymentIntentFetchConstraints, +}; +use data_models::{ + errors::StorageError, + payments::payment_intent::{ + PaymentIntent, PaymentIntentInterface, PaymentIntentNew, PaymentIntentUpdate, + }, + MerchantStorageScheme, +}; +#[cfg(feature = "olap")] +use diesel::{associations::HasTable, ExpressionMethods, JoinOnDsl, QueryDsl}; +#[cfg(feature = "olap")] +use diesel_models::query::generics::db_metrics; +use diesel_models::{ + kv, + payment_intent::{ + PaymentIntent as DieselPaymentIntent, PaymentIntentNew as DieselPaymentIntentNew, + PaymentIntentUpdate as DieselPaymentIntentUpdate, + }, +}; +#[cfg(feature = "olap")] +use diesel_models::{ + payment_attempt::PaymentAttempt as DieselPaymentAttempt, + schema::{payment_attempt::dsl as pa_dsl, payment_intent::dsl as pi_dsl}, +}; +use error_stack::{IntoReport, ResultExt}; +use redis_interface::HsetnxReply; +#[cfg(feature = "olap")] +use router_env::logger; + +use crate::{ + redis::kv_store::{PartitionKey, RedisConnInterface}, + utils::{pg_connection_read, pg_connection_write}, + DataModelExt, DatabaseStore, KVRouterStore, +}; + +#[cfg(feature = "olap")] +const QUERY_LIMIT: u32 = 20; + +#[async_trait::async_trait] +impl PaymentIntentInterface for KVRouterStore { + async fn insert_payment_intent( + &self, + new: PaymentIntentNew, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .insert_payment_intent(new, storage_scheme) + .await + } + + MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", new.merchant_id, new.payment_id); + let created_intent = PaymentIntent { + id: 0i32, + payment_id: new.payment_id.clone(), + merchant_id: new.merchant_id.clone(), + status: new.status, + amount: new.amount, + currency: new.currency, + amount_captured: new.amount_captured, + customer_id: new.customer_id.clone(), + description: new.description.clone(), + return_url: new.return_url.clone(), + metadata: new.metadata.clone(), + connector_id: new.connector_id.clone(), + shipping_address_id: new.shipping_address_id.clone(), + billing_address_id: new.billing_address_id.clone(), + statement_descriptor_name: new.statement_descriptor_name.clone(), + statement_descriptor_suffix: new.statement_descriptor_suffix.clone(), + created_at: new.created_at.unwrap_or_else(date_time::now), + modified_at: new.created_at.unwrap_or_else(date_time::now), + last_synced: new.last_synced, + setup_future_usage: new.setup_future_usage, + off_session: new.off_session, + client_secret: new.client_secret.clone(), + business_country: new.business_country, + business_label: new.business_label.clone(), + active_attempt_id: new.active_attempt_id.to_owned(), + order_details: new.order_details.clone(), + allowed_payment_method_types: new.allowed_payment_method_types.clone(), + connector_metadata: new.connector_metadata.clone(), + feature_metadata: new.feature_metadata.clone(), + attempt_count: new.attempt_count, + }; + + match self + .get_redis_conn() + .change_context(StorageError::DatabaseConnectionError)? + .serialize_and_set_hash_field_if_not_exist(&key, "pi", &created_intent) + .await + { + Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue { + entity: "payment_intent", + key: Some(key), + }) + .into_report(), + Ok(HsetnxReply::KeySet) => { + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::PaymentIntent(new.to_storage_model()), + }, + }; + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentId { + merchant_id: &created_intent.merchant_id, + payment_id: &created_intent.payment_id, + }, + ) + .await + .change_context(StorageError::KVError)?; + Ok(created_intent) + } + Err(error) => Err(error.change_context(StorageError::KVError)), + } + } + } + } + + async fn update_payment_intent( + &self, + this: PaymentIntent, + payment_intent: PaymentIntentUpdate, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .update_payment_intent(this, payment_intent, storage_scheme) + .await + } + MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", this.merchant_id, this.payment_id); + + let updated_intent = payment_intent.clone().apply_changeset(this.clone()); + // Check for database presence as well Maybe use a read replica here ? + + let redis_value = + Encode::::encode_to_string_of_json(&updated_intent) + .change_context(StorageError::SerializationFailed)?; + + let updated_intent = self + .get_redis_conn() + .change_context(StorageError::DatabaseConnectionError)? + .set_hash_fields(&key, ("pi", &redis_value)) + .await + .map(|_| updated_intent) + .change_context(StorageError::KVError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::PaymentIntentUpdate( + kv::PaymentIntentUpdateMems { + orig: this.to_storage_model(), + update_data: payment_intent.to_storage_model(), + }, + ), + }, + }; + + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentId { + merchant_id: &updated_intent.merchant_id, + payment_id: &updated_intent.payment_id, + }, + ) + .await + .change_context(StorageError::KVError)?; + Ok(updated_intent) + } + } + } + + async fn find_payment_intent_by_payment_id_merchant_id( + &self, + payment_id: &str, + merchant_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result { + let database_call = || async { + self.router_store + .find_payment_intent_by_payment_id_merchant_id( + payment_id, + merchant_id, + storage_scheme, + ) + .await + }; + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + + MerchantStorageScheme::RedisKv => { + let key = format!("{merchant_id}_{payment_id}"); + crate::utils::try_redis_get_else_try_database_get( + self.get_redis_conn() + .change_context(StorageError::DatabaseConnectionError)? + .get_hash_field_and_deserialize(&key, "pi", "PaymentIntent"), + database_call, + ) + .await + } + } + } + + #[cfg(feature = "olap")] + async fn filter_payment_intent_by_constraints( + &self, + merchant_id: &str, + filters: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .filter_payment_intent_by_constraints(merchant_id, filters, storage_scheme) + .await + } + MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()), + } + } + #[cfg(feature = "olap")] + async fn filter_payment_intents_by_time_range_constraints( + &self, + merchant_id: &str, + time_range: &api_models::payments::TimeRange, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .filter_payment_intents_by_time_range_constraints( + merchant_id, + time_range, + storage_scheme, + ) + .await + } + MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()), + } + } + + #[cfg(feature = "olap")] + async fn get_filtered_payment_intents_attempt( + &self, + merchant_id: &str, + filters: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .get_filtered_payment_intents_attempt(merchant_id, filters, storage_scheme) + .await + } + MerchantStorageScheme::RedisKv => Err(StorageError::KVError.into()), + } + } +} + +#[async_trait::async_trait] +impl PaymentIntentInterface for crate::RouterStore { + async fn insert_payment_intent( + &self, + new: PaymentIntentNew, + _storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result { + let conn = pg_connection_write(self).await?; + new.to_storage_model() + .insert(&conn) + .await + .map_err(|er| { + let new_err = crate::diesel_error_to_data_error(er.current_context()); + er.change_context(new_err) + }) + .map(PaymentIntent::from_storage_model) + } + + async fn update_payment_intent( + &self, + this: PaymentIntent, + payment_intent: PaymentIntentUpdate, + _storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result { + let conn = pg_connection_write(self).await?; + this.to_storage_model() + .update(&conn, payment_intent.to_storage_model()) + .await + .map_err(|er| { + let new_err = crate::diesel_error_to_data_error(er.current_context()); + er.change_context(new_err) + }) + .map(PaymentIntent::from_storage_model) + } + + async fn find_payment_intent_by_payment_id_merchant_id( + &self, + payment_id: &str, + merchant_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result { + let conn = pg_connection_read(self).await?; + DieselPaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) + .await + .map(PaymentIntent::from_storage_model) + .map_err(|er| { + let new_err = crate::diesel_error_to_data_error(er.current_context()); + er.change_context(new_err) + }) + } + + #[cfg(feature = "olap")] + async fn filter_payment_intent_by_constraints( + &self, + merchant_id: &str, + filters: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + let conn = self.get_replica_pool(); + + //[#350]: Replace this with Boxable Expression and pass it into generic filter + // when https://github.com/rust-lang/rust/issues/52662 becomes stable + let mut query = ::table() + .filter(pi_dsl::merchant_id.eq(merchant_id.to_owned())) + .order(pi_dsl::created_at.desc()) + .into_boxed(); + + match filters { + PaymentIntentFetchConstraints::Single { payment_intent_id } => { + query = query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned())); + } + PaymentIntentFetchConstraints::List { + offset: _, + starting_at, + ending_at, + connector: _, + currency, + status, + payment_methods: _, + customer_id, + starting_after_id, + ending_before_id, + limit, + } => { + query = query.limit(limit.unwrap_or(QUERY_LIMIT).into()); + + if let Some(customer_id) = customer_id { + query = query.filter(pi_dsl::customer_id.eq(customer_id.clone())); + } + + query = match (starting_at, starting_after_id) { + (Some(starting_at), _) => query.filter(pi_dsl::created_at.ge(*starting_at)), + (None, Some(starting_after_id)) => { + // TODO: Fetch partial columns for this query since we only need some columns + let starting_at = self + .find_payment_intent_by_payment_id_merchant_id( + starting_after_id, + merchant_id, + storage_scheme, + ) + .await? + .created_at; + query.filter(pi_dsl::created_at.ge(starting_at)) + } + (None, None) => query, + }; + + query = match (ending_at, ending_before_id) { + (Some(ending_at), _) => query.filter(pi_dsl::created_at.le(*ending_at)), + (None, Some(ending_before_id)) => { + // TODO: Fetch partial columns for this query since we only need some columns + let ending_at = self + .find_payment_intent_by_payment_id_merchant_id( + ending_before_id, + merchant_id, + storage_scheme, + ) + .await? + .created_at; + query.filter(pi_dsl::created_at.le(ending_at)) + } + (None, None) => query, + }; + query = match currency { + Some(currency) => query.filter(pi_dsl::currency.eq_any(currency.clone())), + None => query, + }; + + query = match status { + Some(status) => query.filter(pi_dsl::status.eq_any(status.clone())), + None => query, + }; + } + } + + logger::debug!(query = %diesel::debug_query::(&query).to_string()); + + db_metrics::track_database_call::<::Table, _, _>( + query.get_results_async::(conn), + db_metrics::DatabaseOperation::Filter, + ) + .await + .map(|payment_intents| { + payment_intents + .into_iter() + .map(PaymentIntent::from_storage_model) + .collect::>() + }) + .into_report() + .map_err(|er| { + let new_err = StorageError::DatabaseError(format!("{er:?}")); + er.change_context(new_err) + }) + .attach_printable_lazy(|| "Error filtering records by predicate") + } + + #[cfg(feature = "olap")] + async fn filter_payment_intents_by_time_range_constraints( + &self, + merchant_id: &str, + time_range: &api_models::payments::TimeRange, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + // TODO: Remove this redundant function + let payment_filters = (*time_range).into(); + self.filter_payment_intent_by_constraints(merchant_id, &payment_filters, storage_scheme) + .await + } + + #[cfg(feature = "olap")] + async fn get_filtered_payment_intents_attempt( + &self, + merchant_id: &str, + constraints: &PaymentIntentFetchConstraints, + storage_scheme: MerchantStorageScheme, + ) -> error_stack::Result, StorageError> { + let conn = self.get_replica_pool(); + + let mut query = DieselPaymentIntent::table() + .inner_join( + diesel_models::schema::payment_attempt::table + .on(pa_dsl::attempt_id.eq(pi_dsl::active_attempt_id)), + ) + .filter(pi_dsl::merchant_id.eq(merchant_id.to_owned())) + .order(pi_dsl::created_at.desc()) + .into_boxed(); + query = match constraints { + PaymentIntentFetchConstraints::Single { payment_intent_id } => { + query.filter(pi_dsl::payment_id.eq(payment_intent_id.to_owned())) + } + PaymentIntentFetchConstraints::List { + offset, + starting_at, + ending_at, + connector, + currency, + status, + payment_methods, + customer_id, + starting_after_id, + ending_before_id, + limit, + } => { + query = query.limit(limit.unwrap_or(QUERY_LIMIT).into()); + + if let Some(customer_id) = customer_id { + query = query.filter(pi_dsl::customer_id.eq(customer_id.clone())); + } + + query = match (starting_at, starting_after_id) { + (Some(starting_at), _) => query.filter(pi_dsl::created_at.ge(*starting_at)), + (None, Some(starting_after_id)) => { + // TODO: Fetch partial columns for this query since we only need some columns + let starting_at = self + .find_payment_intent_by_payment_id_merchant_id( + starting_after_id, + merchant_id, + storage_scheme, + ) + .await? + .created_at; + query.filter(pi_dsl::created_at.ge(starting_at)) + } + (None, None) => query, + }; + + query = match (ending_at, ending_before_id) { + (Some(ending_at), _) => query.filter(pi_dsl::created_at.le(*ending_at)), + (None, Some(ending_before_id)) => { + // TODO: Fetch partial columns for this query since we only need some columns + let ending_at = self + .find_payment_intent_by_payment_id_merchant_id( + ending_before_id, + merchant_id, + storage_scheme, + ) + .await? + .created_at; + query.filter(pi_dsl::created_at.le(ending_at)) + } + (None, None) => query, + }; + + query = match offset { + Some(offset) => query.offset((*offset).into()), + None => query, + }; + + query = match currency { + Some(currency) => query.filter(pi_dsl::currency.eq_any(currency.clone())), + None => query, + }; + + let connectors = connector + .as_ref() + .map(|c| c.iter().map(|c| c.to_string()).collect::>()); + + query = match connectors { + Some(connectors) => query.filter(pa_dsl::connector.eq_any(connectors)), + None => query, + }; + + query = match status { + Some(status) => query.filter(pi_dsl::status.eq_any(status.clone())), + None => query, + }; + + query = match payment_methods { + Some(payment_methods) => { + query.filter(pa_dsl::payment_method.eq_any(payment_methods.clone())) + } + None => query, + }; + + query + } + }; + + logger::debug!(filter = %diesel::debug_query::(&query).to_string()); + + query + .get_results_async::<(DieselPaymentIntent, DieselPaymentAttempt)>(conn) + .await + .map(|results| { + results + .into_iter() + .map(|(pi, pa)| { + ( + PaymentIntent::from_storage_model(pi), + PaymentAttempt::from_storage_model(pa), + ) + }) + .collect() + }) + .into_report() + .map_err(|er| { + let new_er = StorageError::DatabaseError(format!("{er:?}")); + er.change_context(new_er) + }) + .attach_printable("Error filtering payment records") + } +} + +impl DataModelExt for PaymentIntentNew { + type StorageModel = DieselPaymentIntentNew; + + fn to_storage_model(self) -> Self::StorageModel { + DieselPaymentIntentNew { + payment_id: self.payment_id, + merchant_id: self.merchant_id, + status: self.status, + amount: self.amount, + currency: self.currency, + amount_captured: self.amount_captured, + customer_id: self.customer_id, + description: self.description, + return_url: self.return_url, + metadata: self.metadata, + connector_id: self.connector_id, + shipping_address_id: self.shipping_address_id, + billing_address_id: self.billing_address_id, + statement_descriptor_name: self.statement_descriptor_name, + statement_descriptor_suffix: self.statement_descriptor_suffix, + created_at: self.created_at, + modified_at: self.modified_at, + last_synced: self.last_synced, + setup_future_usage: self.setup_future_usage, + off_session: self.off_session, + client_secret: self.client_secret, + active_attempt_id: self.active_attempt_id, + business_country: self.business_country, + business_label: self.business_label, + order_details: self.order_details, + allowed_payment_method_types: self.allowed_payment_method_types, + connector_metadata: self.connector_metadata, + feature_metadata: self.feature_metadata, + attempt_count: self.attempt_count, + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + Self { + payment_id: storage_model.payment_id, + merchant_id: storage_model.merchant_id, + status: storage_model.status, + amount: storage_model.amount, + currency: storage_model.currency, + amount_captured: storage_model.amount_captured, + customer_id: storage_model.customer_id, + description: storage_model.description, + return_url: storage_model.return_url, + metadata: storage_model.metadata, + connector_id: storage_model.connector_id, + shipping_address_id: storage_model.shipping_address_id, + billing_address_id: storage_model.billing_address_id, + statement_descriptor_name: storage_model.statement_descriptor_name, + statement_descriptor_suffix: storage_model.statement_descriptor_suffix, + created_at: storage_model.created_at, + modified_at: storage_model.modified_at, + last_synced: storage_model.last_synced, + setup_future_usage: storage_model.setup_future_usage, + off_session: storage_model.off_session, + client_secret: storage_model.client_secret, + active_attempt_id: storage_model.active_attempt_id, + business_country: storage_model.business_country, + business_label: storage_model.business_label, + order_details: storage_model.order_details, + allowed_payment_method_types: storage_model.allowed_payment_method_types, + connector_metadata: storage_model.connector_metadata, + feature_metadata: storage_model.feature_metadata, + attempt_count: storage_model.attempt_count, + } + } +} + +impl DataModelExt for PaymentIntent { + type StorageModel = DieselPaymentIntent; + + fn to_storage_model(self) -> Self::StorageModel { + DieselPaymentIntent { + id: self.id, + payment_id: self.payment_id, + merchant_id: self.merchant_id, + status: self.status, + amount: self.amount, + currency: self.currency, + amount_captured: self.amount_captured, + customer_id: self.customer_id, + description: self.description, + return_url: self.return_url, + metadata: self.metadata, + connector_id: self.connector_id, + shipping_address_id: self.shipping_address_id, + billing_address_id: self.billing_address_id, + statement_descriptor_name: self.statement_descriptor_name, + statement_descriptor_suffix: self.statement_descriptor_suffix, + created_at: self.created_at, + modified_at: self.modified_at, + last_synced: self.last_synced, + setup_future_usage: self.setup_future_usage, + off_session: self.off_session, + client_secret: self.client_secret, + active_attempt_id: self.active_attempt_id, + business_country: self.business_country, + business_label: self.business_label, + order_details: self.order_details, + allowed_payment_method_types: self.allowed_payment_method_types, + connector_metadata: self.connector_metadata, + feature_metadata: self.feature_metadata, + attempt_count: self.attempt_count, + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + Self { + id: storage_model.id, + payment_id: storage_model.payment_id, + merchant_id: storage_model.merchant_id, + status: storage_model.status, + amount: storage_model.amount, + currency: storage_model.currency, + amount_captured: storage_model.amount_captured, + customer_id: storage_model.customer_id, + description: storage_model.description, + return_url: storage_model.return_url, + metadata: storage_model.metadata, + connector_id: storage_model.connector_id, + shipping_address_id: storage_model.shipping_address_id, + billing_address_id: storage_model.billing_address_id, + statement_descriptor_name: storage_model.statement_descriptor_name, + statement_descriptor_suffix: storage_model.statement_descriptor_suffix, + created_at: storage_model.created_at, + modified_at: storage_model.modified_at, + last_synced: storage_model.last_synced, + setup_future_usage: storage_model.setup_future_usage, + off_session: storage_model.off_session, + client_secret: storage_model.client_secret, + active_attempt_id: storage_model.active_attempt_id, + business_country: storage_model.business_country, + business_label: storage_model.business_label, + order_details: storage_model.order_details, + allowed_payment_method_types: storage_model.allowed_payment_method_types, + connector_metadata: storage_model.connector_metadata, + feature_metadata: storage_model.feature_metadata, + attempt_count: storage_model.attempt_count, + } + } +} + +impl DataModelExt for PaymentIntentUpdate { + type StorageModel = DieselPaymentIntentUpdate; + + fn to_storage_model(self) -> Self::StorageModel { + match self { + Self::ResponseUpdate { + status, + amount_captured, + return_url, + } => DieselPaymentIntentUpdate::ResponseUpdate { + status, + amount_captured, + return_url, + }, + Self::MetadataUpdate { metadata } => { + DieselPaymentIntentUpdate::MetadataUpdate { metadata } + } + Self::ReturnUrlUpdate { + return_url, + status, + customer_id, + shipping_address_id, + billing_address_id, + } => DieselPaymentIntentUpdate::ReturnUrlUpdate { + return_url, + status, + customer_id, + shipping_address_id, + billing_address_id, + }, + Self::MerchantStatusUpdate { + status, + shipping_address_id, + billing_address_id, + } => DieselPaymentIntentUpdate::MerchantStatusUpdate { + status, + shipping_address_id, + billing_address_id, + }, + Self::PGStatusUpdate { status } => DieselPaymentIntentUpdate::PGStatusUpdate { status }, + Self::Update { + amount, + currency, + setup_future_usage, + status, + customer_id, + shipping_address_id, + billing_address_id, + return_url, + business_country, + business_label, + description, + statement_descriptor_name, + statement_descriptor_suffix, + order_details, + metadata, + } => DieselPaymentIntentUpdate::Update { + amount, + currency, + setup_future_usage, + status, + customer_id, + shipping_address_id, + billing_address_id, + return_url, + business_country, + business_label, + description, + statement_descriptor_name, + statement_descriptor_suffix, + order_details, + metadata, + }, + Self::PaymentAttemptAndAttemptCountUpdate { + active_attempt_id, + attempt_count, + } => DieselPaymentIntentUpdate::PaymentAttemptAndAttemptCountUpdate { + active_attempt_id, + attempt_count, + }, + Self::StatusAndAttemptUpdate { + status, + active_attempt_id, + attempt_count, + } => DieselPaymentIntentUpdate::StatusAndAttemptUpdate { + status, + active_attempt_id, + attempt_count, + }, + } + } + + fn from_storage_model(storage_model: Self::StorageModel) -> Self { + match storage_model { + DieselPaymentIntentUpdate::ResponseUpdate { + status, + amount_captured, + return_url, + } => Self::ResponseUpdate { + status, + amount_captured, + return_url, + }, + DieselPaymentIntentUpdate::MetadataUpdate { metadata } => { + Self::MetadataUpdate { metadata } + } + DieselPaymentIntentUpdate::ReturnUrlUpdate { + return_url, + status, + customer_id, + shipping_address_id, + billing_address_id, + } => Self::ReturnUrlUpdate { + return_url, + status, + customer_id, + shipping_address_id, + billing_address_id, + }, + DieselPaymentIntentUpdate::MerchantStatusUpdate { + status, + shipping_address_id, + billing_address_id, + } => Self::MerchantStatusUpdate { + status, + shipping_address_id, + billing_address_id, + }, + DieselPaymentIntentUpdate::PGStatusUpdate { status } => Self::PGStatusUpdate { status }, + DieselPaymentIntentUpdate::Update { + amount, + currency, + setup_future_usage, + status, + customer_id, + shipping_address_id, + billing_address_id, + return_url, + business_country, + business_label, + description, + statement_descriptor_name, + statement_descriptor_suffix, + order_details, + metadata, + } => Self::Update { + amount, + currency, + setup_future_usage, + status, + customer_id, + shipping_address_id, + billing_address_id, + return_url, + business_country, + business_label, + description, + statement_descriptor_name, + statement_descriptor_suffix, + order_details, + metadata, + }, + DieselPaymentIntentUpdate::PaymentAttemptAndAttemptCountUpdate { + active_attempt_id, + attempt_count, + } => Self::PaymentAttemptAndAttemptCountUpdate { + active_attempt_id, + attempt_count, + }, + DieselPaymentIntentUpdate::StatusAndAttemptUpdate { + status, + active_attempt_id, + attempt_count, + } => Self::StatusAndAttemptUpdate { + status, + active_attempt_id, + attempt_count, + }, + } + } +} diff --git a/crates/storage_impl/src/utils.rs b/crates/storage_impl/src/utils.rs new file mode 100644 index 0000000000..6d6e1cd540 --- /dev/null +++ b/crates/storage_impl/src/utils.rs @@ -0,0 +1,70 @@ +use bb8::PooledConnection; +use data_models::errors::StorageError; +use diesel::PgConnection; +use error_stack::{IntoReport, ResultExt}; + +use crate::{metrics, DatabaseStore}; + +pub async fn pg_connection_read( + store: &T, +) -> error_stack::Result< + PooledConnection<'_, async_bb8_diesel::ConnectionManager>, + StorageError, +> { + // If only OLAP is enabled get replica pool. + #[cfg(all(feature = "olap", not(feature = "oltp")))] + let pool = store.get_replica_pool(); + + // If either one of these are true we need to get master pool. + // 1. Only OLTP is enabled. + // 2. Both OLAP and OLTP is enabled. + // 3. Both OLAP and OLTP is disabled. + #[cfg(any( + all(not(feature = "olap"), feature = "oltp"), + all(feature = "olap", feature = "oltp"), + all(not(feature = "olap"), not(feature = "oltp")) + ))] + let pool = store.get_master_pool(); + + pool.get() + .await + .into_report() + .change_context(StorageError::DatabaseConnectionError) +} + +pub async fn pg_connection_write( + store: &T, +) -> error_stack::Result< + PooledConnection<'_, async_bb8_diesel::ConnectionManager>, + StorageError, +> { + // Since all writes should happen to master DB only choose master DB. + let pool = store.get_master_pool(); + + pool.get() + .await + .into_report() + .change_context(StorageError::DatabaseConnectionError) +} + +pub async fn try_redis_get_else_try_database_get( + redis_fut: RFut, + database_call_closure: F, +) -> error_stack::Result +where + F: FnOnce() -> DFut, + RFut: futures::Future>, + DFut: futures::Future>, +{ + let redis_output = redis_fut.await; + match redis_output { + Ok(output) => Ok(output), + Err(redis_error) => match redis_error.current_context() { + redis_interface::errors::RedisError::NotFound => { + metrics::KV_MISS.add(&metrics::CONTEXT, 1, &[]); + database_call_closure().await + } + _ => Err(redis_error.change_context(StorageError::KVError)), + }, + } +}