From 17703fe2cbd693cf4f417c061df3c42c63ebc745 Mon Sep 17 00:00:00 2001 From: Sahkal Poddar Date: Wed, 14 Aug 2024 14:56:34 +0530 Subject: [PATCH] feat(customer_v2): customer v2 refactor customer v2 update endpoint (#5490) Co-authored-by: Narayan Bhat Co-authored-by: hrithikesh026 Co-authored-by: Prajjwal Kumar Co-authored-by: Sanchith Hegde Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- crates/api_models/src/customers.rs | 114 ++ crates/api_models/src/events/customer.rs | 13 +- crates/diesel_models/src/customers.rs | 8 + crates/diesel_models/src/kv.rs | 5 + crates/diesel_models/src/query/customers.rs | 163 ++- .../hyperswitch_domain_models/src/customer.rs | 16 +- .../src/compatibility/stripe/customers.rs | 11 +- .../compatibility/stripe/customers/types.rs | 2 +- crates/router/src/core/customers.rs | 531 ++++++-- crates/router/src/core/payment_methods.rs | 27 +- .../router/src/core/payment_methods/cards.rs | 125 +- .../src/core/payment_methods/validator.rs | 12 + crates/router/src/core/payments.rs | 22 +- crates/router/src/core/payments/helpers.rs | 13 + .../payments/operations/payment_confirm.rs | 55 +- crates/router/src/core/payout_link.rs | 14 + crates/router/src/core/payouts.rs | 86 +- crates/router/src/core/payouts/validator.rs | 2 +- crates/router/src/db/customers.rs | 1070 +++++++++-------- crates/router/src/db/kafka_store.rs | 83 ++ crates/router/src/routes/app.rs | 5 +- crates/router/src/routes/customers.rs | 48 +- crates/router/src/routes/payment_methods.rs | 12 +- crates/router/src/types/api/customers.rs | 4 +- crates/router/src/utils.rs | 98 ++ crates/storage_impl/src/payouts/payouts.rs | 21 +- crates/storage_impl/src/redis/kv_store.rs | 20 + .../2024-07-30-100323_customer_v2/up.sql | 1 + 28 files changed, 1813 insertions(+), 768 deletions(-) diff --git a/crates/api_models/src/customers.rs b/crates/api_models/src/customers.rs index 8f76a3f835..c4261d624f 100644 --- a/crates/api_models/src/customers.rs +++ b/crates/api_models/src/customers.rs @@ -342,3 +342,117 @@ pub struct CustomerDeleteResponse { #[schema(example = false)] pub payment_methods_deleted: bool, } + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)] +pub struct CustomerUpdateRequest { + /// The identifier for the customer object + #[schema(value_type = Option, max_length = 64, min_length = 1, example = "cus_y3oqhf46pyzuxjbcn2giaqnb44")] + pub customer_id: Option, + /// The identifier for the Merchant Account + #[schema(max_length = 255, example = "y3oqhf46pyzuxjbcn2giaqnb44")] + #[serde(skip)] + pub merchant_id: id_type::MerchantId, + /// The customer's name + #[schema(max_length = 255, value_type = Option, example = "Jon Test")] + pub name: Option>, + /// The customer's email address + #[schema(value_type = Option, max_length = 255, example = "JonTest@test.com")] + pub email: Option, + /// The customer's phone number + #[schema(value_type = Option, max_length = 255, example = "9123456789")] + pub phone: Option>, + /// An arbitrary string that you can attach to a customer object. + #[schema(max_length = 255, example = "First Customer", value_type = Option)] + pub description: Option, + /// The country code for the customer phone number + #[schema(max_length = 255, example = "+65")] + pub phone_country_code: Option, + /// The address for the customer + #[schema(value_type = Option)] + pub address: Option, + /// You can specify up to 50 keys, with key names up to 40 characters long and values up to 500 + /// characters long. Metadata is useful for storing additional, structured information on an + /// object. + #[schema(value_type = Option,example = json!({ "city": "NY", "unit": "245" }))] + pub metadata: Option, +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +impl CustomerUpdateRequest { + pub fn get_merchant_reference_id(&self) -> Option { + Some( + self.customer_id + .to_owned() + .unwrap_or_else(common_utils::generate_customer_id_of_default_length), + ) + } + pub fn get_address(&self) -> Option { + self.address.clone() + } +} + +#[cfg(all(feature = "v2", feature = "customer_v2"))] +#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)] +pub struct CustomerUpdateRequest { + /// The merchant identifier for the customer object. + #[schema(value_type = Option, max_length = 64, min_length = 1, example = "cus_y3oqhf46pyzuxjbcn2giaqnb44")] + pub merchant_reference_id: Option, + /// The customer's name + #[schema(max_length = 255, value_type = String, example = "Jon Test")] + pub name: Option>, + /// The customer's email address + #[schema(value_type = String, max_length = 255, example = "JonTest@test.com")] + pub email: Option, + /// The customer's phone number + #[schema(value_type = Option, max_length = 255, example = "9123456789")] + pub phone: Option>, + /// An arbitrary string that you can attach to a customer object. + #[schema(max_length = 255, example = "First Customer", value_type = Option)] + pub description: Option, + /// The country code for the customer phone number + #[schema(max_length = 255, example = "+65")] + pub phone_country_code: Option, + /// The default billing address for the customer + #[schema(value_type = Option)] + pub default_billing_address: Option, + /// The default shipping address for the customer + #[schema(value_type = Option)] + pub default_shipping_address: Option, + /// You can specify up to 50 keys, with key names up to 40 characters long and values up to 500 + /// characters long. Metadata is useful for storing additional, structured information on an + /// object. + #[schema(value_type = Option,example = json!({ "city": "NY", "unit": "245" }))] + pub metadata: Option, + /// The unique identifier of the payment method + #[schema(example = "card_rGK4Vi5iSW70MY7J2mIg")] + pub default_payment_method_id: Option, +} + +#[cfg(all(feature = "v2", feature = "customer_v2"))] +impl CustomerUpdateRequest { + pub fn get_merchant_reference_id(&self) -> Option { + self.merchant_reference_id.clone() + } + + pub fn get_default_customer_billing_address(&self) -> Option { + self.default_billing_address.clone() + } + + pub fn get_default_customer_shipping_address(&self) -> Option { + self.default_shipping_address.clone() + } +} + +#[derive(Default, Debug, serde::Deserialize, serde::Serialize, Clone)] +pub struct UpdateCustomerId(String); + +impl UpdateCustomerId { + pub fn get_global_id(&self) -> String { + self.0.clone() + } + + pub fn new(id: String) -> Self { + Self(id) + } +} diff --git a/crates/api_models/src/events/customer.rs b/crates/api_models/src/events/customer.rs index 8457d6b662..dedc1b82c8 100644 --- a/crates/api_models/src/events/customer.rs +++ b/crates/api_models/src/events/customer.rs @@ -1,6 +1,8 @@ use common_utils::events::{ApiEventMetric, ApiEventsType}; -use crate::customers::{CustomerDeleteResponse, CustomerId, CustomerRequest, CustomerResponse}; +use crate::customers::{ + CustomerDeleteResponse, CustomerId, CustomerRequest, CustomerResponse, CustomerUpdateRequest, +}; impl ApiEventMetric for CustomerDeleteResponse { fn get_api_event_type(&self) -> Option { @@ -33,3 +35,12 @@ impl ApiEventMetric for CustomerId { }) } } + +impl ApiEventMetric for CustomerUpdateRequest { + fn get_api_event_type(&self) -> Option { + self.get_merchant_reference_id() + .clone() + .map(|cid| ApiEventsType::Customer { customer_id: cid }) + } +} +// These needs to be fixed for v2 diff --git a/crates/diesel_models/src/customers.rs b/crates/diesel_models/src/customers.rs index 35595c05d9..7e4e27a5d3 100644 --- a/crates/diesel_models/src/customers.rs +++ b/crates/diesel_models/src/customers.rs @@ -261,6 +261,8 @@ pub struct CustomerUpdateInternal { pub connector_customer: Option, pub default_payment_method_id: Option>, pub updated_by: Option, + pub default_billing_address: Option, + pub default_shipping_address: Option, } #[cfg(all(feature = "v2", feature = "customer_v2"))] @@ -276,6 +278,8 @@ impl CustomerUpdateInternal { connector_customer, // address_id, default_payment_method_id, + default_billing_address, + default_shipping_address, .. } = self; @@ -292,6 +296,10 @@ impl CustomerUpdateInternal { default_payment_method_id: default_payment_method_id .flatten() .map_or(source.default_payment_method_id, Some), + default_billing_address: default_billing_address + .map_or(source.default_billing_address, Some), + default_shipping_address: default_shipping_address + .map_or(source.default_shipping_address, Some), ..source } } diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index 84bc58f4e1..1de2097b99 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -138,6 +138,7 @@ impl DBOperation { ) .await?, )), + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] Updateable::CustomerUpdate(cust) => DBResult::Customer(Box::new( Customer::update_by_customer_id_merchant_id( conn, @@ -147,6 +148,10 @@ impl DBOperation { ) .await?, )), + #[cfg(all(feature = "v2", feature = "customer_v2"))] + Updateable::CustomerUpdate(cust) => DBResult::Customer(Box::new( + Customer::update_by_id(conn, cust.orig.id.clone(), cust.update_data).await?, + )), }, }) } diff --git a/crates/diesel_models/src/query/customers.rs b/crates/diesel_models/src/query/customers.rs index ef0fa7d7e2..1dc35511c9 100644 --- a/crates/diesel_models/src/query/customers.rs +++ b/crates/diesel_models/src/query/customers.rs @@ -1,11 +1,12 @@ -#[cfg(all(feature = "v2", feature = "customer_v2"))] use common_utils::id_type; +#[cfg(all(feature = "v2", feature = "customer_v2"))] +use diesel::BoolExpressionMethods; #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use diesel::BoolExpressionMethods; use diesel::{associations::HasTable, ExpressionMethods}; use super::generics; -#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +// #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use crate::errors; #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use crate::schema::customers::dsl; @@ -22,52 +23,36 @@ impl CustomerNew { } } -#[cfg(all(feature = "v2", feature = "customer_v2"))] impl Customer { - pub async fn update_by_customer_id_merchant_id( - _conn: &PgPooledConn, - _customer_id: id_type::CustomerId, - _merchant_id: id_type::MerchantId, - _customer: CustomerUpdateInternal, + #[cfg(all(feature = "v2", feature = "customer_v2"))] + pub async fn update_by_id( + conn: &PgPooledConn, + id: String, + customer: CustomerUpdateInternal, ) -> StorageResult { - // match generics::generic_update_by_id::<::Table, _, _, _>( - // conn, - // (customer_id.clone(), merchant_id.clone()), - // customer, - // ) - // .await - // { - // Err(error) => match error.current_context() { - // errors::DatabaseError::NoFieldsToUpdate => { - // generics::generic_find_by_id::<::Table, _, _>( - // conn, - // (customer_id, merchant_id), - // ) - // .await - // } - // _ => Err(error), - // }, - // result => result, - todo!() - // } + match generics::generic_update_by_id::<::Table, _, _, _>( + conn, + id.clone(), + customer, + ) + .await + { + Err(error) => match error.current_context() { + errors::DatabaseError::NoFieldsToUpdate => { + generics::generic_find_by_id::<::Table, _, _>(conn, id).await + } + _ => Err(error), + }, + result => result, + } } - pub async fn find_by_id_merchant_id(conn: &PgPooledConn, id: &str) -> StorageResult { + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + pub async fn find_by_global_id(conn: &PgPooledConn, id: &String) -> StorageResult { generics::generic_find_by_id::<::Table, _, _>(conn, id.to_owned()).await } - pub async fn find_by_customer_id_merchant_id( - _conn: &PgPooledConn, - _customer_id: &id_type::CustomerId, - _merchant_id: &id_type::MerchantId, - ) -> StorageResult { - // generics::generic_find_by_id::<::Table, _, _>( - // conn, - // (customer_id.to_owned(), merchant_id.to_owned()), - // ) - // .await - todo!() - } - + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn list_by_merchant_id( conn: &PgPooledConn, merchant_id: &id_type::MerchantId, @@ -82,26 +67,39 @@ impl Customer { .await } - pub async fn find_optional_by_customer_id_merchant_id( - _conn: &PgPooledConn, - _customer_id: &id_type::CustomerId, - _merchant_id: &id_type::MerchantId, + #[cfg(all(feature = "v2", feature = "customer_v2"))] + pub async fn find_optional_by_merchant_id_merchant_reference_id( + conn: &PgPooledConn, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, ) -> StorageResult> { - // generics::generic_find_by_id_optional::<::Table, _, _>( - // conn, - // (customer_id.to_owned(), merchant_id.to_owned()), - // ) - // .await - todo!() + generics::generic_find_one_optional::<::Table, _, _>( + conn, + dsl::merchant_id + .eq(merchant_id.to_owned()) + .and(dsl::merchant_reference_id.eq(customer_id.to_owned())), + ) + .await } -} -#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] -impl Customer { + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + pub async fn find_optional_by_customer_id_merchant_id( + conn: &PgPooledConn, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + ) -> StorageResult> { + generics::generic_find_by_id_optional::<::Table, _, _>( + conn, + (customer_id.to_owned(), merchant_id.to_owned()), + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn update_by_customer_id_merchant_id( conn: &PgPooledConn, - customer_id: common_utils::id_type::CustomerId, - merchant_id: common_utils::id_type::MerchantId, + customer_id: id_type::CustomerId, + merchant_id: id_type::MerchantId, customer: CustomerUpdateInternal, ) -> StorageResult { match generics::generic_update_by_id::<::Table, _, _, _>( @@ -125,10 +123,11 @@ impl Customer { } } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn delete_by_customer_id_merchant_id( conn: &PgPooledConn, - customer_id: &common_utils::id_type::CustomerId, - merchant_id: &common_utils::id_type::MerchantId, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, ) -> StorageResult { generics::generic_delete::<::Table, _>( conn, @@ -139,10 +138,26 @@ impl Customer { .await } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + pub async fn find_by_merchant_reference_id_merchant_id( + conn: &PgPooledConn, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + ) -> StorageResult { + generics::generic_find_one::<::Table, _, _>( + conn, + dsl::merchant_id + .eq(merchant_id.to_owned()) + .and(dsl::merchant_reference_id.eq(merchant_reference_id.to_owned())), + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn find_by_customer_id_merchant_id( conn: &PgPooledConn, - customer_id: &common_utils::id_type::CustomerId, - merchant_id: &common_utils::id_type::MerchantId, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, ) -> StorageResult { generics::generic_find_by_id::<::Table, _, _>( conn, @@ -150,30 +165,4 @@ impl Customer { ) .await } - - pub async fn list_by_merchant_id( - conn: &PgPooledConn, - merchant_id: &common_utils::id_type::MerchantId, - ) -> StorageResult> { - generics::generic_filter::<::Table, _, _, _>( - conn, - dsl::merchant_id.eq(merchant_id.to_owned()), - None, - None, - Some(dsl::created_at), - ) - .await - } - - pub async fn find_optional_by_customer_id_merchant_id( - conn: &PgPooledConn, - customer_id: &common_utils::id_type::CustomerId, - merchant_id: &common_utils::id_type::MerchantId, - ) -> StorageResult> { - generics::generic_find_by_id_optional::<::Table, _, _>( - conn, - (customer_id.to_owned(), merchant_id.to_owned()), - ) - .await - } } diff --git a/crates/hyperswitch_domain_models/src/customer.rs b/crates/hyperswitch_domain_models/src/customer.rs index 7f217632de..5559a8c0e7 100644 --- a/crates/hyperswitch_domain_models/src/customer.rs +++ b/crates/hyperswitch_domain_models/src/customer.rs @@ -290,6 +290,9 @@ pub enum CustomerUpdate { phone_country_code: Option, metadata: Option, connector_customer: Option, + default_billing_address: Option, + default_shipping_address: Option, + default_payment_method_id: Option>, }, ConnectorCustomer { connector_customer: Option, @@ -311,6 +314,9 @@ impl From for CustomerUpdateInternal { phone_country_code, metadata, connector_customer, + default_billing_address, + default_shipping_address, + default_payment_method_id, } => Self { name: name.map(Encryption::from), email: email.map(Encryption::from), @@ -320,20 +326,24 @@ impl From for CustomerUpdateInternal { metadata, connector_customer, modified_at: date_time::now(), - default_payment_method_id: None, + default_billing_address, + default_shipping_address, + default_payment_method_id, updated_by: None, }, CustomerUpdate::ConnectorCustomer { connector_customer } => Self { connector_customer, - modified_at: date_time::now(), name: None, email: None, phone: None, description: None, phone_country_code: None, metadata: None, + modified_at: date_time::now(), default_payment_method_id: None, updated_by: None, + default_billing_address: None, + default_shipping_address: None, }, CustomerUpdate::UpdateDefaultPaymentMethod { default_payment_method_id, @@ -348,6 +358,8 @@ impl From for CustomerUpdateInternal { metadata: None, connector_customer: None, updated_by: None, + default_billing_address: None, + default_shipping_address: None, }, } } diff --git a/crates/router/src/compatibility/stripe/customers.rs b/crates/router/src/compatibility/stripe/customers.rs index 39616e007b..330610a9a3 100644 --- a/crates/router/src/compatibility/stripe/customers.rs +++ b/crates/router/src/compatibility/stripe/customers.rs @@ -110,8 +110,9 @@ pub async fn customer_update( }; let customer_id = path.into_inner(); - let mut cust_update_req: customer_types::CustomerRequest = payload.into(); + let mut cust_update_req: customer_types::CustomerUpdateRequest = payload.into(); cust_update_req.customer_id = Some(customer_id); + let customer_update_id = customer_types::UpdateCustomerId::new("temp_global_id".to_string()); let flow = Flow::CustomersUpdate; @@ -130,7 +131,13 @@ pub async fn customer_update( &req, cust_update_req, |state, auth, req, _| { - customers::update_customer(state, auth.merchant_account, req, auth.key_store) + customers::update_customer( + state, + auth.merchant_account, + req, + auth.key_store, + customer_update_id.clone(), + ) }, &auth::HeaderAuth(auth::ApiKeyAuth), api_locking::LockAction::NotApplicable, diff --git a/crates/router/src/compatibility/stripe/customers/types.rs b/crates/router/src/compatibility/stripe/customers/types.rs index 82b787445c..9f5c546a18 100644 --- a/crates/router/src/compatibility/stripe/customers/types.rs +++ b/crates/router/src/compatibility/stripe/customers/types.rs @@ -135,7 +135,7 @@ impl From for api::CustomerRequest { } #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] -impl From for api::CustomerRequest { +impl From for api::CustomerUpdateRequest { fn from(req: CustomerUpdateRequest) -> Self { Self { name: req.name, diff --git a/crates/router/src/core/customers.rs b/crates/router/src/core/customers.rs index e3169e611f..d6e66f0fb9 100644 --- a/crates/router/src/core/customers.rs +++ b/crates/router/src/core/customers.rs @@ -1,9 +1,9 @@ use api_models::customers::CustomerRequestWithEmail; #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] -use common_utils::{crypto::Encryptable, ext_traits::OptionExt, types::Description}; +use common_utils::{crypto::Encryptable, types::Description}; use common_utils::{ errors::ReportSwitchExt, - ext_traits::AsyncExt, + ext_traits::{AsyncExt, OptionExt}, id_type, type_name, types::keymanager::{Identifier, KeyManagerState, ToEncryptable}, }; @@ -23,15 +23,13 @@ use crate::{ types::{ api::customers, domain::{self, types}, + storage::{self}, transformers::ForeignFrom, }, }; #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use crate::{ - core::payment_methods::cards, - routes::metrics, - types::storage::{self, enums}, - utils::CustomerAddress, + core::payment_methods::cards, routes::metrics, types::storage::enums, utils::CustomerAddress, }; pub const REDACTED: &str = "Redacted"; @@ -51,7 +49,7 @@ pub async fn create_customer( let merchant_id = merchant_account.get_id(); let merchant_reference_id_customer = MerchantReferenceIdForCustomer { - customer_id: merchant_reference_id.as_ref(), + merchant_reference_id: merchant_reference_id.as_ref(), merchant_id, merchant_account: &merchant_account, key_store: &key_store, @@ -65,7 +63,7 @@ pub async fn create_customer( // it errors out, now the address that was inserted is not deleted merchant_reference_id_customer - .verify_if_customer_not_present_by_optional_merchant_reference_id(db) + .verify_if_merchant_reference_not_present_by_optional_merchant_reference_id(db) .await?; let domain_customer = customer_data @@ -332,28 +330,29 @@ impl<'a> AddressStructForDbEntry<'a> { } struct MerchantReferenceIdForCustomer<'a> { - customer_id: Option<&'a id_type::CustomerId>, + merchant_reference_id: Option<&'a id_type::CustomerId>, merchant_id: &'a id_type::MerchantId, merchant_account: &'a domain::MerchantAccount, key_store: &'a domain::MerchantKeyStore, key_manager_state: &'a KeyManagerState, } +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] impl<'a> MerchantReferenceIdForCustomer<'a> { - async fn verify_if_customer_not_present_by_optional_merchant_reference_id( + async fn verify_if_merchant_reference_not_present_by_optional_merchant_reference_id( &self, db: &dyn StorageInterface, ) -> Result, error_stack::Report> { - self.customer_id + self.merchant_reference_id .async_map(|cust| async { - self.verify_if_customer_not_present_by_merchant_reference(cust, db) + self.verify_if_merchant_reference_not_present_by_merchant_reference_id(cust, db) .await }) .await .transpose() } - async fn verify_if_customer_not_present_by_merchant_reference( + async fn verify_if_merchant_reference_not_present_by_merchant_reference_id( &self, cus: &'a id_type::CustomerId, db: &dyn StorageInterface, @@ -382,6 +381,53 @@ impl<'a> MerchantReferenceIdForCustomer<'a> { } } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +impl<'a> MerchantReferenceIdForCustomer<'a> { + async fn verify_if_merchant_reference_not_present_by_optional_merchant_reference_id( + &self, + db: &dyn StorageInterface, + ) -> Result, error_stack::Report> { + self.merchant_reference_id + .async_map(|merchant_ref| async { + self.verify_if_merchant_reference_not_present_by_merchant_reference( + merchant_ref, + db, + ) + .await + }) + .await + .transpose() + } + + async fn verify_if_merchant_reference_not_present_by_merchant_reference( + &self, + merchant_ref: &'a id_type::CustomerId, + db: &dyn StorageInterface, + ) -> Result<(), error_stack::Report> { + match db + .find_customer_by_merchant_reference_id_merchant_id( + self.key_manager_state, + merchant_ref, + self.merchant_id, + self.key_store, + self.merchant_account.storage_scheme, + ) + .await + { + Err(err) => { + if !err.current_context().is_db_not_found() { + Err(err).switch() + } else { + Ok(()) + } + } + Ok(_) => Err(report!( + errors::CustomersErrorResponse::CustomerAlreadyExists + )), + } + } +} + #[cfg(all(any(feature = "v1", feature = "v2",), not(feature = "customer_v2")))] #[instrument(skip(state))] pub async fn retrieve_customer( @@ -613,143 +659,390 @@ pub async fn delete_customer( Ok(services::ApplicationResponse::Json(response)) } -#[cfg(all( - any(feature = "v1", feature = "v2", feature = "oltp"), - not(feature = "customer_v2") -))] #[instrument(skip(state))] pub async fn update_customer( state: SessionState, merchant_account: domain::MerchantAccount, - update_customer: customers::CustomerRequest, + update_customer: customers::CustomerUpdateRequest, key_store: domain::MerchantKeyStore, + id: customers::UpdateCustomerId, ) -> errors::CustomerResponse { let db = state.store.as_ref(); + let key_manager_state = &(&state).into(); //Add this in update call if customer can be updated anywhere else - let customer_id = update_customer - .customer_id - .as_ref() - .get_required_value("customer_id") - .change_context(errors::CustomersErrorResponse::InternalServerError) - .attach("Missing required field `customer_id`")?; - let key_manager_state = &(&state).into(); - let customer = db - .find_customer_by_customer_id_merchant_id( - key_manager_state, - customer_id, - merchant_account.get_id(), + let merchant_reference_id = update_customer.get_merchant_reference_id(); + + let verify_id_for_update_customer = VerifyIdForUpdateCustomer { + merchant_reference_id: merchant_reference_id.as_ref(), + id: &id, + merchant_account: &merchant_account, + key_store: &key_store, + key_manager_state, + }; + + let customer = verify_id_for_update_customer + .verify_id_and_get_customer_object(db) + .await?; + + let updated_customer = update_customer + .create_domain_model_from_request( + db, &key_store, - merchant_account.storage_scheme, + &merchant_account, + key_manager_state, + &state, + &customer, ) - .await - .switch()?; + .await?; - let key = key_store.key.get_inner().peek(); + update_customer.generate_response(&updated_customer) +} - let address = if let Some(addr) = &update_customer.address { - match customer.address_id.clone() { - Some(address_id) => { - let customer_address: api_models::payments::AddressDetails = addr.clone(); - let update_address = update_customer - .get_address_update( - &state, - customer_address, - key, - merchant_account.storage_scheme, - merchant_account.get_id().clone(), - ) - .await - .switch() - .attach_printable("Failed while encrypting Address while Update")?; - Some( - db.update_address(key_manager_state, address_id, update_address, &key_store) +#[async_trait::async_trait] +trait CustomerUpdateBridge { + async fn create_domain_model_from_request<'a>( + &'a self, + db: &'a dyn StorageInterface, + key_store: &'a domain::MerchantKeyStore, + merchant_account: &'a domain::MerchantAccount, + key_manager_state: &'a KeyManagerState, + state: &'a SessionState, + domain_customer: &'a domain::Customer, + ) -> errors::CustomResult; + + fn generate_response<'a>( + &'a self, + customer: &'a domain::Customer, + ) -> errors::CustomerResponse; +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +struct AddressStructForDbUpdate<'a> { + update_customer: &'a customers::CustomerUpdateRequest, + merchant_account: &'a domain::MerchantAccount, + key_store: &'a domain::MerchantKeyStore, + key_manager_state: &'a KeyManagerState, + state: &'a SessionState, + domain_customer: &'a domain::Customer, +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +impl<'a> AddressStructForDbUpdate<'a> { + async fn update_address_if_sent( + &self, + db: &dyn StorageInterface, + ) -> errors::CustomResult, errors::CustomersErrorResponse> { + let address = if let Some(addr) = &self.update_customer.address { + match self.domain_customer.address_id.clone() { + Some(address_id) => { + let customer_address: api_models::payments::AddressDetails = addr.clone(); + let update_address = self + .update_customer + .get_address_update( + self.state, + customer_address, + self.key_store.key.get_inner().peek(), + self.merchant_account.storage_scheme, + self.merchant_account.get_id().clone(), + ) + .await + .switch() + .attach_printable("Failed while encrypting Address while Update")?; + Some( + db.update_address( + self.key_manager_state, + address_id, + update_address, + self.key_store, + ) .await .switch() .attach_printable(format!( "Failed while updating address: merchant_id: {:?}, customer_id: {:?}", - merchant_account.get_id(), - customer_id + self.merchant_account.get_id(), + self.domain_customer.customer_id ))?, - ) - } - None => { - let customer_address: api_models::payments::AddressDetails = addr.clone(); - - let address = update_customer - .get_domain_address( - &state, - customer_address, - merchant_account.get_id(), - &customer.customer_id, - key, - merchant_account.storage_scheme, ) - .await - .switch() - .attach_printable("Failed while encrypting address")?; - Some( - db.insert_address_for_customers(key_manager_state, address, &key_store) + } + None => { + let customer_address: api_models::payments::AddressDetails = addr.clone(); + + let address = self + .update_customer + .get_domain_address( + self.state, + customer_address, + self.merchant_account.get_id(), + &self.domain_customer.customer_id, + self.key_store.key.get_inner().peek(), + self.merchant_account.storage_scheme, + ) + .await + .switch() + .attach_printable("Failed while encrypting address")?; + Some( + db.insert_address_for_customers( + self.key_manager_state, + address, + self.key_store, + ) .await .switch() .attach_printable("Failed while inserting new address")?, - ) + ) + } } - } - } else { - match &customer.address_id { - Some(address_id) => Some( - db.find_address_by_address_id(key_manager_state, address_id, &key_store) + } else { + match &self.domain_customer.address_id { + Some(address_id) => Some( + db.find_address_by_address_id( + self.key_manager_state, + address_id, + self.key_store, + ) .await .switch()?, - ), - None => None, - } - }; - let encrypted_data = types::crypto_operation( - &(&state).into(), - type_name!(domain::Customer), - types::CryptoOperation::BatchEncrypt(CustomerRequestWithEmail::to_encryptable( - CustomerRequestWithEmail { - name: update_customer.name.clone(), - email: update_customer.email.clone(), - phone: update_customer.phone.clone(), - }, - )), - Identifier::Merchant(key_store.merchant_id.clone()), - key, - ) - .await - .and_then(|val| val.try_into_batchoperation()) - .switch()?; - let encryptable_customer = CustomerRequestWithEmail::from_encryptable(encrypted_data) - .change_context(errors::CustomersErrorResponse::InternalServerError)?; + ), + None => None, + } + }; + Ok(address) + } +} - let response = db - .update_customer_by_customer_id_merchant_id( +struct VerifyIdForUpdateCustomer<'a> { + merchant_reference_id: Option<&'a id_type::CustomerId>, + id: &'a customers::UpdateCustomerId, + merchant_account: &'a domain::MerchantAccount, + key_store: &'a domain::MerchantKeyStore, + key_manager_state: &'a KeyManagerState, +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +impl<'a> VerifyIdForUpdateCustomer<'a> { + async fn verify_id_and_get_customer_object( + &self, + db: &dyn StorageInterface, + ) -> Result> { + let customer_id = self + .merchant_reference_id + .get_required_value("customer_id") + .change_context(errors::CustomersErrorResponse::InternalServerError) + .attach("Missing required field `customer_id`")?; + + let _id = self.id; + + let customer = db + .find_customer_by_customer_id_merchant_id( + self.key_manager_state, + customer_id, + self.merchant_account.get_id(), + self.key_store, + self.merchant_account.storage_scheme, + ) + .await + .switch()?; + + Ok(customer) + } +} + +#[cfg(all(feature = "v2", feature = "customer_v2"))] +impl<'a> VerifyIdForUpdateCustomer<'a> { + async fn verify_id_and_get_customer_object( + &self, + db: &dyn StorageInterface, + ) -> Result> { + let id = self.id.get_global_id(); + + let _merchant_reference_id = self.merchant_reference_id; + let customer = db + .find_customer_by_global_id( + self.key_manager_state, + &id, + &self.merchant_account.get_id(), + self.key_store, + self.merchant_account.storage_scheme, + ) + .await + .switch()?; + + Ok(customer) + } +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +#[async_trait::async_trait] +impl CustomerUpdateBridge for customers::CustomerUpdateRequest { + async fn create_domain_model_from_request<'a>( + &'a self, + db: &'a dyn StorageInterface, + key_store: &'a domain::MerchantKeyStore, + merchant_account: &'a domain::MerchantAccount, + key_manager_state: &'a KeyManagerState, + state: &'a SessionState, + domain_customer: &'a domain::Customer, + ) -> errors::CustomResult { + let update_address_for_update_customer = AddressStructForDbUpdate { + update_customer: self, + merchant_account, + key_store, key_manager_state, - customer_id.to_owned(), - merchant_account.get_id().to_owned(), - customer, - storage::CustomerUpdate::Update { - name: encryptable_customer.name, - email: encryptable_customer.email, - phone: Box::new(encryptable_customer.phone), - phone_country_code: update_customer.phone_country_code, - metadata: update_customer.metadata, - description: update_customer.description, - connector_customer: None, - address_id: address.clone().map(|addr| addr.address_id), - }, - &key_store, - merchant_account.storage_scheme, + state, + domain_customer, + }; + + let address = update_address_for_update_customer + .update_address_if_sent(db) + .await?; + + let key = key_store.key.get_inner().peek(); + + let encrypted_data = types::crypto_operation( + key_manager_state, + type_name!(domain::Customer), + types::CryptoOperation::BatchEncrypt(CustomerRequestWithEmail::to_encryptable( + CustomerRequestWithEmail { + name: self.name.clone(), + email: self.email.clone(), + phone: self.phone.clone(), + }, + )), + Identifier::Merchant(key_store.merchant_id.clone()), + key, ) .await + .and_then(|val| val.try_into_batchoperation()) .switch()?; - Ok(services::ApplicationResponse::Json( - customers::CustomerResponse::foreign_from((response, update_customer.address)), - )) + let encryptable_customer = CustomerRequestWithEmail::from_encryptable(encrypted_data) + .change_context(errors::CustomersErrorResponse::InternalServerError)?; + + let response = db + .update_customer_by_customer_id_merchant_id( + key_manager_state, + domain_customer.customer_id.to_owned(), + merchant_account.get_id().to_owned(), + domain_customer.to_owned(), + storage::CustomerUpdate::Update { + name: encryptable_customer.name, + email: encryptable_customer.email, + phone: Box::new(encryptable_customer.phone), + phone_country_code: self.phone_country_code.clone(), + metadata: self.metadata.clone(), + description: self.description.clone(), + connector_customer: None, + address_id: address.clone().map(|addr| addr.address_id), + }, + key_store, + merchant_account.storage_scheme, + ) + .await + .switch()?; + + Ok(response) + } + + fn generate_response<'a>( + &'a self, + customer: &'a domain::Customer, + ) -> errors::CustomerResponse { + let address = self.get_address(); + let address_details = address.map(api_models::payments::AddressDetails::from); + + Ok(services::ApplicationResponse::Json( + customers::CustomerResponse::foreign_from((customer.clone(), address_details)), + )) + } +} + +#[cfg(all(feature = "v2", feature = "customer_v2"))] +#[async_trait::async_trait] +impl CustomerUpdateBridge for customers::CustomerUpdateRequest { + async fn create_domain_model_from_request<'a>( + &'a self, + db: &'a dyn StorageInterface, + key_store: &'a domain::MerchantKeyStore, + merchant_account: &'a domain::MerchantAccount, + key_manager_state: &'a KeyManagerState, + state: &'a SessionState, + domain_customer: &'a domain::Customer, + ) -> errors::CustomResult { + let default_billing_address = self.get_default_customer_billing_address(); + + let encrypted_customer_billing_address = default_billing_address + .async_map(|billing_address| create_encrypted_data(state, key_store, billing_address)) + .await + .transpose() + .change_context(errors::CustomersErrorResponse::InternalServerError) + .attach_printable("Unable to encrypt default customer billing address")?; + + let default_shipping_address = self.get_default_customer_shipping_address(); + + let encrypted_customer_shipping_address = default_shipping_address + .async_map(|shipping_address| create_encrypted_data(state, key_store, shipping_address)) + .await + .transpose() + .change_context(errors::CustomersErrorResponse::InternalServerError) + .attach_printable("Unable to encrypt default customer shipping address")?; + + let key = key_store.key.get_inner().peek(); + + let encrypted_data = types::crypto_operation( + key_manager_state, + type_name!(domain::Customer), + types::CryptoOperation::BatchEncrypt(CustomerRequestWithEmail::to_encryptable( + CustomerRequestWithEmail { + name: self.name.clone(), + email: self.email.clone(), + phone: self.phone.clone(), + }, + )), + Identifier::Merchant(key_store.merchant_id.clone()), + key, + ) + .await + .and_then(|val| val.try_into_batchoperation()) + .switch()?; + + let encryptable_customer = CustomerRequestWithEmail::from_encryptable(encrypted_data) + .change_context(errors::CustomersErrorResponse::InternalServerError)?; + + let response = db + .update_customer_by_global_id( + key_manager_state, + domain_customer.id.to_owned(), + domain_customer.to_owned(), + merchant_account.get_id(), + storage::CustomerUpdate::Update { + name: encryptable_customer.name, + email: encryptable_customer.email, + phone: Box::new(encryptable_customer.phone), + phone_country_code: self.phone_country_code.clone(), + metadata: self.metadata.clone(), + description: self.description.clone(), + connector_customer: None, + default_billing_address: encrypted_customer_billing_address.map(Into::into), + default_shipping_address: encrypted_customer_shipping_address.map(Into::into), + default_payment_method_id: Some(self.default_payment_method_id.clone()), + }, + &key_store, + merchant_account.storage_scheme, + ) + .await + .switch()?; + Ok(response) + } + + fn generate_response<'a>( + &'a self, + customer: &'a domain::Customer, + ) -> errors::CustomerResponse { + Ok(services::ApplicationResponse::Json( + customers::CustomerResponse::foreign_from(customer.clone()), + )) + } } pub async fn migrate_customers( diff --git a/crates/router/src/core/payment_methods.rs b/crates/router/src/core/payment_methods.rs index 977727d911..14fad68c17 100644 --- a/crates/router/src/core/payment_methods.rs +++ b/crates/router/src/core/payment_methods.rs @@ -6,21 +6,24 @@ pub mod utils; mod validator; pub mod vault; -use std::{borrow::Cow, collections::HashSet}; +use std::borrow::Cow; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use std::collections::HashSet; pub use api_models::enums::Connector; use api_models::payment_methods; #[cfg(feature = "payouts")] pub use api_models::{enums::PayoutConnectors, payouts as payout_types}; -use common_utils::{consts::DEFAULT_LOCALE, ext_traits::Encode, id_type::CustomerId}; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use common_utils::ext_traits::Encode; +use common_utils::{consts::DEFAULT_LOCALE, id_type::CustomerId}; use diesel_models::{ enums, GenericLinkNew, PaymentMethodCollectLink, PaymentMethodCollectLinkData, }; use error_stack::{report, ResultExt}; -use hyperswitch_domain_models::{ - api::{GenericLinks, GenericLinksData}, - payments::{payment_attempt::PaymentAttempt, PaymentIntent}, -}; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use hyperswitch_domain_models::api::{GenericLinks, GenericLinksData}; +use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use masking::PeekInterface; use router_env::{instrument, tracing}; use time::Duration; @@ -204,6 +207,17 @@ pub async fn create_pm_collect_db_entry( }) } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn render_pm_collect_link( + _state: SessionState, + _merchant_account: domain::MerchantAccount, + _key_store: domain::MerchantKeyStore, + _req: payment_methods::PaymentMethodCollectLinkRenderRequest, +) -> RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn render_pm_collect_link( state: SessionState, merchant_account: domain::MerchantAccount, @@ -263,6 +277,7 @@ pub async fn render_pm_collect_link( field_name: "customer_id", })?; // Fetch customer + let customer = db .find_customer_by_customer_id_merchant_id( &(&state).into(), diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index 6e9c97426e..dc1bbb9a73 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -32,13 +32,14 @@ use common_utils::{ }, }; use diesel_models::payment_method; -use domain::CustomerUpdate; use error_stack::{report, ResultExt}; use euclid::{ dssa::graph::{AnalysisContext, CgraphExt}, frontend::dir, }; use hyperswitch_constraint_graph as cgraph; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use hyperswitch_domain_models::customer::CustomerUpdate; use kgraph_utils::transformers::IntoDirValue; use masking::Secret; use router_env::{instrument, metrics::add_attributes, tracing}; @@ -50,11 +51,14 @@ use super::surcharge_decision_configs::{ }; #[cfg(all( any(feature = "v2", feature = "v1"), - not(feature = "payment_methods_v2") + not(feature = "payment_methods_v2"), + not(feature = "customer_v2") ))] use crate::routes::app::SessionStateInfo; #[cfg(feature = "payouts")] use crate::types::domain::types::AsyncLift; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use crate::utils::{self}; use crate::{ configs::settings, core::{ @@ -80,9 +84,34 @@ use crate::{ storage::{self, enums, PaymentMethodListContext, PaymentTokenData}, transformers::{ForeignFrom, ForeignTryFrom}, }, - utils::{self, ConnectorResponseExt, OptionExt}, + utils::{ConnectorResponseExt, OptionExt}, }; +#[cfg(all(feature = "v2", feature = "customer_v2"))] +#[instrument(skip_all)] +#[allow(clippy::too_many_arguments)] +pub async fn create_payment_method( + state: &routes::SessionState, + req: &api::PaymentMethodCreate, + customer_id: &id_type::CustomerId, + payment_method_id: &str, + locker_id: Option, + merchant_id: &id_type::MerchantId, + pm_metadata: Option, + customer_acceptance: Option, + payment_method_data: Option, + key_store: &domain::MerchantKeyStore, + connector_mandate_details: Option, + status: Option, + network_transaction_id: Option, + storage_scheme: MerchantStorageScheme, + payment_method_billing_address: Option, + card_scheme: Option, +) -> errors::CustomResult { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] #[allow(clippy::too_many_arguments)] pub async fn create_payment_method( @@ -451,6 +480,19 @@ impl } } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn skip_locker_call_and_migrate_payment_method( + _state: routes::SessionState, + _req: &api::PaymentMethodMigrate, + _merchant_id: id_type::MerchantId, + _key_store: &domain::MerchantKeyStore, + _merchant_account: &domain::MerchantAccount, + _card: api_models::payment_methods::CardDetailFromLocker, +) -> errors::RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn skip_locker_call_and_migrate_payment_method( state: routes::SessionState, req: &api::PaymentMethodMigrate, @@ -693,6 +735,19 @@ pub fn authenticate_pm_client_secret_and_check_expiry( } } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +#[instrument(skip_all)] +pub async fn add_payment_method_data( + _state: routes::SessionState, + _req: api::PaymentMethodCreate, + _merchant_account: domain::MerchantAccount, + _key_store: domain::MerchantKeyStore, + _pm_id: String, +) -> errors::RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] pub async fn add_payment_method_data( state: routes::SessionState, @@ -723,6 +778,7 @@ pub async fn add_payment_method_data( } let customer_id = payment_method.customer_id.clone(); + let customer = db .find_customer_by_customer_id_merchant_id( &(&state).into(), @@ -2225,7 +2281,17 @@ fn get_val(str: String, val: &serde_json::Value) -> Option { .and_then(|v| v.as_str()) .map(|s| s.to_string()) } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn list_payment_methods( + _state: routes::SessionState, + _merchant_account: domain::MerchantAccount, + _key_store: domain::MerchantKeyStore, + mut _req: api::PaymentMethodListRequest, +) -> errors::RouterResponse { + todo!() +} +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn list_payment_methods( state: routes::SessionState, merchant_account: domain::MerchantAccount, @@ -3645,7 +3711,11 @@ fn filter_recurring_based( recurring_enabled.map_or(true, |enabled| payment_method.recurring_enabled == enabled) } -#[cfg(all(feature = "v2", feature = "payment_methods_v2"))] +#[cfg(all( + feature = "v2", + feature = "payment_methods_v2", + feature = "customer_v2" +))] pub async fn list_customer_payment_method_util( state: routes::SessionState, merchant_account: domain::MerchantAccount, @@ -3700,7 +3770,8 @@ pub async fn list_customer_payment_method_util( #[cfg(all( any(feature = "v2", feature = "v1"), - not(feature = "payment_methods_v2") + not(feature = "payment_methods_v2"), + not(feature = "customer_v2") ))] pub async fn do_list_customer_pm_fetch_customer_if_not_passed( state: routes::SessionState, @@ -3775,7 +3846,8 @@ pub async fn do_list_customer_pm_fetch_customer_if_not_passed( #[cfg(all( any(feature = "v2", feature = "v1"), - not(feature = "payment_methods_v2") + not(feature = "payment_methods_v2"), + not(feature = "customer_v2") ))] pub async fn list_customer_payment_method( state: &routes::SessionState, @@ -4275,7 +4347,11 @@ impl SavedPMLPaymentsInfo { } } -#[cfg(all(feature = "v2", feature = "payment_methods_v2"))] +#[cfg(all( + feature = "v2", + feature = "payment_methods_v2", + feature = "customer_v2" +))] pub async fn list_customer_payment_method( state: &routes::SessionState, merchant_account: domain::MerchantAccount, @@ -4290,7 +4366,7 @@ pub async fn list_customer_payment_method( // let key = key_store.key.get_inner().peek(); let customer = db - .find_customer_by_customer_id_merchant_id( + .find_customer_by_merchant_reference_id_merchant_id( key_manager_state, customer_id, merchant_account.get_id(), @@ -4802,6 +4878,20 @@ async fn get_bank_account_connector_details( None => Ok(None), } } + +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn set_default_payment_method( + _state: &routes::SessionState, + _merchant_id: &id_type::MerchantId, + _key_store: domain::MerchantKeyStore, + _customer_id: &id_type::CustomerId, + _payment_method_id: String, + _storage_scheme: MerchantStorageScheme, +) -> errors::RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn set_default_payment_method( state: &routes::SessionState, merchant_id: &id_type::MerchantId, @@ -4851,13 +4941,13 @@ pub async fn set_default_payment_method( }, )?; + let customer_id = customer.get_customer_id().clone(); + let customer_update = CustomerUpdate::UpdateDefaultPaymentMethod { default_payment_method_id: Some(Some(payment_method_id.to_owned())), }; - - let customer_id = customer.get_customer_id().clone(); - // update the db with the default payment method id + let updated_customer_details = db .update_customer_by_customer_id_merchant_id( key_manager_state, @@ -5092,6 +5182,18 @@ pub async fn retrieve_payment_method( )) } +#[instrument(skip_all)] +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn delete_payment_method( + _state: routes::SessionState, + _merchant_account: domain::MerchantAccount, + _pm_id: api::PaymentMethodId, + _key_store: domain::MerchantKeyStore, +) -> errors::RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] pub async fn delete_payment_method( state: routes::SessionState, @@ -5149,7 +5251,6 @@ pub async fn delete_payment_method( let customer_update = CustomerUpdate::UpdateDefaultPaymentMethod { default_payment_method_id: Some(None), }; - db.update_customer_by_customer_id_merchant_id( key_manager_state, key.customer_id, diff --git a/crates/router/src/core/payment_methods/validator.rs b/crates/router/src/core/payment_methods/validator.rs index d9ec794151..f32e337a8c 100644 --- a/crates/router/src/core/payment_methods/validator.rs +++ b/crates/router/src/core/payment_methods/validator.rs @@ -15,6 +15,17 @@ use crate::{ utils, }; +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn validate_request_and_initiate_payment_method_collect_link( + _state: &SessionState, + _merchant_account: &domain::MerchantAccount, + _key_store: &domain::MerchantKeyStore, + _req: &PaymentMethodCollectLinkRequest, +) -> RouterResult { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn validate_request_and_initiate_payment_method_collect_link( state: &SessionState, merchant_account: &domain::MerchantAccount, @@ -25,6 +36,7 @@ pub async fn validate_request_and_initiate_payment_method_collect_link( let db: &dyn StorageInterface = &*state.store; let customer_id = req.customer_id.clone(); let merchant_id = merchant_account.get_id().clone(); + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] match db .find_customer_by_customer_id_merchant_id( &state.into(), diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 60a5581d39..30c0d8a01e 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -73,7 +73,6 @@ use crate::{ configs::settings::{ApplePayPreDecryptFlow, PaymentMethodTypeTokenFilter}, connector::utils::missing_field_err, core::{ - authentication as authentication_core, errors::{self, CustomResult, RouterResponse, RouterResult}, payment_methods::cards, utils, @@ -84,11 +83,10 @@ use crate::{ services::{self, api::Authenticate, ConnectorRedirectResponse}, types::{ self as router_types, - api::{self, authentication, ConnectorCallType, ConnectorCommon}, + api::{self, ConnectorCallType, ConnectorCommon}, domain, storage::{self, enums as storage_enums, payment_attempt::PaymentAttemptExt}, transformers::{ForeignInto, ForeignTryInto}, - BrowserInformation, }, utils::{ add_apple_pay_flow_metrics, add_connector_http_status_code_metrics, Encode, OptionExt, @@ -96,6 +94,11 @@ use crate::{ }, workflows::payment_sync, }; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use crate::{ + core::authentication as authentication_core, + types::{api::authentication, BrowserInformation}, +}; #[allow(clippy::too_many_arguments, clippy::type_complexity)] #[instrument(skip_all, fields(payment_id, merchant_id))] @@ -4134,6 +4137,17 @@ where } } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn payment_external_authentication( + _state: SessionState, + _merchant_account: domain::MerchantAccount, + _key_store: domain::MerchantKeyStore, + _req: api_models::payments::PaymentsExternalAuthenticationRequest, +) -> RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] pub async fn payment_external_authentication( state: SessionState, @@ -4178,6 +4192,7 @@ pub async fn payment_external_authentication( &[storage_enums::IntentStatus::RequiresCustomerAction], "authenticate", )?; + let optional_customer = match &payment_intent.customer_id { Some(customer_id) => Some( state @@ -4197,6 +4212,7 @@ pub async fn payment_external_authentication( ), None => None, }; + let profile_id = payment_intent .profile_id .as_ref() diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 66f1e31ce3..2491cd2bc9 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -1446,6 +1446,7 @@ pub async fn get_customer_from_details( None => Ok(None), Some(customer_id) => { let db = &*state.store; + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] let customer = db .find_customer_optional_by_customer_id_merchant_id( &state.into(), @@ -1455,6 +1456,18 @@ pub async fn get_customer_from_details( storage_scheme, ) .await?; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + let customer = db + .find_optional_by_merchant_id_merchant_reference_id( + &state.into(), + &customer_id, + merchant_id, + merchant_key_store, + storage_scheme, + ) + .await?; + payment_data.email = payment_data.email.clone().or_else(|| { customer.as_ref().and_then(|inner| { inner diff --git a/crates/router/src/core/payments/operations/payment_confirm.rs b/crates/router/src/core/payments/operations/payment_confirm.rs index f56009c8e2..2e7131edce 100644 --- a/crates/router/src/core/payments/operations/payment_confirm.rs +++ b/crates/router/src/core/payments/operations/payment_confirm.rs @@ -3,17 +3,19 @@ use std::marker::PhantomData; use api_models::{ admin::ExtendedCardInfoConfig, enums::FrmSuggestion, - payment_methods::PaymentMethodsData, - payments::{AdditionalPaymentData, ExtendedCardInfo, GetAddressFromPaymentMethodData}, + // payment_methods::PaymentMethodsData, + payments::{ExtendedCardInfo, GetAddressFromPaymentMethodData}, }; +// use api_models::{admin::ExtendedCardInfoConfig, enums::FrmSuggestion, payments::ExtendedCardInfo}; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use api_models::{payment_methods::PaymentMethodsData, payments::AdditionalPaymentData}; use async_trait::async_trait; -use common_utils::{ - ext_traits::{AsyncExt, Encode, StringExt, ValueExt}, - type_name, - types::keymanager::Identifier, -}; +use common_utils::ext_traits::{AsyncExt, Encode, StringExt, ValueExt}; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use common_utils::{type_name, types::keymanager::Identifier}; use error_stack::{report, ResultExt}; use futures::FutureExt; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use hyperswitch_domain_models::payments::payment_intent::PaymentIntentUpdateFields; use masking::{ExposeInterface, PeekInterface}; use router_derive::PaymentOperation; @@ -21,29 +23,30 @@ use router_env::{instrument, logger, tracing}; use tracing_futures::Instrument; use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, ValidateRequest}; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +use crate::{ + core::payment_methods::cards::create_encrypted_data, + events::audit_events::{AuditEvent, AuditEventType}, + types::domain::types::{crypto_operation, CryptoOperation}, +}; use crate::{ core::{ authentication, blocklist::utils as blocklist_utils, errors::{self, CustomResult, RouterResult, StorageErrorExt}, mandate::helpers as m_helpers, - payment_methods::cards::create_encrypted_data, payments::{ self, helpers, operations, populate_surcharge_details, CustomerDetails, PaymentAddress, PaymentData, }, utils as core_utils, }, - events::audit_events::{AuditEvent, AuditEventType}, routes::{app::ReqState, SessionState}, services, types::{ self, api::{self, ConnectorCallType, PaymentIdTypeExt}, - domain::{ - self, - types::{crypto_operation, CryptoOperation}, - }, + domain::{self}, storage::{self, enums as storage_enums}, }, utils::{self, OptionExt}, @@ -1018,6 +1021,30 @@ impl Domain for PaymentConfirm { } } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +#[async_trait] +impl UpdateTracker, api::PaymentsRequest> for PaymentConfirm { + #[instrument(skip_all)] + async fn update_trackers<'b>( + &'b self, + _state: &'b SessionState, + _req_state: ReqState, + mut _payment_data: PaymentData, + _customer: Option, + _storage_scheme: storage_enums::MerchantStorageScheme, + _updated_customer: Option, + _key_store: &domain::MerchantKeyStore, + _frm_suggestion: Option, + _header_payload: api::HeaderPayload, + ) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData)> + where + F: 'b + Send, + { + todo!() + } +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[async_trait] impl UpdateTracker, api::PaymentsRequest> for PaymentConfirm { #[instrument(skip_all)] @@ -1379,7 +1406,6 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen let customer_fut = if let Some((updated_customer, customer)) = updated_customer.zip(customer) { - let m_customer_customer_id = customer.get_customer_id().to_owned(); let m_customer_merchant_id = customer.merchant_id.to_owned(); let m_key_store = key_store.clone(); let m_updated_customer = updated_customer.clone(); @@ -1388,6 +1414,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen let key_manager_state = state.into(); tokio::spawn( async move { + let m_customer_customer_id = customer.get_customer_id().to_owned(); m_db.update_customer_by_customer_id_merchant_id( &key_manager_state, m_customer_customer_id, diff --git a/crates/router/src/core/payout_link.rs b/crates/router/src/core/payout_link.rs index 1b9445f180..0e48060511 100644 --- a/crates/router/src/core/payout_link.rs +++ b/crates/router/src/core/payout_link.rs @@ -24,6 +24,19 @@ use crate::{ types::domain, }; +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn initiate_payout_link( + _state: SessionState, + _merchant_account: domain::MerchantAccount, + _key_store: domain::MerchantKeyStore, + _req: payouts::PayoutLinkInitiateRequest, + _request_headers: &header::HeaderMap, + _locale: String, +) -> RouterResponse { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn initiate_payout_link( state: SessionState, merchant_account: domain::MerchantAccount, @@ -140,6 +153,7 @@ pub async fn initiate_payout_link( .attach_printable_lazy(|| { format!("customer [{}] not found", payout_link.primary_reference) })?; + let enabled_payout_methods = filter_payout_methods(&state, &merchant_account, &key_store, &payout).await?; // Fetch default enabled_payout_methods diff --git a/crates/router/src/core/payouts.rs b/crates/router/src/core/payouts.rs index 07b50d3a15..828c1518ef 100644 --- a/crates/router/src/core/payouts.rs +++ b/crates/router/src/core/payouts.rs @@ -743,7 +743,22 @@ pub async fn payouts_fulfill_core( response_handler(&merchant_account, &payout_data).await } -#[cfg(feature = "olap")] +#[cfg(all(feature = "olap", feature = "v2", feature = "customer_v2"))] +pub async fn payouts_list_core( + _state: SessionState, + _merchant_account: domain::MerchantAccount, + _profile_id_list: Option>, + _key_store: domain::MerchantKeyStore, + _constraints: payouts::PayoutListConstraints, +) -> RouterResponse { + todo!() +} + +#[cfg(all( + feature = "olap", + any(feature = "v1", feature = "v2"), + not(feature = "customer_v2") +))] pub async fn payouts_list_core( state: SessionState, merchant_account: domain::MerchantAccount, @@ -775,6 +790,7 @@ pub async fn payouts_list_core( { Ok(ref payout_attempt) => match payout.customer_id.clone() { Some(ref customer_id) => { + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] match db .find_customer_by_customer_id_merchant_id( &(&state).into(), @@ -1135,8 +1151,6 @@ pub async fn create_recipient( Ok(recipient_create_data) => { let db = &*state.store; if let Some(customer) = customer_details { - let customer_id = customer.get_customer_id().to_owned(); - let merchant_id = merchant_account.get_id().to_owned(); if let Some(updated_customer) = customers::update_connector_customer_in_customers( &connector_label, @@ -1145,20 +1159,46 @@ pub async fn create_recipient( ) .await { - payout_data.customer_details = Some( - db.update_customer_by_customer_id_merchant_id( - &state.into(), - customer_id, - merchant_id, - customer, - updated_customer, - key_store, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Error updating customers in db")?, - ) + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "customer_v2") + ))] + { + let customer_id = customer.get_customer_id().to_owned(); + payout_data.customer_details = Some( + db.update_customer_by_customer_id_merchant_id( + &state.into(), + customer_id, + merchant_account.get_id().to_owned(), + customer, + updated_customer, + key_store, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Error updating customers in db")?, + ); + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + { + let global_id = "temp_id".to_string(); + payout_data.customer_details = Some( + db.update_customer_by_global_id( + &state.into(), + global_id, + customer, + &merchant_account.get_id(), + updated_customer, + key_store, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Error updating customers in db")?, + ); + } } } @@ -2356,6 +2396,18 @@ pub async fn payout_create_db_entries( }) } +#[cfg(all(feature = "v2", feature = "customer_v2"))] +pub async fn make_payout_data( + _state: &SessionState, + _merchant_account: &domain::MerchantAccount, + _auth_profile_id: Option, + _key_store: &domain::MerchantKeyStore, + _req: &payouts::PayoutRequest, +) -> RouterResult { + todo!() +} + +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] pub async fn make_payout_data( state: &SessionState, merchant_account: &domain::MerchantAccount, diff --git a/crates/router/src/core/payouts/validator.rs b/crates/router/src/core/payouts/validator.rs index d9ab7d12bd..d425725645 100644 --- a/crates/router/src/core/payouts/validator.rs +++ b/crates/router/src/core/payouts/validator.rs @@ -129,7 +129,7 @@ pub async fn validate_create_request( state, req.payout_method_data.as_ref(), Some(payout_token), - &customer.customer_id, + &customer.get_customer_id(), merchant_account.get_id(), req.payout_type, merchant_key_store, diff --git a/crates/router/src/db/customers.rs b/crates/router/src/db/customers.rs index 9de76567de..885e7ffc2d 100644 --- a/crates/router/src/db/customers.rs +++ b/crates/router/src/db/customers.rs @@ -1,7 +1,9 @@ use common_utils::{ext_traits::AsyncExt, id_type, types::keymanager::KeyManagerState}; use error_stack::ResultExt; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use futures::future::try_join_all; use hyperswitch_domain_models::customer; +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use router_env::{instrument, tracing}; use super::MockDb; @@ -22,12 +24,14 @@ where customer::Customer: Conversion, { + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn delete_customer_by_customer_id_merchant_id( &self, customer_id: &id_type::CustomerId, merchant_id: &id_type::MerchantId, ) -> CustomResult; + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_optional_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -37,6 +41,17 @@ where storage_scheme: MerchantStorageScheme, ) -> CustomResult, errors::StorageError>; + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError>; + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[allow(clippy::too_many_arguments)] async fn update_customer_by_customer_id_merchant_id( &self, @@ -49,6 +64,7 @@ where storage_scheme: MerchantStorageScheme, ) -> CustomResult; + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -58,6 +74,17 @@ where storage_scheme: MerchantStorageScheme, ) -> CustomResult; + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn list_customers_by_merchant_id( &self, state: &KeyManagerState, @@ -72,6 +99,29 @@ where key_store: &domain::MerchantKeyStore, storage_scheme: MerchantStorageScheme, ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_global_id( + &self, + state: &KeyManagerState, + id: String, + customer: customer::Customer, + merchant_id: &id_type::MerchantId, + customer_update: storage_types::CustomerUpdate, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &String, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; } #[cfg(feature = "kv_store")] @@ -79,6 +129,7 @@ mod storage { use common_utils::{ext_traits::AsyncExt, id_type, types::keymanager::KeyManagerState}; use diesel_models::kv; use error_stack::{report, ResultExt}; + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] use futures::future::try_join_all; use hyperswitch_domain_models::customer; use masking::PeekInterface; @@ -105,11 +156,11 @@ mod storage { utils::db_utils, }; - #[cfg(all(feature = "v2", feature = "customer_v2"))] #[async_trait::async_trait] impl CustomerInterface for Store { #[instrument(skip_all)] // check customer not found in kv and fallback to db + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_optional_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -178,6 +229,76 @@ mod storage { }) } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::Customer::find_optional_by_merchant_id_merchant_reference_id( + &conn, + customer_id, + merchant_id, + ) + .await + .map_err(|err| report!(errors::StorageError::from(err))) + }; + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; + let maybe_customer = match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::MerchantIdCustomerId { + merchant_id, + customer_id: customer_id.get_string_repr(), + }; + let field = format!("cust_{}", customer_id.get_string_repr()); + Box::pin(db_utils::try_redis_get_else_try_database_get( + // check for ValueNotFound + async { + kv_wrapper( + self, + KvOperation::::HGet(&field), + key, + ) + .await? + .try_into_hget() + .map(Some) + }, + database_call, + )) + .await + } + }?; + + let maybe_result = maybe_customer + .async_map(|c| async { + c.convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + .transpose()?; + + maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name { + Some(ref name) if name.peek() == REDACTED => { + Err(errors::StorageError::CustomerRedacted)? + } + _ => Ok(Some(customer)), + }) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn update_customer_by_customer_id_merchant_id( &self, @@ -260,6 +381,72 @@ mod storage { .change_context(errors::StorageError::DecryptionError) } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::Customer::find_by_merchant_reference_id_merchant_id( + &conn, + merchant_reference_id, + merchant_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; + let customer = match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::MerchantIdMerchantReferenceId { + merchant_id, + merchant_reference_id: merchant_reference_id.get_string_repr(), + }; + let field = format!("cust_{}", merchant_reference_id.get_string_repr()); + Box::pin(db_utils::try_redis_get_else_try_database_get( + async { + kv_wrapper( + self, + KvOperation::::HGet(&field), + key, + ) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + }?; + + let result: customer::Customer = customer + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError)?; + //.await + + match result.name { + Some(ref name) if name.peek() == REDACTED => { + Err(errors::StorageError::CustomerRedacted)? + } + _ => Ok(result), + } + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn find_customer_by_customer_id_merchant_id( &self, @@ -324,6 +511,7 @@ mod storage { } } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn list_customers_by_merchant_id( &self, @@ -355,6 +543,7 @@ mod storage { Ok(customers) } + #[cfg(all(feature = "v2", feature = "customer_v2"))] #[instrument(skip_all)] async fn insert_customer( &self, @@ -363,345 +552,82 @@ mod storage { key_store: &domain::MerchantKeyStore, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - let customer_id = customer_data.get_customer_id().clone(); - let merchant_id = customer_data.merchant_id.clone(); - let mut new_customer = customer_data - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)?; - let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Insert, - ) - .await; - new_customer.update_storage_scheme(storage_scheme); - let create_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - new_customer - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: customer_id.get_string_repr(), - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Insert { - insertable: kv::Insertable::Customer(new_customer.clone()), - }, - }; - let storage_customer = new_customer.into(); - - match kv_wrapper::( - self, - KvOperation::HSetNx::( - &field, - &storage_customer, - redis_entry, - ), - key, - ) - .await - .change_context(errors::StorageError::KVError)? - .try_into_hsetnx() - { - Ok(redis_interface::HsetnxReply::KeyNotSet) => { - Err(report!(errors::StorageError::DuplicateValue { - entity: "customer", - key: Some(customer_id.get_string_repr().to_string()), - })) - } - Ok(redis_interface::HsetnxReply::KeySet) => Ok(storage_customer), - Err(er) => Err(er).change_context(errors::StorageError::KVError), - } - } - }?; - - create_customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[instrument(skip_all)] - async fn delete_customer_by_customer_id_merchant_id( - &self, - _customer_id: &id_type::CustomerId, - _merchant_id: &id_type::MerchantId, - ) -> CustomResult { - todo!() - } - } - - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - #[async_trait::async_trait] - impl CustomerInterface for Store { - #[instrument(skip_all)] - // check customer not found in kv and fallback to db - async fn find_customer_optional_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|err| report!(errors::StorageError::from(err))) - }; - let storage_scheme = - decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) - .await; - let maybe_customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id, - customer_id: customer_id.get_string_repr(), - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - // check for ValueNotFound - async { - kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - ) - .await? - .try_into_hget() - .map(Some) - }, - database_call, - )) - .await - } - }?; - - let maybe_result = maybe_customer - .async_map(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()?; - - maybe_result.map_or(Ok(None), |customer: domain::Customer| match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(Some(customer)), - }) - } - - #[instrument(skip_all)] - async fn update_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: id_type::CustomerId, - merchant_id: id_type::MerchantId, - customer: customer::Customer, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - let customer = Conversion::convert(customer) - .await - .change_context(errors::StorageError::EncryptionError)?; - let database_call = || async { - storage_types::Customer::update_by_customer_id_merchant_id( - &conn, - customer_id.clone(), - merchant_id.clone(), - customer_update.clone().into(), - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: customer_id.get_string_repr(), - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( - self, - storage_scheme, - Op::Update(key.clone(), &field, customer.updated_by.as_deref()), - ) - .await; - let updated_object = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let updated_customer = - diesel_models::CustomerUpdateInternal::from(customer_update.clone()) - .apply_changeset(customer.clone()); - - let redis_value = serde_json::to_string(&updated_customer) - .change_context(errors::StorageError::KVError)?; - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Update { - updatable: kv::Updateable::CustomerUpdate(kv::CustomerUpdateMems { - orig: customer, - update_data: customer_update.into(), - }), - }, - }; - - kv_wrapper::<(), _, _>( - self, - KvOperation::Hset::( - (&field, redis_value), - redis_entry, - ), - key, - ) - .await - .change_context(errors::StorageError::KVError)? - .try_into_hset() - .change_context(errors::StorageError::KVError)?; - - Ok(updated_customer) - } - }; - - updated_object? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[instrument(skip_all)] - async fn find_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::Customer::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = - decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) - .await; - let customer = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id, - customer_id: customer_id.get_string_repr(), - }; - let field = format!("cust_{}", customer_id.get_string_repr()); - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - kv_wrapper( - self, - KvOperation::::HGet(&field), - key, - ) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - }?; - - let result: customer::Customer = customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?; - //.await - - match result.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(result), - } - } - - #[instrument(skip_all)] - async fn list_customers_by_merchant_id( - &self, - state: &KeyManagerState, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - - let encrypted_customers = - storage_types::Customer::list_by_merchant_id(&conn, merchant_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let customers = try_join_all(encrypted_customers.into_iter().map( - |encrypted_customer| async { - encrypted_customer - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }, - )) - .await?; - - Ok(customers) - } - - #[instrument(skip_all)] - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let customer_id = customer_data.get_customer_id().clone(); + let id = customer_data.id.clone(); + let mut new_customer = customer_data + .construct_new() + .await + .change_context(errors::StorageError::EncryptionError)?; + let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( + self, + storage_scheme, + Op::Insert, + ) + .await; + new_customer.update_storage_scheme(storage_scheme); + let create_customer = match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + let conn = connection::pg_connection_write(self).await?; + new_customer + .insert(&conn) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::GlobalId { id: &id }; + let field = format!("cust_{}", id); + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::Customer(new_customer.clone()), + }, + }; + let storage_customer = new_customer.into(); + + match kv_wrapper::( + self, + KvOperation::HSetNx::( + &field, + &storage_customer, + redis_entry, + ), + key, + ) + .await + .change_context(errors::StorageError::KVError)? + .try_into_hsetnx() + { + Ok(redis_interface::HsetnxReply::KeyNotSet) => { + Err(report!(errors::StorageError::DuplicateValue { + entity: "customer", + key: Some(id.to_string()), + })) + } + Ok(redis_interface::HsetnxReply::KeySet) => Ok(storage_customer), + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + }?; + + create_customer + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] + #[instrument(skip_all)] + async fn insert_customer( + &self, + customer_data: customer::Customer, + state: &KeyManagerState, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let customer_id = customer_data.customer_id.clone(); let merchant_id = customer_data.merchant_id.clone(); let mut new_customer = customer_data .construct_new() @@ -771,6 +697,7 @@ mod storage { .change_context(errors::StorageError::DecryptionError) } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn delete_customer_by_customer_id_merchant_id( &self, @@ -786,6 +713,143 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &String, + _merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::Customer::find_by_global_id(&conn, id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; + let customer = match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::GlobalId { id }; + let field = format!("cust_{}", id); + Box::pin(db_utils::try_redis_get_else_try_database_get( + async { + kv_wrapper( + self, + KvOperation::::HGet(&field), + key, + ) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + }?; + + let result: customer::Customer = customer + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError)?; + //.await + + match result.name { + Some(ref name) if name.peek() == REDACTED => { + Err(errors::StorageError::CustomerRedacted)? + } + _ => Ok(result), + } + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn update_customer_by_global_id( + &self, + state: &KeyManagerState, + id: String, + customer: customer::Customer, + _merchant_id: &id_type::MerchantId, + customer_update: storage_types::CustomerUpdate, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + let customer = Conversion::convert(customer) + .await + .change_context(errors::StorageError::EncryptionError)?; + let database_call = || async { + storage_types::Customer::update_by_id( + &conn, + id.clone(), + customer_update.clone().into(), + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + let key = PartitionKey::GlobalId { id: &id }; + let field = format!("cust_{}", id); + let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( + self, + storage_scheme, + Op::Update(key.clone(), &field, customer.updated_by.as_deref()), + ) + .await; + let updated_object = match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let updated_customer = + diesel_models::CustomerUpdateInternal::from(customer_update.clone()) + .apply_changeset(customer.clone()); + + let redis_value = serde_json::to_string(&updated_customer) + .change_context(errors::StorageError::KVError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::CustomerUpdate(kv::CustomerUpdateMems { + orig: customer, + update_data: customer_update.into(), + }), + }, + }; + + kv_wrapper::<(), _, _>( + self, + KvOperation::Hset::( + (&field, redis_value), + redis_entry, + ), + key, + ) + .await + .change_context(errors::StorageError::KVError)? + .try_into_hset() + .change_context(errors::StorageError::KVError)?; + + Ok(updated_customer) + } + }; + + updated_object? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + } } } @@ -815,10 +879,10 @@ mod storage { }, }; - #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[async_trait::async_trait] impl CustomerInterface for Store { #[instrument(skip_all)] + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_optional_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -855,6 +919,45 @@ mod storage { }) } + #[instrument(skip_all)] + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + let maybe_customer: Option = + storage_types::Customer::find_optional_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error)))? + .async_map(|c| async { + c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + .transpose()?; + maybe_customer.map_or(Ok(None), |customer| { + // in the future, once #![feature(is_some_and)] is stable, we can make this more concise: + // `if customer.name.is_some_and(|ref name| name == REDACTED) ...` + match customer.name { + Some(ref name) if name.peek() == REDACTED => { + Err(errors::StorageError::CustomerRedacted)? + } + _ => Ok(Some(customer)), + } + }) + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn update_customer_by_customer_id_merchant_id( &self, @@ -883,6 +986,7 @@ mod storage { .await } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn find_customer_by_customer_id_merchant_id( &self, @@ -915,6 +1019,40 @@ mod storage { } } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let customer: customer::Customer = + storage_types::Customer::find_by_merchant_reference_id_merchant_id( + &conn, + merchant_reference_id, + merchant_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + .async_and_then(|c| async { + c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await?; + match customer.name { + Some(ref name) if name.peek() == REDACTED => { + Err(errors::StorageError::CustomerRedacted)? + } + _ => Ok(customer), + } + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn list_customers_by_merchant_id( &self, @@ -970,6 +1108,7 @@ mod storage { .await } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn delete_customer_by_customer_id_merchant_id( &self, @@ -985,100 +1124,52 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) } - } - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[async_trait::async_trait] - impl CustomerInterface for Store { - #[instrument(skip_all)] - async fn find_customer_optional_by_customer_id_merchant_id( + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_global_id( &self, state: &KeyManagerState, - customer_id: &id_type::CustomerId, + id: String, + customer: customer::Customer, merchant_id: &id_type::MerchantId, + customer_update: storage_types::CustomerUpdate, key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let maybe_customer: Option = - storage_types::Customer::find_optional_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + storage_types::Customer::update_by_global_id(&conn, id, customer_update.into()) .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .async_map(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.to_string()) + .map_err(|error| report!(errors::StorageError::from(error))) + .async_and_then(|c| async { + c.convert(state, key_store.key.get_inner(), merchant_id) .await .change_context(errors::StorageError::DecryptionError) }) .await - .transpose()?; - maybe_customer.map_or(Ok(None), |customer| { - // in the future, once #![feature(is_some_and)] is stable, we can make this more concise: - // `if customer.name.is_some_and(|ref name| name == REDACTED) ...` - match customer.name { - Some(ref name) if name.peek() == REDACTED => { - Err(errors::StorageError::CustomerRedacted)? - } - _ => Ok(Some(customer)), - } - }) } + #[cfg(all(feature = "v2", feature = "customer_v2"))] #[instrument(skip_all)] - async fn update_customer_by_customer_id_merchant_id( + async fn find_customer_by_global_id( &self, state: &KeyManagerState, - customer_id: id_type::CustomerId, - merchant_id: id_type::MerchantId, - _customer: customer::Customer, - customer_update: storage_types::CustomerUpdate, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::Customer::update_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id.clone(), - customer_update.into(), - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } - - #[instrument(skip_all)] - async fn find_customer_by_customer_id_merchant_id( - &self, - state: &KeyManagerState, - customer_id: &id_type::CustomerId, + id: &String, merchant_id: &id_type::MerchantId, key_store: &domain::MerchantKeyStore, _storage_scheme: MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_read(self).await?; let customer: customer::Customer = - storage_types::Customer::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert(state, key_store.key.get_inner(), merchant_id.to_string()) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await?; + storage_types::Customer::find_by_global_id(&conn, customer_id, merchant_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + .async_and_then(|c| async { + c.convert(state, key_store.key.get_inner(), merchant_id.clone().into()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await?; match customer.name { Some(ref name) if name.peek() == REDACTED => { Err(errors::StorageError::CustomerRedacted)? @@ -1086,83 +1177,13 @@ mod storage { _ => Ok(customer), } } - - #[instrument(skip_all)] - async fn list_customers_by_merchant_id( - &self, - state: &KeyManagerState, - merchant_id: &id_type::MerchantId, - key_store: &domain::MerchantKeyStore, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - - let encrypted_customers = - storage_types::Customer::list_by_merchant_id(&conn, merchant_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let customers = try_join_all(encrypted_customers.into_iter().map( - |encrypted_customer| async { - encrypted_customer - .convert(state, key_store.key.get_inner(), merchant_id.to_string()) - .await - .change_context(errors::StorageError::DecryptionError) - }, - )) - .await?; - - Ok(customers) - } - - #[instrument(skip_all)] - async fn insert_customer( - &self, - customer_data: customer::Customer, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - customer_data - .construct_new() - .await - .change_context(errors::StorageError::EncryptionError)? - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - .async_and_then(|c| async { - c.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - } - - #[instrument(skip_all)] - async fn delete_customer_by_customer_id_merchant_id( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::Customer::delete_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } } } #[async_trait::async_trait] impl CustomerInterface for MockDb { #[allow(clippy::panic)] + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_optional_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -1175,7 +1196,7 @@ impl CustomerInterface for MockDb { let customer = customers .iter() .find(|customer| { - customer.get_customer_id() == *customer_id && customer.merchant_id == *merchant_id + customer.get_customer_id() == *customer_id && &customer.merchant_id == merchant_id }) .cloned(); customer @@ -1192,6 +1213,38 @@ impl CustomerInterface for MockDb { .transpose() } + #[allow(clippy::panic)] + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let customers = self.customers.lock().await; + let customer = customers + .iter() + .find(|customer| { + customer.get_customer_id() == *customer_id && &customer.merchant_id == merchant_id + }) + .cloned(); + customer + .async_map(|c| async { + c.convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + .transpose() + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn list_customers_by_merchant_id( &self, state: &KeyManagerState, @@ -1221,6 +1274,7 @@ impl CustomerInterface for MockDb { Ok(customers) } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] #[instrument(skip_all)] async fn update_customer_by_customer_id_merchant_id( &self, @@ -1236,6 +1290,7 @@ impl CustomerInterface for MockDb { Err(errors::StorageError::MockDbError)? } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_by_customer_id_merchant_id( &self, _state: &KeyManagerState, @@ -1248,6 +1303,19 @@ impl CustomerInterface for MockDb { Err(errors::StorageError::MockDbError)? } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + _state: &KeyManagerState, + _merchant_reference_id: &id_type::CustomerId, + _merchant_id: &id_type::MerchantId, + _key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(errors::StorageError::MockDbError)? + } + #[allow(clippy::panic)] async fn insert_customer( &self, @@ -1274,6 +1342,7 @@ impl CustomerInterface for MockDb { .change_context(errors::StorageError::DecryptionError) } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn delete_customer_by_customer_id_merchant_id( &self, _customer_id: &id_type::CustomerId, @@ -1282,4 +1351,33 @@ impl CustomerInterface for MockDb { // [#172]: Implement function for `MockDb` Err(errors::StorageError::MockDbError)? } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn update_customer_by_global_id( + &self, + _state: &KeyManagerState, + _id: String, + _customer: customer::Customer, + _merchant_id: &id_type::MerchantId, + _customer_update: storage_types::CustomerUpdate, + _key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(errors::StorageError::MockDbError)? + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_global_id( + &self, + _state: &KeyManagerState, + _id: &String, + _merchant_id: &id_type::MerchantId, + _key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + // [#172]: Implement function for `MockDb` + Err(errors::StorageError::MockDbError)? + } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index ddae52c243..953f06b837 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -343,6 +343,7 @@ impl ConfigInterface for KafkaStore { #[async_trait::async_trait] impl CustomerInterface for KafkaStore { + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn delete_customer_by_customer_id_merchant_id( &self, customer_id: &id_type::CustomerId, @@ -353,6 +354,7 @@ impl CustomerInterface for KafkaStore { .await } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_optional_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -372,6 +374,27 @@ impl CustomerInterface for KafkaStore { .await } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_optional_by_merchant_id_merchant_reference_id( + &self, + state: &KeyManagerState, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + self.diesel_store + .find_optional_by_merchant_id_merchant_reference_id( + state, + customer_id, + merchant_id, + key_store, + storage_scheme, + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn update_customer_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -395,6 +418,31 @@ impl CustomerInterface for KafkaStore { .await } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn update_customer_by_global_id( + &self, + state: &KeyManagerState, + id: String, + customer: domain::Customer, + merchant_id: &id_type::MerchantId, + customer_update: storage::CustomerUpdate, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + self.diesel_store + .update_customer_by_global_id( + state, + id, + customer, + merchant_id, + customer_update, + key_store, + storage_scheme, + ) + .await + } + + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn list_customers_by_merchant_id( &self, state: &KeyManagerState, @@ -406,6 +454,7 @@ impl CustomerInterface for KafkaStore { .await } + #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] async fn find_customer_by_customer_id_merchant_id( &self, state: &KeyManagerState, @@ -425,6 +474,40 @@ impl CustomerInterface for KafkaStore { .await } + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_merchant_reference_id_merchant_id( + &self, + state: &KeyManagerState, + merchant_reference_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + self.diesel_store + .find_customer_by_merchant_reference_id_merchant_id( + state, + merchant_reference_id, + merchant_id, + key_store, + storage_scheme, + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_customer_by_global_id( + &self, + state: &KeyManagerState, + id: &String, + merchant_id: &id_type::MerchantId, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + self.diesel_store + .find_customer_by_global_id(state, id, merchant_id, key_store, storage_scheme) + .await + } + async fn insert_customer( &self, customer_data: domain::Customer, diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index a705c60300..40f6d46f93 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -867,7 +867,6 @@ pub struct Customers; #[cfg(all( feature = "v2", feature = "customer_v2", - feature = "payment_methods_v2", any(feature = "olap", feature = "oltp") ))] impl Customers { @@ -875,7 +874,9 @@ impl Customers { let mut route = web::scope("/v2/customers").app_data(web::Data::new(state)); #[cfg(all(feature = "oltp", feature = "v2", feature = "customer_v2"))] { - route = route.service(web::resource("").route(web::post().to(customers_create))) + route = route + .service(web::resource("").route(web::post().to(customers_create))) + .service(web::resource("/{id}").route(web::put().to(customers_update))) } #[cfg(all(feature = "oltp", feature = "v2", feature = "payment_methods_v2"))] { diff --git a/crates/router/src/routes/customers.rs b/crates/router/src/routes/customers.rs index bb72112c88..a84214374c 100644 --- a/crates/router/src/routes/customers.rs +++ b/crates/router/src/routes/customers.rs @@ -112,17 +112,61 @@ pub async fn customers_update( state: web::Data, req: HttpRequest, path: web::Path, - mut json_payload: web::Json, + mut json_payload: web::Json, ) -> HttpResponse { let flow = Flow::CustomersUpdate; let customer_id = path.into_inner(); json_payload.customer_id = Some(customer_id); + let customer_update_id = customers::UpdateCustomerId::new("temp_global_id".to_string()); Box::pin(api::server_wrap( flow, state, &req, json_payload.into_inner(), - |state, auth, req, _| update_customer(state, auth.merchant_account, req, auth.key_store), + |state, auth, req, _| { + update_customer( + state, + auth.merchant_account, + req, + auth.key_store, + customer_update_id.clone(), + ) + }, + auth::auth_type( + &auth::ApiKeyAuth, + &auth::JWTAuth(Permission::CustomerWrite), + req.headers(), + ), + api_locking::LockAction::NotApplicable, + )) + .await +} + +#[cfg(all(feature = "v2", feature = "customer_v2"))] +#[instrument(skip_all, fields(flow = ?Flow::CustomersUpdate))] +pub async fn customers_update( + state: web::Data, + req: HttpRequest, + path: web::Path, + json_payload: web::Json, +) -> HttpResponse { + let flow = Flow::CustomersUpdate; + let id = path.into_inner().clone(); + let customer_update_id = customers::UpdateCustomerId::new(id); + Box::pin(api::server_wrap( + flow, + state, + &req, + json_payload.into_inner(), + |state, auth, req, _| { + update_customer( + state, + auth.merchant_account, + req, + auth.key_store, + customer_update_id.clone(), + ) + }, auth::auth_type( &auth::HeaderAuth(auth::ApiKeyAuth), &auth::JWTAuth(Permission::CustomerWrite), diff --git a/crates/router/src/routes/payment_methods.rs b/crates/router/src/routes/payment_methods.rs index 7b2c406c77..36f7095720 100644 --- a/crates/router/src/routes/payment_methods.rs +++ b/crates/router/src/routes/payment_methods.rs @@ -232,7 +232,8 @@ pub async fn list_payment_method_api( #[cfg(all( any(feature = "v2", feature = "v1"), - not(feature = "payment_methods_v2") + not(feature = "payment_methods_v2"), + not(feature = "customer_v2") ))] /// List payment methods for a Customer /// @@ -355,7 +356,11 @@ pub async fn list_customer_payment_method_for_payment( .await } -#[cfg(all(feature = "v2", feature = "payment_methods_v2"))] +#[cfg(all( + feature = "v2", + feature = "payment_methods_v2", + feature = "customer_v2" +))] /// List payment methods for a Customer v2 /// /// To filter and list the applicable payment methods for a particular Customer ID, to be used in a non-payments context @@ -418,7 +423,8 @@ pub async fn list_customer_payment_method_api( #[cfg(all( any(feature = "v2", feature = "v1"), - not(feature = "payment_methods_v2") + not(feature = "payment_methods_v2"), + not(feature = "customer_v2") ))] /// List payment methods for a Customer /// diff --git a/crates/router/src/types/api/customers.rs b/crates/router/src/types/api/customers.rs index e3f86f670f..2076ad60c0 100644 --- a/crates/router/src/types/api/customers.rs +++ b/crates/router/src/types/api/customers.rs @@ -1,5 +1,7 @@ use api_models::customers; -pub use api_models::customers::{CustomerDeleteResponse, CustomerId, CustomerRequest}; +pub use api_models::customers::{ + CustomerDeleteResponse, CustomerId, CustomerRequest, CustomerUpdateRequest, UpdateCustomerId, +}; #[cfg(all(feature = "v2", feature = "customer_v2"))] use hyperswitch_domain_models::customer; use serde::Serialize; diff --git a/crates/router/src/utils.rs b/crates/router/src/utils.rs index e71bc04930..ef07ab6a84 100644 --- a/crates/router/src/utils.rs +++ b/crates/router/src/utils.rs @@ -884,6 +884,104 @@ impl CustomerAddress for api_models::customers::CustomerRequest { } } +#[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] +#[async_trait::async_trait] +impl CustomerAddress for api_models::customers::CustomerUpdateRequest { + async fn get_address_update( + &self, + state: &SessionState, + address_details: payments::AddressDetails, + key: &[u8], + storage_scheme: storage::enums::MerchantStorageScheme, + merchant_id: id_type::MerchantId, + ) -> CustomResult { + let encrypted_data = crypto_operation( + &state.into(), + type_name!(storage::Address), + CryptoOperation::BatchEncrypt(AddressDetailsWithPhone::to_encryptable( + AddressDetailsWithPhone { + address: Some(address_details.clone()), + phone_number: self.phone.clone(), + email: self.email.clone(), + }, + )), + Identifier::Merchant(merchant_id), + key, + ) + .await + .and_then(|val| val.try_into_batchoperation())?; + let encryptable_address = AddressDetailsWithPhone::from_encryptable(encrypted_data) + .change_context(common_utils::errors::CryptoError::EncodingFailed)?; + Ok(storage::AddressUpdate::Update { + city: address_details.city, + country: address_details.country, + line1: encryptable_address.line1, + line2: encryptable_address.line2, + line3: encryptable_address.line3, + zip: encryptable_address.zip, + state: encryptable_address.state, + first_name: encryptable_address.first_name, + last_name: encryptable_address.last_name, + phone_number: encryptable_address.phone_number, + country_code: self.phone_country_code.clone(), + updated_by: storage_scheme.to_string(), + email: encryptable_address.email, + }) + } + + async fn get_domain_address( + &self, + state: &SessionState, + address_details: payments::AddressDetails, + merchant_id: &id_type::MerchantId, + customer_id: &id_type::CustomerId, + key: &[u8], + storage_scheme: storage::enums::MerchantStorageScheme, + ) -> CustomResult { + let encrypted_data = crypto_operation( + &state.into(), + type_name!(storage::Address), + CryptoOperation::BatchEncrypt(AddressDetailsWithPhone::to_encryptable( + AddressDetailsWithPhone { + address: Some(address_details.clone()), + phone_number: self.phone.clone(), + email: self.email.clone(), + }, + )), + Identifier::Merchant(merchant_id.to_owned()), + key, + ) + .await + .and_then(|val| val.try_into_batchoperation())?; + let encryptable_address = AddressDetailsWithPhone::from_encryptable(encrypted_data) + .change_context(common_utils::errors::CryptoError::EncodingFailed)?; + let address = domain::Address { + city: address_details.city, + country: address_details.country, + line1: encryptable_address.line1, + line2: encryptable_address.line2, + line3: encryptable_address.line3, + zip: encryptable_address.zip, + state: encryptable_address.state, + first_name: encryptable_address.first_name, + last_name: encryptable_address.last_name, + phone_number: encryptable_address.phone_number, + country_code: self.phone_country_code.clone(), + merchant_id: merchant_id.to_owned(), + address_id: generate_id(consts::ID_LENGTH, "add"), + created_at: common_utils::date_time::now(), + modified_at: common_utils::date_time::now(), + updated_by: storage_scheme.to_string(), + email: encryptable_address.email, + }; + + Ok(domain::CustomerAddress { + address, + customer_id: customer_id.to_owned(), + }) + } +} + pub fn add_apple_pay_flow_metrics( apple_pay_flow: &Option, connector: Option, diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index f449666f6e..a1a1e49466 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -1,14 +1,14 @@ #[cfg(feature = "olap")] use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use common_utils::ext_traits::Encode; +#[cfg(feature = "olap")] +use diesel::{associations::HasTable, ExpressionMethods, QueryDsl}; #[cfg(all( feature = "olap", any(feature = "v1", feature = "v2"), not(feature = "customer_v2") ))] -use diesel::JoinOnDsl; -#[cfg(feature = "olap")] -use diesel::{associations::HasTable, ExpressionMethods, NullableExpressionMethods, QueryDsl}; +use diesel::{JoinOnDsl, NullableExpressionMethods}; #[cfg(feature = "olap")] use diesel_models::{ customers::Customer as DieselCustomer, query::generics::db_metrics, @@ -47,12 +47,15 @@ use router_env::logger; use router_env::{instrument, tracing}; #[cfg(feature = "olap")] -use crate::{ - connection, - store::schema::{ - customers::all_columns as cust_all_columns, payout_attempt::all_columns as poa_all_columns, - payouts::all_columns as po_all_columns, - }, +use crate::connection; +#[cfg(all( + feature = "olap", + any(feature = "v1", feature = "v2"), + not(feature = "customer_v2") +))] +use crate::store::schema::{ + customers::all_columns as cust_all_columns, payout_attempt::all_columns as poa_all_columns, + payouts::all_columns as po_all_columns, }; use crate::{ diesel_error_to_data_error, diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 7e10584582..5b2194c58a 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -34,6 +34,11 @@ pub enum PartitionKey<'a> { merchant_id: &'a common_utils::id_type::MerchantId, customer_id: &'a str, }, + #[cfg(all(feature = "v2", feature = "customer_v2"))] + MerchantIdMerchantReferenceId { + merchant_id: &'a common_utils::id_type::MerchantId, + merchant_reference_id: &'a str, + }, MerchantIdPayoutId { merchant_id: &'a common_utils::id_type::MerchantId, payout_id: &'a str, @@ -46,6 +51,10 @@ pub enum PartitionKey<'a> { merchant_id: &'a common_utils::id_type::MerchantId, mandate_id: &'a str, }, + #[cfg(all(feature = "v2", feature = "customer_v2"))] + GlobalId { + id: &'a str, + }, } // PartitionKey::MerchantIdPaymentId {merchant_id, payment_id} impl<'a> std::fmt::Display for PartitionKey<'a> { @@ -66,6 +75,14 @@ impl<'a> std::fmt::Display for PartitionKey<'a> { "mid_{}_cust_{customer_id}", merchant_id.get_string_repr() )), + #[cfg(all(feature = "v2", feature = "customer_v2"))] + PartitionKey::MerchantIdMerchantReferenceId { + merchant_id, + merchant_reference_id, + } => f.write_str(&format!( + "mid_{}_cust_{merchant_reference_id}", + merchant_id.get_string_repr() + )), PartitionKey::MerchantIdPayoutId { merchant_id, payout_id, @@ -87,6 +104,9 @@ impl<'a> std::fmt::Display for PartitionKey<'a> { "mid_{}_mandate_{mandate_id}", merchant_id.get_string_repr() )), + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + PartitionKey::GlobalId { id } => f.write_str(&format!("cust_{id}",)), } } } diff --git a/v2_migrations/2024-07-30-100323_customer_v2/up.sql b/v2_migrations/2024-07-30-100323_customer_v2/up.sql index e84aa67fb7..7c2c5aee00 100644 --- a/v2_migrations/2024-07-30-100323_customer_v2/up.sql +++ b/v2_migrations/2024-07-30-100323_customer_v2/up.sql @@ -7,6 +7,7 @@ ALTER TABLE customers ADD COLUMN IF NOT EXISTS default_shipping_address BYTEA DE ALTER TABLE customers DROP CONSTRAINT IF EXISTS customers_pkey; ALTER TABLE customers DROP COLUMN IF EXISTS id; + ALTER TABLE customers ADD COLUMN IF NOT EXISTS id VARCHAR(64); -- Back filling before making it primary key