diff --git a/crates/diesel_models/src/address.rs b/crates/diesel_models/src/address.rs index 028d878f7d..569df3f551 100644 --- a/crates/diesel_models/src/address.rs +++ b/crates/diesel_models/src/address.rs @@ -49,7 +49,7 @@ pub struct Address { pub payment_id: Option, } -#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)] +#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize)] #[diesel(table_name = address)] pub struct AddressUpdateInternal { pub city: Option, diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index 93d045bd71..ed886c783f 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -2,7 +2,7 @@ use error_stack::{IntoReport, ResultExt}; use serde::{Deserialize, Serialize}; use crate::{ - address::AddressNew, + address::{Address, AddressNew, AddressUpdateInternal}, connector_response::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate}, errors, payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, @@ -54,6 +54,7 @@ pub enum Updateable { PaymentAttemptUpdate(PaymentAttemptUpdateMems), RefundUpdate(RefundUpdateMems), ConnectorResponseUpdate(ConnectorResponseUpdateMems), + AddressUpdate(Box), } #[derive(Debug, Serialize, Deserialize)] @@ -62,6 +63,12 @@ pub struct ConnectorResponseUpdateMems { pub update_data: ConnectorResponseUpdate, } +#[derive(Debug, Serialize, Deserialize)] +pub struct AddressUpdateMems { + pub orig: Address, + pub update_data: AddressUpdateInternal, +} + #[derive(Debug, Serialize, Deserialize)] pub struct PaymentIntentUpdateMems { pub orig: PaymentIntent, diff --git a/crates/diesel_models/src/query/address.rs b/crates/diesel_models/src/query/address.rs index 9a4f20942c..ada4762299 100644 --- a/crates/diesel_models/src/query/address.rs +++ b/crates/diesel_models/src/query/address.rs @@ -56,6 +56,32 @@ impl Address { } } + #[instrument(skip(conn))] + pub async fn update( + self, + conn: &PgPooledConn, + address_update_internal: AddressUpdateInternal, + ) -> StorageResult { + match generics::generic_update_with_unique_predicate_get_result::< + ::Table, + _, + _, + _, + >( + conn, + dsl::address_id.eq(self.address_id.clone()), + address_update_internal, + ) + .await + { + Err(error) => match error.current_context() { + errors::DatabaseError::NoFieldsToUpdate => Ok(self), + _ => Err(error), + }, + result => result, + } + } + #[instrument(skip(conn))] pub async fn delete_by_address_id( conn: &PgPooledConn, diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index e09c565405..632fb4a0c1 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -271,6 +271,11 @@ async fn drainer( update_op, connector_response ), + kv::Updateable::AddressUpdate(a) => macro_util::handle_resp!( + a.orig.update(&conn, a.update_data).await, + update_op, + address + ), } }) .await; diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 143b992395..a4cb7b2b9e 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -181,10 +181,27 @@ pub async fn create_or_update_address_for_payment_by_request( .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed while encrypting address")?; + let address = db + .find_address_by_merchant_id_payment_id_address_id( + merchant_id, + payment_id, + id, + merchant_key_store, + storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Error while fetching address")?; Some( - db.update_address(id.to_owned(), address_update, merchant_key_store) - .await - .to_not_found_response(errors::ApiErrorResponse::AddressNotFound)?, + db.update_address_for_payments( + address, + address_update, + payment_id.to_string(), + merchant_key_store, + storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::AddressNotFound)?, ) } None => Some( diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index dda6838c97..0b7bffda2f 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -28,6 +28,15 @@ where key_store: &domain::MerchantKeyStore, ) -> CustomResult; + async fn update_address_for_payments( + &self, + this: domain::Address, + address: domain::AddressUpdate, + payment_id: String, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + async fn find_address_by_address_id( &self, address_id: &str, @@ -155,6 +164,32 @@ mod storage { .await } + async fn update_address_for_payments( + &self, + this: domain::Address, + address_update: domain::AddressUpdate, + _payment_id: String, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + let address = Conversion::convert(this) + .await + .change_context(errors::StorageError::EncryptionError)?; + address + .update(&conn, address_update.into()) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + async fn insert_address_for_payments( &self, _payment_id: &str, @@ -241,6 +276,7 @@ mod storage { mod storage { use common_utils::ext_traits::AsyncExt; use data_models::MerchantStorageScheme; + use diesel_models::AddressUpdateInternal; use error_stack::{IntoReport, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; @@ -348,6 +384,79 @@ mod storage { .await } + async fn update_address_for_payments( + &self, + this: domain::Address, + address_update: domain::AddressUpdate, + payment_id: String, + key_store: &domain::MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + let address = Conversion::convert(this) + .await + .change_context(errors::StorageError::EncryptionError)?; + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + address + .update(&conn, address_update.into()) + .await + .map_err(Into::into) + .into_report() + .async_and_then(|address| async { + address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .await + } + MerchantStorageScheme::RedisKv => { + let key = format!("mid_{}_pid_{}", address.merchant_id.clone(), payment_id); + let field = format!("add_{}", address.address_id); + let updated_address = AddressUpdateInternal::from(address_update.clone()) + .create_address(address.clone()); + let redis_value = serde_json::to_string(&updated_address) + .into_report() + .change_context(errors::StorageError::KVError)?; + kv_wrapper::<(), _, _>( + self, + KvOperation::Hset::((&field, redis_value)), + &key, + ) + .await + .change_context(errors::StorageError::KVError)? + .try_into_hset() + .change_context(errors::StorageError::KVError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::AddressUpdate(Box::new( + kv::AddressUpdateMems { + orig: address, + update_data: address_update.into(), + }, + )), + }, + }; + + self.push_to_drainer_stream::( + redis_entry, + PartitionKey::MerchantIdPaymentId { + merchant_id: &updated_address.merchant_id, + payment_id: &payment_id, + }, + ) + .await + .change_context(errors::StorageError::KVError)?; + updated_address + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + } + } + } + async fn insert_address_for_payments( &self, payment_id: &str, @@ -584,6 +693,37 @@ impl AddressInterface for MockDb { } } + async fn update_address_for_payments( + &self, + this: domain::Address, + address_update: domain::AddressUpdate, + _payment_id: String, + key_store: &domain::MerchantKeyStore, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + match self + .addresses + .lock() + .await + .iter_mut() + .find(|address| address.address_id == this.address_id) + .map(|a| { + let address_updated = + AddressUpdateInternal::from(address_update).create_address(a.clone()); + *a = address_updated.clone(); + address_updated + }) { + Some(address_updated) => address_updated + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError), + None => Err(errors::StorageError::ValueNotFound( + "cannot find address to update".to_string(), + ) + .into()), + } + } + async fn insert_address_for_payments( &self, _payment_id: &str, diff --git a/crates/router/src/types/domain/address.rs b/crates/router/src/types/domain/address.rs index fadd83b34e..bd9d034e8c 100644 --- a/crates/router/src/types/domain/address.rs +++ b/crates/router/src/types/domain/address.rs @@ -125,7 +125,7 @@ impl behaviour::Conversion for Address { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum AddressUpdate { Update { city: Option, diff --git a/crates/router/src/types/storage/kv.rs b/crates/router/src/types/storage/kv.rs index 51c9eb2b42..2afc73e663 100644 --- a/crates/router/src/types/storage/kv.rs +++ b/crates/router/src/types/storage/kv.rs @@ -1,4 +1,4 @@ pub use diesel_models::kv::{ - ConnectorResponseUpdateMems, DBOperation, Insertable, PaymentAttemptUpdateMems, - PaymentIntentUpdateMems, RefundUpdateMems, TypedSql, Updateable, + AddressUpdateMems, ConnectorResponseUpdateMems, DBOperation, Insertable, + PaymentAttemptUpdateMems, PaymentIntentUpdateMems, RefundUpdateMems, TypedSql, Updateable, };