diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index 6c0666e590..ed984649e9 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -126,12 +126,7 @@ impl DBOperation { )), #[cfg(feature = "v2")] Updateable::PaymentAttemptUpdate(a) => DBResult::PaymentAttempt(Box::new( - a.orig - .update_with_attempt_id( - conn, - PaymentAttemptUpdateInternal::from(a.update_data), - ) - .await?, + a.orig.update_with_attempt_id(conn, a.update_data).await?, )), #[cfg(feature = "v1")] Updateable::RefundUpdate(a) => { @@ -263,12 +258,20 @@ pub struct PaymentIntentUpdateMems { pub update_data: PaymentIntentUpdateInternal, } +#[cfg(feature = "v1")] #[derive(Debug, Serialize, Deserialize)] pub struct PaymentAttemptUpdateMems { pub orig: PaymentAttempt, pub update_data: PaymentAttemptUpdate, } +#[cfg(feature = "v2")] +#[derive(Debug, Serialize, Deserialize)] +pub struct PaymentAttemptUpdateMems { + pub orig: PaymentAttempt, + pub update_data: PaymentAttemptUpdateInternal, +} + #[derive(Debug, Serialize, Deserialize)] pub struct RefundUpdateMems { pub orig: Refund, diff --git a/crates/diesel_models/src/payment_attempt.rs b/crates/diesel_models/src/payment_attempt.rs index 95fb600adc..0afea0574a 100644 --- a/crates/diesel_models/src/payment_attempt.rs +++ b/crates/diesel_models/src/payment_attempt.rs @@ -833,7 +833,7 @@ pub enum PaymentAttemptUpdate { // TODO: uncomment fields as and when required #[cfg(feature = "v2")] -#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)] +#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize)] #[diesel(table_name = payment_attempt)] pub struct PaymentAttemptUpdateInternal { pub status: Option, @@ -881,6 +881,112 @@ pub struct PaymentAttemptUpdateInternal { pub connector_request_reference_id: Option, } +#[cfg(feature = "v2")] +impl PaymentAttemptUpdateInternal { + pub fn apply_changeset(self, source: PaymentAttempt) -> PaymentAttempt { + let Self { + status, + authentication_type, + error_message, + connector_payment_id, + modified_at, + browser_info, + error_code, + connector_metadata, + error_reason, + amount_capturable, + amount_to_capture, + updated_by, + merchant_connector_id, + connector, + redirection_data, + unified_code, + unified_message, + connector_token_details, + feature_metadata, + network_decline_code, + network_advice_code, + network_error_message, + payment_method_id, + connector_request_reference_id, + } = self; + + PaymentAttempt { + payment_id: source.payment_id, + merchant_id: source.merchant_id, + status: status.unwrap_or(source.status), + connector: connector.or(source.connector), + error_message: error_message.or(source.error_message), + surcharge_amount: source.surcharge_amount, + payment_method_id: payment_method_id.or(source.payment_method_id), + authentication_type: authentication_type.unwrap_or(source.authentication_type), + created_at: source.created_at, + modified_at: common_utils::date_time::now(), + last_synced: source.last_synced, + cancellation_reason: source.cancellation_reason, + amount_to_capture: amount_to_capture.or(source.amount_to_capture), + browser_info: browser_info + .and_then(|val| { + serde_json::from_value::(val).ok() + }) + .or(source.browser_info), + error_code: error_code.or(source.error_code), + payment_token: source.payment_token, + connector_metadata: connector_metadata.or(source.connector_metadata), + payment_experience: source.payment_experience, + payment_method_data: source.payment_method_data, + preprocessing_step_id: source.preprocessing_step_id, + error_reason: error_reason.or(source.error_reason), + multiple_capture_count: source.multiple_capture_count, + connector_response_reference_id: source.connector_response_reference_id, + amount_capturable: amount_capturable.unwrap_or(source.amount_capturable), + updated_by, + merchant_connector_id: merchant_connector_id.or(source.merchant_connector_id), + encoded_data: source.encoded_data, + unified_code: unified_code.flatten().or(source.unified_code), + unified_message: unified_message.flatten().or(source.unified_message), + net_amount: source.net_amount, + external_three_ds_authentication_attempted: source + .external_three_ds_authentication_attempted, + authentication_connector: source.authentication_connector, + authentication_id: source.authentication_id, + fingerprint_id: source.fingerprint_id, + client_source: source.client_source, + client_version: source.client_version, + customer_acceptance: source.customer_acceptance, + profile_id: source.profile_id, + organization_id: source.organization_id, + card_network: source.card_network, + shipping_cost: source.shipping_cost, + order_tax_amount: source.order_tax_amount, + request_extended_authorization: source.request_extended_authorization, + extended_authorization_applied: source.extended_authorization_applied, + capture_before: source.capture_before, + card_discovery: source.card_discovery, + charges: source.charges, + processor_merchant_id: source.processor_merchant_id, + created_by: source.created_by, + payment_method_type_v2: source.payment_method_type_v2, + connector_payment_id: source.connector_payment_id, + payment_method_subtype: source.payment_method_subtype, + routing_result: source.routing_result, + authentication_applied: source.authentication_applied, + external_reference_id: source.external_reference_id, + tax_on_surcharge: source.tax_on_surcharge, + payment_method_billing_address: source.payment_method_billing_address, + redirection_data: redirection_data.or(source.redirection_data), + connector_payment_data: source.connector_payment_data, + connector_token_details: connector_token_details.or(source.connector_token_details), + id: source.id, + feature_metadata: feature_metadata.or(source.feature_metadata), + network_advice_code: network_advice_code.or(source.network_advice_code), + network_decline_code: network_decline_code.or(source.network_decline_code), + network_error_message: network_error_message.or(source.network_error_message), + connector_request_reference_id: connector_request_reference_id + .or(source.connector_request_reference_id), + } + } +} #[cfg(feature = "v1")] #[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)] #[diesel(table_name = payment_attempt)] diff --git a/crates/diesel_models/src/payment_intent.rs b/crates/diesel_models/src/payment_intent.rs index ea0a9cead2..bcaefa512d 100644 --- a/crates/diesel_models/src/payment_intent.rs +++ b/crates/diesel_models/src/payment_intent.rs @@ -632,6 +632,122 @@ pub struct PaymentIntentUpdateInternal { pub is_iframe_redirection_enabled: Option, } +#[cfg(feature = "v2")] +impl PaymentIntentUpdateInternal { + pub fn apply_changeset(self, source: PaymentIntent) -> PaymentIntent { + let Self { + status, + prerouting_algorithm, + amount_captured, + modified_at: _, // This will be ignored from self + active_attempt_id, + amount, + currency, + shipping_cost, + tax_details, + skip_external_tax_calculation, + surcharge_applicable, + surcharge_amount, + tax_on_surcharge, + routing_algorithm_id, + capture_method, + authentication_type, + billing_address, + shipping_address, + customer_present, + description, + return_url, + setup_future_usage, + apply_mit_exemption, + statement_descriptor, + order_details, + allowed_payment_method_types, + metadata, + connector_metadata, + feature_metadata, + payment_link_config, + request_incremental_authorization, + session_expiry, + frm_metadata, + request_external_three_ds_authentication, + updated_by, + force_3ds_challenge, + is_iframe_redirection_enabled, + } = self; + + PaymentIntent { + status: status.unwrap_or(source.status), + prerouting_algorithm: prerouting_algorithm.or(source.prerouting_algorithm), + amount_captured: amount_captured.or(source.amount_captured), + modified_at: common_utils::date_time::now(), + active_attempt_id: match active_attempt_id { + Some(v_option) => v_option, + None => source.active_attempt_id, + }, + amount: amount.unwrap_or(source.amount), + currency: currency.unwrap_or(source.currency), + shipping_cost: shipping_cost.or(source.shipping_cost), + tax_details: tax_details.or(source.tax_details), + skip_external_tax_calculation: skip_external_tax_calculation + .or(source.skip_external_tax_calculation), + surcharge_applicable: surcharge_applicable.or(source.surcharge_applicable), + surcharge_amount: surcharge_amount.or(source.surcharge_amount), + tax_on_surcharge: tax_on_surcharge.or(source.tax_on_surcharge), + routing_algorithm_id: routing_algorithm_id.or(source.routing_algorithm_id), + capture_method: capture_method.or(source.capture_method), + authentication_type: authentication_type.or(source.authentication_type), + billing_address: billing_address.or(source.billing_address), + shipping_address: shipping_address.or(source.shipping_address), + customer_present: customer_present.or(source.customer_present), + description: description.or(source.description), + return_url: return_url.or(source.return_url), + setup_future_usage: setup_future_usage.or(source.setup_future_usage), + apply_mit_exemption: apply_mit_exemption.or(source.apply_mit_exemption), + statement_descriptor: statement_descriptor.or(source.statement_descriptor), + order_details: order_details.or(source.order_details), + allowed_payment_method_types: allowed_payment_method_types + .or(source.allowed_payment_method_types), + metadata: metadata.or(source.metadata), + connector_metadata: connector_metadata.or(source.connector_metadata), + feature_metadata: feature_metadata.or(source.feature_metadata), + payment_link_config: payment_link_config.or(source.payment_link_config), + request_incremental_authorization: request_incremental_authorization + .or(source.request_incremental_authorization), + session_expiry: session_expiry.unwrap_or(source.session_expiry), + frm_metadata: frm_metadata.or(source.frm_metadata), + request_external_three_ds_authentication: request_external_three_ds_authentication + .or(source.request_external_three_ds_authentication), + updated_by, + force_3ds_challenge: force_3ds_challenge.or(source.force_3ds_challenge), + is_iframe_redirection_enabled: is_iframe_redirection_enabled + .or(source.is_iframe_redirection_enabled), + + // Fields from source + merchant_id: source.merchant_id, + customer_id: source.customer_id, + created_at: source.created_at, + last_synced: source.last_synced, + attempt_count: source.attempt_count, + profile_id: source.profile_id, + payment_link_id: source.payment_link_id, + authorization_count: source.authorization_count, + customer_details: source.customer_details, + organization_id: source.organization_id, + request_extended_authorization: source.request_extended_authorization, + psd2_sca_exemption_type: source.psd2_sca_exemption_type, + split_payments: source.split_payments, + platform_merchant_id: source.platform_merchant_id, + force_3ds_challenge_trigger: source.force_3ds_challenge_trigger, + processor_merchant_id: source.processor_merchant_id, + created_by: source.created_by, + merchant_reference_id: source.merchant_reference_id, + frm_merchant_decision: source.frm_merchant_decision, + enable_payment_link: source.enable_payment_link, + id: source.id, + } + } +} + #[cfg(feature = "v1")] #[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)] #[diesel(table_name = payment_intent)] diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml index 1a02f9f3d9..800622cf36 100644 --- a/crates/drainer/Cargo.toml +++ b/crates/drainer/Cargo.toml @@ -11,6 +11,7 @@ license.workspace = true release = ["vergen", "external_services/aws_kms"] vergen = ["router_env/vergen"] v1 = ["diesel_models/v1", "hyperswitch_interfaces/v1", "common_utils/v1"] +v2 = ["diesel_models/v2", "hyperswitch_interfaces/v2", "common_utils/v2"] [dependencies] actix-web = "4.11.0" diff --git a/crates/hyperswitch_interfaces/Cargo.toml b/crates/hyperswitch_interfaces/Cargo.toml index 04cb644a1d..6b216d5c86 100644 --- a/crates/hyperswitch_interfaces/Cargo.toml +++ b/crates/hyperswitch_interfaces/Cargo.toml @@ -10,7 +10,7 @@ license.workspace = true default = ["dummy_connector", "frm", "payouts"] dummy_connector = [] v1 = ["hyperswitch_domain_models/v1", "api_models/v1", "common_utils/v1"] -v2 = [] +v2 = ["api_models/v2", "common_utils/v2", "hyperswitch_domain_models/v2"] payouts = ["hyperswitch_domain_models/payouts"] frm = ["hyperswitch_domain_models/frm"] revenue_recovery = [] diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index ed51b12ff0..6bcb17ad1a 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -116,7 +116,7 @@ pub fn mk_app( > { let mut server_app = get_application_builder(request_body_limit, state.conf.cors.clone()); - #[cfg(all(feature = "dummy_connector", feature = "v1"))] + #[cfg(feature = "dummy_connector")] { use routes::DummyConnector; server_app = server_app.service(DummyConnector::server(state.clone())); diff --git a/crates/router/src/routes/admin.rs b/crates/router/src/routes/admin.rs index d3ffb715b7..20de3c492e 100644 --- a/crates/router/src/routes/admin.rs +++ b/crates/router/src/routes/admin.rs @@ -915,6 +915,7 @@ pub async fn connector_delete( /// Merchant Account - Toggle KV /// /// Toggle KV mode for the Merchant Account +#[cfg(feature = "v1")] #[instrument(skip_all)] pub async fn merchant_account_toggle_kv( state: web::Data, @@ -938,6 +939,29 @@ pub async fn merchant_account_toggle_kv( .await } +#[cfg(feature = "v2")] +#[instrument(skip_all)] +pub async fn merchant_account_toggle_kv( + state: web::Data, + req: HttpRequest, + path: web::Path, + json_payload: web::Json, +) -> HttpResponse { + let flow = Flow::ConfigKeyUpdate; + let mut payload = json_payload.into_inner(); + payload.merchant_id = path.into_inner(); + + api::server_wrap( + flow, + state, + &req, + payload, + |state, _, payload, _| kv_for_merchant(state, payload.merchant_id, payload.kv_enabled), + &auth::V2AdminApiAuth, + api_locking::LockAction::NotApplicable, + ) + .await +} /// Merchant Account - Transfer Keys /// /// Transfer Merchant Encryption key to keymanager @@ -965,6 +989,7 @@ pub async fn merchant_account_toggle_all_kv( /// Merchant Account - KV Status /// /// Toggle KV mode for the Merchant Account +#[cfg(feature = "v1")] #[instrument(skip_all)] pub async fn merchant_account_kv_status( state: web::Data, @@ -986,6 +1011,27 @@ pub async fn merchant_account_kv_status( .await } +#[cfg(feature = "v2")] +#[instrument(skip_all)] +pub async fn merchant_account_kv_status( + state: web::Data, + req: HttpRequest, + path: web::Path, +) -> HttpResponse { + let flow = Flow::ConfigKeyFetch; + let merchant_id = path.into_inner(); + + api::server_wrap( + flow, + state, + &req, + merchant_id, + |state, _, req, _| check_merchant_account_kv_status(state, req), + &auth::V2AdminApiAuth, + api_locking::LockAction::NotApplicable, + ) + .await +} /// Merchant Account - KV Status /// /// Toggle KV mode for the Merchant Account diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 3152a666ac..872f78c0f7 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -600,6 +600,22 @@ impl DummyConnector { } } +#[cfg(all(feature = "dummy_connector", feature = "v2"))] +impl DummyConnector { + pub fn server(state: AppState) -> Scope { + let mut routes_with_restricted_access = web::scope(""); + #[cfg(not(feature = "external_access_dc"))] + { + routes_with_restricted_access = + routes_with_restricted_access.guard(actix_web::guard::Host("localhost")); + } + routes_with_restricted_access = routes_with_restricted_access + .service(web::resource("/payment").route(web::post().to(dummy_connector_payment))); + web::scope("/dummy-connector") + .app_data(web::Data::new(state)) + .service(routes_with_restricted_access) + } +} pub struct Payments; #[cfg(all(any(feature = "olap", feature = "oltp"), feature = "v2"))] @@ -1556,6 +1572,11 @@ impl MerchantAccount { ) .service( web::resource("/profiles").route(web::get().to(profiles::profiles_list)), + ) + .service( + web::resource("/kv") + .route(web::post().to(admin::merchant_account_toggle_kv)) + .route(web::get().to(admin::merchant_account_kv_status)), ), ) } diff --git a/crates/router/src/routes/dummy_connector.rs b/crates/router/src/routes/dummy_connector.rs index 5d425c675a..c0a7bc9f7f 100644 --- a/crates/router/src/routes/dummy_connector.rs +++ b/crates/router/src/routes/dummy_connector.rs @@ -61,7 +61,7 @@ pub async fn dummy_connector_complete_payment( .await } -#[cfg(all(feature = "dummy_connector", feature = "v1"))] +#[cfg(feature = "dummy_connector")] #[instrument(skip_all, fields(flow = ?types::Flow::DummyPaymentCreate))] pub async fn dummy_connector_payment( state: web::Data, @@ -82,7 +82,7 @@ pub async fn dummy_connector_payment( .await } -#[cfg(all(feature = "dummy_connector", feature = "v1"))] +#[cfg(feature = "dummy_connector")] #[instrument(skip_all, fields(flow = ?types::Flow::DummyPaymentRetrieve))] pub async fn dummy_connector_payment_data( state: web::Data, diff --git a/crates/router/src/routes/dummy_connector/core.rs b/crates/router/src/routes/dummy_connector/core.rs index cafb6b58bc..0d7fb9627c 100644 --- a/crates/router/src/routes/dummy_connector/core.rs +++ b/crates/router/src/routes/dummy_connector/core.rs @@ -9,7 +9,7 @@ use crate::{ utils::OptionExt, }; -#[cfg(all(feature = "dummy_connector", feature = "v1"))] +#[cfg(feature = "dummy_connector")] pub async fn payment( state: SessionState, req: types::DummyConnectorPaymentRequest, diff --git a/crates/storage_impl/src/customers.rs b/crates/storage_impl/src/customers.rs index bedbcadd6a..53056a137c 100644 --- a/crates/storage_impl/src/customers.rs +++ b/crates/storage_impl/src/customers.rs @@ -23,6 +23,32 @@ use crate::{ impl KvStorePartition for customers::Customer {} +#[cfg(feature = "v2")] +mod label { + use common_utils::id_type; + + pub(super) const MODEL_NAME: &str = "customer_v2"; + pub(super) const CLUSTER_LABEL: &str = "cust"; + + pub(super) fn get_global_id_label(global_customer_id: &id_type::GlobalCustomerId) -> String { + format!( + "customer_global_id_{}", + global_customer_id.get_string_repr() + ) + } + + pub(super) fn get_merchant_scoped_id_label( + merchant_id: &id_type::MerchantId, + merchant_reference_id: &id_type::CustomerId, + ) -> String { + format!( + "customer_mid_{}_mrefid_{}", + merchant_id.get_string_repr(), + merchant_reference_id.get_string_repr() + ) + } +} + #[async_trait::async_trait] impl domain::CustomerInterface for kv_router_store::KVRouterStore { type Error = StorageError; @@ -283,22 +309,32 @@ impl domain::CustomerInterface for kv_router_store::KVRouterSt .construct_new() .await .change_context(StorageError::EncryptionError)?; - let storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>( + + let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>( self, storage_scheme, Op::Insert, )) .await; - new_customer.update_storage_scheme(storage_scheme); + new_customer.update_storage_scheme(decided_storage_scheme); + + let mut reverse_lookups = Vec::new(); + + if let Some(ref merchant_ref_id) = new_customer.merchant_reference_id { + let reverse_lookup_merchant_scoped_id = + label::get_merchant_scoped_id_label(&new_customer.merchant_id, merchant_ref_id); + reverse_lookups.push(reverse_lookup_merchant_scoped_id); + } + self.insert_resource( state, key_store, - storage_scheme, + decided_storage_scheme, new_customer.clone().insert(&conn), new_customer.clone().into(), kv_router_store::InsertResourceParams { insertable: kv::Insertable::Customer(new_customer.clone()), - reverse_lookups: vec![], + reverse_lookups, identifier, key, resource_type: "customer", diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 814314f435..156ebee2de 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -378,6 +378,16 @@ impl UniqueConstraints for diesel_models::PaymentAttempt { } } +#[cfg(feature = "v2")] +impl UniqueConstraints for diesel_models::PaymentAttempt { + fn unique_constraints(&self) -> Vec { + vec![format!("pa_{}", self.id.get_string_repr())] + } + fn table_name(&self) -> &str { + "PaymentAttempt" + } +} + #[cfg(feature = "v1")] impl UniqueConstraints for diesel_models::Refund { fn unique_constraints(&self) -> Vec { diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs index 8eb8222afb..78531b78ad 100644 --- a/crates/storage_impl/src/payments/payment_attempt.rs +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -6,16 +6,17 @@ use common_utils::{ fallback_reverse_lookup_not_found, types::{ConnectorTransactionId, ConnectorTransactionIdTrait, CreatedBy}, }; +#[cfg(feature = "v1")] +use diesel_models::payment_attempt::PaymentAttemptNew as DieselPaymentAttemptNew; use diesel_models::{ enums::{ MandateAmountData as DieselMandateAmountData, MandateDataType as DieselMandateType, MandateDetails as DieselMandateDetails, MerchantStorageScheme, }, + kv, payment_attempt::PaymentAttempt as DieselPaymentAttempt, reverse_lookup::{ReverseLookup, ReverseLookupNew}, }; -#[cfg(feature = "v1")] -use diesel_models::{kv, payment_attempt::PaymentAttemptNew as DieselPaymentAttemptNew}; use error_stack::ResultExt; #[cfg(feature = "v1")] use hyperswitch_domain_models::payments::payment_attempt::PaymentAttemptNew; @@ -32,22 +33,21 @@ use hyperswitch_domain_models::{ use hyperswitch_domain_models::{ payments::payment_attempt::PaymentListFilters, payments::PaymentIntent, }; -#[cfg(feature = "v1")] +#[cfg(feature = "v2")] +use label::*; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; +#[cfg(feature = "v2")] +use crate::kv_router_store::{FilterResourceParams, FindResourceBy, UpdateResourceParams}; use crate::{ diesel_error_to_data_error, errors, + errors::RedisErrorExt, kv_router_store::KVRouterStore, lookup::ReverseLookupInterface, - utils::{pg_connection_read, pg_connection_write}, - DataModelExt, DatabaseStore, RouterStore, -}; -#[cfg(feature = "v1")] -use crate::{ - errors::RedisErrorExt, redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, - utils::try_redis_get_else_try_database_get, + utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get}, + DataModelExt, DatabaseStore, RouterStore, }; #[async_trait::async_trait] @@ -747,15 +747,98 @@ impl PaymentAttemptInterface for KVRouterStore { payment_attempt: PaymentAttempt, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { - // Ignoring storage scheme for v2 implementation - self.router_store - .insert_payment_attempt( - key_manager_state, - merchant_key_store, - payment_attempt, - storage_scheme, - ) - .await + let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>( + self, + storage_scheme, + Op::Insert, + )) + .await; + + match decided_storage_scheme { + MerchantStorageScheme::PostgresOnly => { + self.router_store + .insert_payment_attempt( + key_manager_state, + merchant_key_store, + payment_attempt, + decided_storage_scheme, + ) + .await + } + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::GlobalPaymentId { + id: &payment_attempt.payment_id, + }; + let key_str = key.to_string(); + let field = format!( + "{}_{}", + label::CLUSTER_LABEL, + payment_attempt.id.get_string_repr() + ); + + let diesel_payment_attempt_new = payment_attempt + .clone() + .construct_new() + .await + .change_context(errors::StorageError::EncryptionError)?; + + let diesel_payment_attempt_for_redis: DieselPaymentAttempt = + Conversion::convert(payment_attempt.clone()) + .await + .change_context(errors::StorageError::EncryptionError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: Box::new(kv::Insertable::PaymentAttempt(Box::new( + diesel_payment_attempt_new.clone(), + ))), + }, + }; + + let reverse_lookup_attempt_id = ReverseLookupNew { + lookup_id: label::get_global_id_label(&payment_attempt.id), + pk_id: key_str.clone(), + sk_id: field.clone(), + source: "payment_attempt".to_string(), + updated_by: decided_storage_scheme.to_string(), + }; + self.insert_reverse_lookup(reverse_lookup_attempt_id, decided_storage_scheme) + .await?; + + if let Some(ref conn_txn_id_val) = payment_attempt.connector_payment_id { + let reverse_lookup_conn_txn_id = ReverseLookupNew { + lookup_id: label::get_profile_id_connector_transaction_label( + payment_attempt.profile_id.get_string_repr(), + conn_txn_id_val, + ), + pk_id: key_str.clone(), + sk_id: field.clone(), + source: "payment_attempt".to_string(), + updated_by: decided_storage_scheme.to_string(), + }; + self.insert_reverse_lookup(reverse_lookup_conn_txn_id, decided_storage_scheme) + .await?; + } + + match Box::pin(kv_wrapper::( + self, + KvOperation::HSetNx(&field, &diesel_payment_attempt_for_redis, redis_entry), + key, + )) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hsetnx() + { + Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "payment_attempt", + key: Some(payment_attempt.id.get_string_repr().to_owned()), + } + .into()), + Ok(HsetnxReply::KeySet) => Ok(payment_attempt), + Err(error) => Err(error.change_context(errors::StorageError::KVError)), + } + } + } } #[cfg(feature = "v1")] @@ -889,19 +972,48 @@ impl PaymentAttemptInterface for KVRouterStore { key_manager_state: &KeyManagerState, merchant_key_store: &MerchantKeyStore, this: PaymentAttempt, - payment_attempt: PaymentAttemptUpdate, + payment_attempt_update: PaymentAttemptUpdate, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { - // Ignoring storage scheme for v2 implementation - self.router_store - .update_payment_attempt( - key_manager_state, - merchant_key_store, - this, - payment_attempt, - storage_scheme, - ) + let payment_attempt = Conversion::convert(this.clone()) .await + .change_context(errors::StorageError::DecryptionError)?; + + let key = PartitionKey::GlobalPaymentId { + id: &this.payment_id, + }; + + let field = format!("{}_{}", label::CLUSTER_LABEL, this.id.get_string_repr()); + let conn = pg_connection_write(self).await?; + + let payment_attempt_internal = + diesel_models::PaymentAttemptUpdateInternal::from(payment_attempt_update); + let updated_payment_attempt = payment_attempt_internal + .clone() + .apply_changeset(payment_attempt.clone()); + + let updated_by = updated_payment_attempt.updated_by.to_owned(); + let updated_payment_attempt_with_id = payment_attempt + .clone() + .update_with_attempt_id(&conn, payment_attempt_internal.clone()); + + Box::pin(self.update_resource( + key_manager_state, + merchant_key_store, + storage_scheme, + updated_payment_attempt_with_id, + updated_payment_attempt, + UpdateResourceParams { + updateable: kv::Updateable::PaymentAttemptUpdate(Box::new( + kv::PaymentAttemptUpdateMems { + orig: payment_attempt, + update_data: payment_attempt_internal, + }, + )), + operation: Op::Update(key.clone(), &field, Some(updated_by.as_str())), + }, + )) + .await } #[cfg(feature = "v1")] @@ -1095,15 +1207,69 @@ impl PaymentAttemptInterface for KVRouterStore { payment_id: &common_utils::id_type::GlobalPaymentId, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { - // Ignoring storage scheme for v2 implementation - self.router_store - .find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id( - key_manager_state, - merchant_key_store, - payment_id, - storage_scheme, - ) - .await + let database_call = || { + self.router_store + .find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id( + key_manager_state, + merchant_key_store, + payment_id, + storage_scheme, + ) + }; + + let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>( + self, + storage_scheme, + Op::Find, + )) + .await; + + match decided_storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::GlobalPaymentId { id: payment_id }; + + let redis_fut = async { + let kv_result = kv_wrapper::( + self, + KvOperation::::Scan("pa_*"), + key.clone(), + ) + .await? + .try_into_scan(); + + let payment_attempt = kv_result.and_then(|mut payment_attempts| { + payment_attempts.sort_by(|a, b| b.modified_at.cmp(&a.modified_at)); + payment_attempts + .iter() + .find(|&pa| { + pa.status == diesel_models::enums::AttemptStatus::Charged + || pa.status + == diesel_models::enums::AttemptStatus::PartialCharged + }) + .cloned() + .ok_or(error_stack::report!( + redis_interface::errors::RedisError::NotFound + )) + })?; + let merchant_id = payment_attempt.merchant_id.clone(); + PaymentAttempt::convert_back( + key_manager_state, + payment_attempt, + merchant_key_store.key.get_inner(), + merchant_id.into(), + ) + .await + .change_context(redis_interface::errors::RedisError::UnknownResult) + }; + + Box::pin(try_redis_get_else_try_database_get( + redis_fut, + database_call, + )) + .await + } + } } #[cfg(feature = "v2")] @@ -1115,16 +1281,22 @@ impl PaymentAttemptInterface for KVRouterStore { connector_transaction_id: &str, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - // Ignoring storage scheme for v2 implementation - self.router_store - .find_payment_attempt_by_profile_id_connector_transaction_id( - key_manager_state, - merchant_key_store, + let conn = pg_connection_read(self).await?; + self.find_resource_by_id( + key_manager_state, + merchant_key_store, + storage_scheme, + DieselPaymentAttempt::find_by_profile_id_connector_transaction_id( + &conn, profile_id, connector_transaction_id, - storage_scheme, - ) - .await + ), + FindResourceBy::LookupId(label::get_profile_id_connector_transaction_label( + profile_id.get_string_repr(), + connector_transaction_id, + )), + ) + .await } #[instrument(skip_all)] @@ -1329,15 +1501,15 @@ impl PaymentAttemptInterface for KVRouterStore { attempt_id: &common_utils::id_type::GlobalAttemptId, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { - // Ignoring storage scheme for v2 implementation - self.router_store - .find_payment_attempt_by_id( - key_manager_state, - merchant_key_store, - attempt_id, - storage_scheme, - ) - .await + let conn = pg_connection_read(self).await?; + self.find_resource_by_id( + key_manager_state, + merchant_key_store, + storage_scheme, + DieselPaymentAttempt::find_by_id(&conn, attempt_id), + FindResourceBy::LookupId(label::get_global_id_label(attempt_id)), + ) + .await } #[cfg(feature = "v2")] @@ -1349,14 +1521,20 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_key_store: &MerchantKeyStore, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result, errors::StorageError> { - self.router_store - .find_payment_attempts_by_payment_intent_id( - key_manager_state, - payment_id, - merchant_key_store, - storage_scheme, - ) - .await + let conn = pg_connection_read(self).await?; + self.filter_resources( + key_manager_state, + merchant_key_store, + storage_scheme, + DieselPaymentAttempt::find_by_payment_id(&conn, payment_id), + |_| true, + FilterResourceParams { + key: PartitionKey::GlobalPaymentId { id: payment_id }, + pattern: "pa_*", + limit: None, + }, + ) + .await } #[cfg(feature = "v1")] @@ -2036,3 +2214,25 @@ async fn add_preprocessing_id_to_reverse_lookup( .insert_reverse_lookup(reverse_lookup_new, storage_scheme) .await } + +#[cfg(feature = "v2")] +mod label { + pub(super) const MODEL_NAME: &str = "payment_attempt_v2"; + pub(super) const CLUSTER_LABEL: &str = "pa"; + + pub(super) fn get_profile_id_connector_transaction_label( + profile_id: &str, + connector_transaction_id: &str, + ) -> String { + format!( + "profile_{}_conn_txn_{}", + profile_id, connector_transaction_id + ) + } + + pub(super) fn get_global_id_label( + attempt_id: &common_utils::id_type::GlobalAttemptId, + ) -> String { + format!("attempt_global_id_{}", attempt_id.get_string_repr()) + } +} diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index 17156855e1..d054a4b165 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -2,13 +2,22 @@ use api_models::payments::{AmountFilter, Order, SortBy, SortOn}; #[cfg(feature = "olap")] use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; -#[cfg(feature = "v1")] -use common_utils::ext_traits::Encode; -use common_utils::{ext_traits::AsyncExt, types::keymanager::KeyManagerState}; +#[cfg(feature = "v2")] +use common_utils::fallback_reverse_lookup_not_found; +use common_utils::{ + ext_traits::{AsyncExt, Encode}, + types::keymanager::KeyManagerState, +}; #[cfg(feature = "olap")] use diesel::{associations::HasTable, ExpressionMethods, JoinOnDsl, QueryDsl}; +#[cfg(feature = "v1")] +use diesel_models::payment_intent::PaymentIntentUpdate as DieselPaymentIntentUpdate; +#[cfg(feature = "v2")] +use diesel_models::payment_intent::PaymentIntentUpdateInternal; #[cfg(feature = "olap")] use diesel_models::query::generics::db_metrics; +#[cfg(feature = "v2")] +use diesel_models::reverse_lookup::ReverseLookupNew; #[cfg(all(feature = "v1", feature = "olap"))] use diesel_models::schema::{ payment_attempt::{self as payment_attempt_schema, dsl as pa_dsl}, @@ -20,10 +29,8 @@ use diesel_models::schema_v2::{ payment_intent::dsl as pi_dsl, }; use diesel_models::{ - enums::MerchantStorageScheme, payment_intent::PaymentIntent as DieselPaymentIntent, + enums::MerchantStorageScheme, kv, payment_intent::PaymentIntent as DieselPaymentIntent, }; -#[cfg(feature = "v1")] -use diesel_models::{kv, payment_intent::PaymentIntentUpdate as DieselPaymentIntentUpdate}; use error_stack::ResultExt; #[cfg(feature = "olap")] use hyperswitch_domain_models::payments::{ @@ -37,7 +44,6 @@ use hyperswitch_domain_models::{ PaymentIntent, }, }; -#[cfg(feature = "v1")] use redis_interface::HsetnxReply; #[cfg(feature = "olap")] use router_env::logger; @@ -47,17 +53,14 @@ use router_env::{instrument, tracing}; use crate::connection; use crate::{ diesel_error_to_data_error, - errors::StorageError, + errors::{RedisErrorExt, StorageError}, kv_router_store::KVRouterStore, - utils::{pg_connection_read, pg_connection_write}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, + utils::{self, pg_connection_read, pg_connection_write}, DatabaseStore, }; -#[cfg(feature = "v1")] -use crate::{ - errors::RedisErrorExt, - redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, - utils, -}; +#[cfg(feature = "v2")] +use crate::{errors, lookup::ReverseLookupInterface}; #[async_trait::async_trait] impl PaymentIntentInterface for KVRouterStore { @@ -163,7 +166,68 @@ impl PaymentIntentInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { - todo!("Implement payment intent insert for kv") + let id = payment_intent.id.clone(); + let key = PartitionKey::GlobalPaymentId { id: &id }; + let field = format!("pi_{}", id.get_string_repr()); + let key_str = key.to_string(); + + let new_payment_intent = payment_intent + .clone() + .construct_new() + .await + .change_context(StorageError::EncryptionError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: Box::new(kv::Insertable::PaymentIntent(Box::new( + new_payment_intent, + ))), + }, + }; + + let diesel_payment_intent = payment_intent + .clone() + .convert() + .await + .change_context(StorageError::EncryptionError)?; + + if let Some(merchant_reference_id) = &payment_intent.merchant_reference_id { + let reverse_lookup = ReverseLookupNew { + lookup_id: format!( + "pi_merchant_reference_{}_{}", + payment_intent.profile_id.get_string_repr(), + merchant_reference_id.get_string_repr() + ), + pk_id: key_str.clone(), + sk_id: field.clone(), + source: "payment_intent".to_string(), + updated_by: storage_scheme.to_string(), + }; + self.insert_reverse_lookup(reverse_lookup, storage_scheme) + .await?; + } + + match Box::pin(kv_wrapper::( + self, + KvOperation::::HSetNx( + &field, + &diesel_payment_intent, + redis_entry, + ), + key, + )) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hsetnx() + { + Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue { + entity: "payment_intent", + key: Some(key_str), + } + .into()), + Ok(HsetnxReply::KeySet) => Ok(payment_intent), + Err(error) => Err(error.change_context(StorageError::KVError)), + } } } } @@ -300,7 +364,59 @@ impl PaymentIntentInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - todo!() + let id = this.id.clone(); + let merchant_id = this.merchant_id.clone(); + let key = PartitionKey::GlobalPaymentId { id: &id }; + let field = format!("pi_{}", id.get_string_repr()); + let key_str = key.to_string(); + + let diesel_intent_update = + PaymentIntentUpdateInternal::try_from(payment_intent_update) + .change_context(StorageError::DeserializationFailed)?; + let origin_diesel_intent = this + .convert() + .await + .change_context(StorageError::EncryptionError)?; + + let diesel_intent = diesel_intent_update + .clone() + .apply_changeset(origin_diesel_intent.clone()); + + let redis_value = diesel_intent + .encode_to_string_of_json() + .change_context(StorageError::SerializationFailed)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: Box::new(kv::Updateable::PaymentIntentUpdate(Box::new( + kv::PaymentIntentUpdateMems { + orig: origin_diesel_intent, + update_data: diesel_intent_update, + }, + ))), + }, + }; + + Box::pin(kv_wrapper::<(), _, _>( + self, + KvOperation::::Hset((&field, redis_value), redis_entry), + key, + )) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hset() + .change_context(StorageError::KVError)?; + + let payment_intent = PaymentIntent::convert_back( + state, + diesel_intent, + merchant_key_store.key.get_inner(), + merchant_id.into(), + ) + .await + .change_context(StorageError::DecryptionError)?; + + Ok(payment_intent) } } } @@ -372,18 +488,50 @@ impl PaymentIntentInterface for KVRouterStore { state: &KeyManagerState, id: &common_utils::id_type::GlobalPaymentId, merchant_key_store: &MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, + storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { - let conn: bb8::PooledConnection< - '_, - async_bb8_diesel::ConnectionManager, - > = pg_connection_read(self).await?; - let diesel_payment_intent = DieselPaymentIntent::find_by_global_id(&conn, id) - .await - .map_err(|er| { - let new_err = diesel_error_to_data_error(*er.current_context()); - er.change_context(new_err) - })?; + let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentIntent>( + self, + storage_scheme, + Op::Find, + )) + .await; + + let database_call = || async { + let conn: bb8::PooledConnection< + '_, + async_bb8_diesel::ConnectionManager, + > = pg_connection_read(self).await?; + + DieselPaymentIntent::find_by_global_id(&conn, id) + .await + .map_err(|er| { + let new_err = diesel_error_to_data_error(*er.current_context()); + er.change_context(new_err) + }) + }; + + let diesel_payment_intent = match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::GlobalPaymentId { id }; + let field = format!("pi_{}", id.get_string_repr()); + + Box::pin(utils::try_redis_get_else_try_database_get( + async { + Box::pin(kv_wrapper::( + self, + KvOperation::::HGet(&field), + key, + )) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + }?; let merchant_id = diesel_payment_intent.merchant_id.clone(); @@ -391,7 +539,7 @@ impl PaymentIntentInterface for KVRouterStore { state, diesel_payment_intent, merchant_key_store.key.get_inner(), - merchant_id.to_owned().into(), + merchant_id.into(), ) .await .change_context(StorageError::DecryptionError) @@ -523,7 +671,68 @@ impl PaymentIntentInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - todo!() + let lookup_id = format!( + "pi_merchant_reference_{}_{}", + profile_id.get_string_repr(), + merchant_reference_id.get_string_repr() + ); + + let lookup = fallback_reverse_lookup_not_found!( + self.get_lookup_by_lookup_id(&lookup_id, *storage_scheme) + .await, + self.router_store + .find_payment_intent_by_merchant_reference_id_profile_id( + state, + merchant_reference_id, + profile_id, + merchant_key_store, + storage_scheme, + ) + .await + ); + + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; + + let database_call = || async { + let conn = pg_connection_read(self).await?; + DieselPaymentIntent::find_by_merchant_reference_id_profile_id( + &conn, + merchant_reference_id, + profile_id, + ) + .await + .map_err(|er| { + let new_err = diesel_error_to_data_error(*er.current_context()); + er.change_context(new_err) + }) + }; + + let diesel_payment_intent = Box::pin(utils::try_redis_get_else_try_database_get( + async { + Box::pin(kv_wrapper::( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + )) + .await? + .try_into_hget() + }, + database_call, + )) + .await?; + + let merchant_id = diesel_payment_intent.merchant_id.clone(); + + PaymentIntent::convert_back( + state, + diesel_payment_intent, + merchant_key_store.key.get_inner(), + merchant_id.into(), + ) + .await + .change_context(StorageError::DecryptionError) } } } @@ -607,9 +816,8 @@ impl PaymentIntentInterface for crate::RouterStore { _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { let conn = pg_connection_write(self).await?; - let diesel_payment_intent_update = - diesel_models::PaymentIntentUpdateInternal::try_from(payment_intent) - .change_context(StorageError::DeserializationFailed)?; + let diesel_payment_intent_update = PaymentIntentUpdateInternal::try_from(payment_intent) + .change_context(StorageError::DeserializationFailed)?; let diesel_payment_intent = this .convert() .await diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index f09506bc76..fd583f500e 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -55,6 +55,10 @@ pub enum PartitionKey<'a> { GlobalId { id: &'a str, }, + #[cfg(feature = "v2")] + GlobalPaymentId { + id: &'a common_utils::id_type::GlobalPaymentId, + }, } // PartitionKey::MerchantIdPaymentId {merchant_id, payment_id} impl std::fmt::Display for PartitionKey<'_> { @@ -108,7 +112,11 @@ impl std::fmt::Display for PartitionKey<'_> { )), #[cfg(feature = "v2")] - PartitionKey::GlobalId { id } => f.write_str(&format!("cust_{id}",)), + PartitionKey::GlobalId { id } => f.write_str(&format!("global_cust_{id}",)), + #[cfg(feature = "v2")] + PartitionKey::GlobalPaymentId { id } => { + f.write_str(&format!("global_payment_{}", id.get_string_repr())) + } } } }