mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-31 10:06:32 +08:00 
			
		
		
		
	feat(router): add kv implementation for update address in update payments flow (#2542)
This commit is contained in:
		 Sai Harsha Vardhan
					Sai Harsha Vardhan
				
			
				
					committed by
					
						 GitHub
						GitHub
					
				
			
			
				
	
			
			
			 GitHub
						GitHub
					
				
			
						parent
						
							81cb8da4d4
						
					
				
				
					commit
					9f446bc174
				
			| @ -49,7 +49,7 @@ pub struct Address { | |||||||
|     pub payment_id: Option<String>, |     pub payment_id: Option<String>, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)] | #[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize)] | ||||||
| #[diesel(table_name = address)] | #[diesel(table_name = address)] | ||||||
| pub struct AddressUpdateInternal { | pub struct AddressUpdateInternal { | ||||||
|     pub city: Option<String>, |     pub city: Option<String>, | ||||||
|  | |||||||
| @ -2,7 +2,7 @@ use error_stack::{IntoReport, ResultExt}; | |||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
|  |  | ||||||
| use crate::{ | use crate::{ | ||||||
|     address::AddressNew, |     address::{Address, AddressNew, AddressUpdateInternal}, | ||||||
|     connector_response::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate}, |     connector_response::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate}, | ||||||
|     errors, |     errors, | ||||||
|     payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, |     payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, | ||||||
| @ -54,6 +54,7 @@ pub enum Updateable { | |||||||
|     PaymentAttemptUpdate(PaymentAttemptUpdateMems), |     PaymentAttemptUpdate(PaymentAttemptUpdateMems), | ||||||
|     RefundUpdate(RefundUpdateMems), |     RefundUpdate(RefundUpdateMems), | ||||||
|     ConnectorResponseUpdate(ConnectorResponseUpdateMems), |     ConnectorResponseUpdate(ConnectorResponseUpdateMems), | ||||||
|  |     AddressUpdate(Box<AddressUpdateMems>), | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| @ -62,6 +63,12 @@ pub struct ConnectorResponseUpdateMems { | |||||||
|     pub update_data: ConnectorResponseUpdate, |     pub update_data: ConnectorResponseUpdate, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[derive(Debug, Serialize, Deserialize)] | ||||||
|  | pub struct AddressUpdateMems { | ||||||
|  |     pub orig: Address, | ||||||
|  |     pub update_data: AddressUpdateInternal, | ||||||
|  | } | ||||||
|  |  | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| pub struct PaymentIntentUpdateMems { | pub struct PaymentIntentUpdateMems { | ||||||
|     pub orig: PaymentIntent, |     pub orig: PaymentIntent, | ||||||
|  | |||||||
| @ -56,6 +56,32 @@ impl Address { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[instrument(skip(conn))] | ||||||
|  |     pub async fn update( | ||||||
|  |         self, | ||||||
|  |         conn: &PgPooledConn, | ||||||
|  |         address_update_internal: AddressUpdateInternal, | ||||||
|  |     ) -> StorageResult<Self> { | ||||||
|  |         match generics::generic_update_with_unique_predicate_get_result::< | ||||||
|  |             <Self as HasTable>::Table, | ||||||
|  |             _, | ||||||
|  |             _, | ||||||
|  |             _, | ||||||
|  |         >( | ||||||
|  |             conn, | ||||||
|  |             dsl::address_id.eq(self.address_id.clone()), | ||||||
|  |             address_update_internal, | ||||||
|  |         ) | ||||||
|  |         .await | ||||||
|  |         { | ||||||
|  |             Err(error) => match error.current_context() { | ||||||
|  |                 errors::DatabaseError::NoFieldsToUpdate => Ok(self), | ||||||
|  |                 _ => Err(error), | ||||||
|  |             }, | ||||||
|  |             result => result, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[instrument(skip(conn))] |     #[instrument(skip(conn))] | ||||||
|     pub async fn delete_by_address_id( |     pub async fn delete_by_address_id( | ||||||
|         conn: &PgPooledConn, |         conn: &PgPooledConn, | ||||||
|  | |||||||
| @ -271,6 +271,11 @@ async fn drainer( | |||||||
|                             update_op, |                             update_op, | ||||||
|                             connector_response |                             connector_response | ||||||
|                         ), |                         ), | ||||||
|  |                         kv::Updateable::AddressUpdate(a) => macro_util::handle_resp!( | ||||||
|  |                             a.orig.update(&conn, a.update_data).await, | ||||||
|  |                             update_op, | ||||||
|  |                             address | ||||||
|  |                         ), | ||||||
|                     } |                     } | ||||||
|                 }) |                 }) | ||||||
|                 .await; |                 .await; | ||||||
|  | |||||||
| @ -181,10 +181,27 @@ pub async fn create_or_update_address_for_payment_by_request( | |||||||
|                 .await |                 .await | ||||||
|                 .change_context(errors::ApiErrorResponse::InternalServerError) |                 .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|                 .attach_printable("Failed while encrypting address")?; |                 .attach_printable("Failed while encrypting address")?; | ||||||
|  |                 let address = db | ||||||
|  |                     .find_address_by_merchant_id_payment_id_address_id( | ||||||
|  |                         merchant_id, | ||||||
|  |                         payment_id, | ||||||
|  |                         id, | ||||||
|  |                         merchant_key_store, | ||||||
|  |                         storage_scheme, | ||||||
|  |                     ) | ||||||
|  |                     .await | ||||||
|  |                     .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |                     .attach_printable("Error while fetching address")?; | ||||||
|                 Some( |                 Some( | ||||||
|                     db.update_address(id.to_owned(), address_update, merchant_key_store) |                     db.update_address_for_payments( | ||||||
|                         .await |                         address, | ||||||
|                         .to_not_found_response(errors::ApiErrorResponse::AddressNotFound)?, |                         address_update, | ||||||
|  |                         payment_id.to_string(), | ||||||
|  |                         merchant_key_store, | ||||||
|  |                         storage_scheme, | ||||||
|  |                     ) | ||||||
|  |                     .await | ||||||
|  |                     .to_not_found_response(errors::ApiErrorResponse::AddressNotFound)?, | ||||||
|                 ) |                 ) | ||||||
|             } |             } | ||||||
|             None => Some( |             None => Some( | ||||||
|  | |||||||
| @ -28,6 +28,15 @@ where | |||||||
|         key_store: &domain::MerchantKeyStore, |         key_store: &domain::MerchantKeyStore, | ||||||
|     ) -> CustomResult<domain::Address, errors::StorageError>; |     ) -> CustomResult<domain::Address, errors::StorageError>; | ||||||
|  |  | ||||||
|  |     async fn update_address_for_payments( | ||||||
|  |         &self, | ||||||
|  |         this: domain::Address, | ||||||
|  |         address: domain::AddressUpdate, | ||||||
|  |         payment_id: String, | ||||||
|  |         key_store: &domain::MerchantKeyStore, | ||||||
|  |         storage_scheme: MerchantStorageScheme, | ||||||
|  |     ) -> CustomResult<domain::Address, errors::StorageError>; | ||||||
|  |  | ||||||
|     async fn find_address_by_address_id( |     async fn find_address_by_address_id( | ||||||
|         &self, |         &self, | ||||||
|         address_id: &str, |         address_id: &str, | ||||||
| @ -155,6 +164,32 @@ mod storage { | |||||||
|                 .await |                 .await | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         async fn update_address_for_payments( | ||||||
|  |             &self, | ||||||
|  |             this: domain::Address, | ||||||
|  |             address_update: domain::AddressUpdate, | ||||||
|  |             _payment_id: String, | ||||||
|  |             key_store: &domain::MerchantKeyStore, | ||||||
|  |             _storage_scheme: MerchantStorageScheme, | ||||||
|  |         ) -> CustomResult<domain::Address, errors::StorageError> { | ||||||
|  |             let conn = connection::pg_connection_write(self).await?; | ||||||
|  |             let address = Conversion::convert(this) | ||||||
|  |                 .await | ||||||
|  |                 .change_context(errors::StorageError::EncryptionError)?; | ||||||
|  |             address | ||||||
|  |                 .update(&conn, address_update.into()) | ||||||
|  |                 .await | ||||||
|  |                 .map_err(Into::into) | ||||||
|  |                 .into_report() | ||||||
|  |                 .async_and_then(|address| async { | ||||||
|  |                     address | ||||||
|  |                         .convert(key_store.key.get_inner()) | ||||||
|  |                         .await | ||||||
|  |                         .change_context(errors::StorageError::DecryptionError) | ||||||
|  |                 }) | ||||||
|  |                 .await | ||||||
|  |         } | ||||||
|  |  | ||||||
|         async fn insert_address_for_payments( |         async fn insert_address_for_payments( | ||||||
|             &self, |             &self, | ||||||
|             _payment_id: &str, |             _payment_id: &str, | ||||||
| @ -241,6 +276,7 @@ mod storage { | |||||||
| mod storage { | mod storage { | ||||||
|     use common_utils::ext_traits::AsyncExt; |     use common_utils::ext_traits::AsyncExt; | ||||||
|     use data_models::MerchantStorageScheme; |     use data_models::MerchantStorageScheme; | ||||||
|  |     use diesel_models::AddressUpdateInternal; | ||||||
|     use error_stack::{IntoReport, ResultExt}; |     use error_stack::{IntoReport, ResultExt}; | ||||||
|     use redis_interface::HsetnxReply; |     use redis_interface::HsetnxReply; | ||||||
|     use router_env::{instrument, tracing}; |     use router_env::{instrument, tracing}; | ||||||
| @ -348,6 +384,79 @@ mod storage { | |||||||
|                 .await |                 .await | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         async fn update_address_for_payments( | ||||||
|  |             &self, | ||||||
|  |             this: domain::Address, | ||||||
|  |             address_update: domain::AddressUpdate, | ||||||
|  |             payment_id: String, | ||||||
|  |             key_store: &domain::MerchantKeyStore, | ||||||
|  |             storage_scheme: MerchantStorageScheme, | ||||||
|  |         ) -> CustomResult<domain::Address, errors::StorageError> { | ||||||
|  |             let conn = connection::pg_connection_write(self).await?; | ||||||
|  |             let address = Conversion::convert(this) | ||||||
|  |                 .await | ||||||
|  |                 .change_context(errors::StorageError::EncryptionError)?; | ||||||
|  |             match storage_scheme { | ||||||
|  |                 MerchantStorageScheme::PostgresOnly => { | ||||||
|  |                     address | ||||||
|  |                         .update(&conn, address_update.into()) | ||||||
|  |                         .await | ||||||
|  |                         .map_err(Into::into) | ||||||
|  |                         .into_report() | ||||||
|  |                         .async_and_then(|address| async { | ||||||
|  |                             address | ||||||
|  |                                 .convert(key_store.key.get_inner()) | ||||||
|  |                                 .await | ||||||
|  |                                 .change_context(errors::StorageError::DecryptionError) | ||||||
|  |                         }) | ||||||
|  |                         .await | ||||||
|  |                 } | ||||||
|  |                 MerchantStorageScheme::RedisKv => { | ||||||
|  |                     let key = format!("mid_{}_pid_{}", address.merchant_id.clone(), payment_id); | ||||||
|  |                     let field = format!("add_{}", address.address_id); | ||||||
|  |                     let updated_address = AddressUpdateInternal::from(address_update.clone()) | ||||||
|  |                         .create_address(address.clone()); | ||||||
|  |                     let redis_value = serde_json::to_string(&updated_address) | ||||||
|  |                         .into_report() | ||||||
|  |                         .change_context(errors::StorageError::KVError)?; | ||||||
|  |                     kv_wrapper::<(), _, _>( | ||||||
|  |                         self, | ||||||
|  |                         KvOperation::Hset::<storage_types::Address>((&field, redis_value)), | ||||||
|  |                         &key, | ||||||
|  |                     ) | ||||||
|  |                     .await | ||||||
|  |                     .change_context(errors::StorageError::KVError)? | ||||||
|  |                     .try_into_hset() | ||||||
|  |                     .change_context(errors::StorageError::KVError)?; | ||||||
|  |  | ||||||
|  |                     let redis_entry = kv::TypedSql { | ||||||
|  |                         op: kv::DBOperation::Update { | ||||||
|  |                             updatable: kv::Updateable::AddressUpdate(Box::new( | ||||||
|  |                                 kv::AddressUpdateMems { | ||||||
|  |                                     orig: address, | ||||||
|  |                                     update_data: address_update.into(), | ||||||
|  |                                 }, | ||||||
|  |                             )), | ||||||
|  |                         }, | ||||||
|  |                     }; | ||||||
|  |  | ||||||
|  |                     self.push_to_drainer_stream::<storage_types::Address>( | ||||||
|  |                         redis_entry, | ||||||
|  |                         PartitionKey::MerchantIdPaymentId { | ||||||
|  |                             merchant_id: &updated_address.merchant_id, | ||||||
|  |                             payment_id: &payment_id, | ||||||
|  |                         }, | ||||||
|  |                     ) | ||||||
|  |                     .await | ||||||
|  |                     .change_context(errors::StorageError::KVError)?; | ||||||
|  |                     updated_address | ||||||
|  |                         .convert(key_store.key.get_inner()) | ||||||
|  |                         .await | ||||||
|  |                         .change_context(errors::StorageError::DecryptionError) | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|         async fn insert_address_for_payments( |         async fn insert_address_for_payments( | ||||||
|             &self, |             &self, | ||||||
|             payment_id: &str, |             payment_id: &str, | ||||||
| @ -584,6 +693,37 @@ impl AddressInterface for MockDb { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     async fn update_address_for_payments( | ||||||
|  |         &self, | ||||||
|  |         this: domain::Address, | ||||||
|  |         address_update: domain::AddressUpdate, | ||||||
|  |         _payment_id: String, | ||||||
|  |         key_store: &domain::MerchantKeyStore, | ||||||
|  |         _storage_scheme: MerchantStorageScheme, | ||||||
|  |     ) -> CustomResult<domain::Address, errors::StorageError> { | ||||||
|  |         match self | ||||||
|  |             .addresses | ||||||
|  |             .lock() | ||||||
|  |             .await | ||||||
|  |             .iter_mut() | ||||||
|  |             .find(|address| address.address_id == this.address_id) | ||||||
|  |             .map(|a| { | ||||||
|  |                 let address_updated = | ||||||
|  |                     AddressUpdateInternal::from(address_update).create_address(a.clone()); | ||||||
|  |                 *a = address_updated.clone(); | ||||||
|  |                 address_updated | ||||||
|  |             }) { | ||||||
|  |             Some(address_updated) => address_updated | ||||||
|  |                 .convert(key_store.key.get_inner()) | ||||||
|  |                 .await | ||||||
|  |                 .change_context(errors::StorageError::DecryptionError), | ||||||
|  |             None => Err(errors::StorageError::ValueNotFound( | ||||||
|  |                 "cannot find address to update".to_string(), | ||||||
|  |             ) | ||||||
|  |             .into()), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     async fn insert_address_for_payments( |     async fn insert_address_for_payments( | ||||||
|         &self, |         &self, | ||||||
|         _payment_id: &str, |         _payment_id: &str, | ||||||
|  | |||||||
| @ -125,7 +125,7 @@ impl behaviour::Conversion for Address { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug)] | #[derive(Debug, Clone)] | ||||||
| pub enum AddressUpdate { | pub enum AddressUpdate { | ||||||
|     Update { |     Update { | ||||||
|         city: Option<String>, |         city: Option<String>, | ||||||
|  | |||||||
| @ -1,4 +1,4 @@ | |||||||
| pub use diesel_models::kv::{ | pub use diesel_models::kv::{ | ||||||
|     ConnectorResponseUpdateMems, DBOperation, Insertable, PaymentAttemptUpdateMems, |     AddressUpdateMems, ConnectorResponseUpdateMems, DBOperation, Insertable, | ||||||
|     PaymentIntentUpdateMems, RefundUpdateMems, TypedSql, Updateable, |     PaymentAttemptUpdateMems, PaymentIntentUpdateMems, RefundUpdateMems, TypedSql, Updateable, | ||||||
| }; | }; | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user