mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-11-01 02:57:02 +08:00 
			
		
		
		
	feat(core): add redis lock during insertion of event in event table during initial attempt of outgoing webhook delivery (#7579)
Co-authored-by: Debarati Ghatak <debarati.ghatak@Debarati-Ghatak-FW569NC29L.local>
This commit is contained in:
		| @ -160,6 +160,7 @@ max_age = 365     # Max age of a refund in days. | |||||||
|  |  | ||||||
| [webhooks] | [webhooks] | ||||||
| outgoing_enabled = true | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180 | ||||||
|  |  | ||||||
| # Validity of an Ephemeral Key in Hours | # Validity of an Ephemeral Key in Hours | ||||||
| [eph_key] | [eph_key] | ||||||
|  | |||||||
| @ -592,6 +592,7 @@ billwerk = {long_lived_token = false, payment_method = "card"} | |||||||
|  |  | ||||||
| [webhooks] | [webhooks] | ||||||
| outgoing_enabled = true | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180 | ||||||
|  |  | ||||||
| [webhook_source_verification_call] | [webhook_source_verification_call] | ||||||
| connectors_with_webhook_source_verification_call = "paypal"         # List of connectors which has additional source verification api-call | connectors_with_webhook_source_verification_call = "paypal"         # List of connectors which has additional source verification api-call | ||||||
|  | |||||||
| @ -607,6 +607,7 @@ billwerk = {long_lived_token = false, payment_method = "card"} | |||||||
|  |  | ||||||
| [webhooks] | [webhooks] | ||||||
| outgoing_enabled = true | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180 | ||||||
|  |  | ||||||
| [webhook_source_verification_call] | [webhook_source_verification_call] | ||||||
| connectors_with_webhook_source_verification_call = "paypal"     # List of connectors which has additional source verification api-call | connectors_with_webhook_source_verification_call = "paypal"     # List of connectors which has additional source verification api-call | ||||||
|  | |||||||
| @ -608,6 +608,7 @@ billwerk = {long_lived_token = false, payment_method = "card"} | |||||||
|  |  | ||||||
| [webhooks] | [webhooks] | ||||||
| outgoing_enabled = true | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180 | ||||||
|  |  | ||||||
| [webhook_source_verification_call] | [webhook_source_verification_call] | ||||||
| connectors_with_webhook_source_verification_call = "paypal"        # List of connectors which has additional source verification api-call | connectors_with_webhook_source_verification_call = "paypal"        # List of connectors which has additional source verification api-call | ||||||
|  | |||||||
| @ -209,6 +209,7 @@ max_age = 365 | |||||||
|  |  | ||||||
| [webhooks] | [webhooks] | ||||||
| outgoing_enabled = true | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180             # 3 * 60 seconds | ||||||
|  |  | ||||||
| [eph_key] | [eph_key] | ||||||
| validity = 1 | validity = 1 | ||||||
|  | |||||||
| @ -742,6 +742,10 @@ pm_auth_key = "Some_pm_auth_key" | |||||||
| redis_lock_expiry_seconds = 180             # 3 * 60 seconds | redis_lock_expiry_seconds = 180             # 3 * 60 seconds | ||||||
| delay_between_retries_in_milliseconds = 500 | delay_between_retries_in_milliseconds = 500 | ||||||
|  |  | ||||||
|  | [webhooks] | ||||||
|  | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180             # 3 * 60 seconds | ||||||
|  |  | ||||||
| [events.kafka] | [events.kafka] | ||||||
| brokers = ["localhost:9092"] | brokers = ["localhost:9092"] | ||||||
| fraud_check_analytics_topic = "hyperswitch-fraud-check-events" | fraud_check_analytics_topic = "hyperswitch-fraud-check-events" | ||||||
|  | |||||||
| @ -788,6 +788,7 @@ pub struct DrainerSettings { | |||||||
| pub struct WebhooksSettings { | pub struct WebhooksSettings { | ||||||
|     pub outgoing_enabled: bool, |     pub outgoing_enabled: bool, | ||||||
|     pub ignore_error: WebhookIgnoreErrorSettings, |     pub ignore_error: WebhookIgnoreErrorSettings, | ||||||
|  |     pub redis_lock_expiry_seconds: u32, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug, Clone, Deserialize, Default)] | #[derive(Debug, Clone, Deserialize, Default)] | ||||||
|  | |||||||
| @ -181,6 +181,18 @@ impl super::settings::LockSettings { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl super::settings::WebhooksSettings { | ||||||
|  |     pub fn validate(&self) -> Result<(), ApplicationError> { | ||||||
|  |         use common_utils::fp_utils::when; | ||||||
|  |  | ||||||
|  |         when(self.redis_lock_expiry_seconds.is_default_or_empty(), || { | ||||||
|  |             Err(ApplicationError::InvalidConfigurationValueError( | ||||||
|  |                 "redis_lock_expiry_seconds must not be empty or 0".into(), | ||||||
|  |             )) | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| impl super::settings::GenericLinkEnvConfig { | impl super::settings::GenericLinkEnvConfig { | ||||||
|     pub fn validate(&self) -> Result<(), ApplicationError> { |     pub fn validate(&self) -> Result<(), ApplicationError> { | ||||||
|         use common_utils::fp_utils::when; |         use common_utils::fp_utils::when; | ||||||
|  | |||||||
| @ -136,6 +136,41 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook( | |||||||
|         is_overall_delivery_successful: Some(false), |         is_overall_delivery_successful: Some(false), | ||||||
|     }; |     }; | ||||||
|  |  | ||||||
|  |     let lock_value = utils::perform_redis_lock( | ||||||
|  |         &state, | ||||||
|  |         &idempotent_event_id, | ||||||
|  |         merchant_account.get_id().to_owned(), | ||||||
|  |     ) | ||||||
|  |     .await?; | ||||||
|  |  | ||||||
|  |     if lock_value.is_none() { | ||||||
|  |         return Ok(()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     if (state | ||||||
|  |         .store | ||||||
|  |         .find_event_by_merchant_id_event_id( | ||||||
|  |             key_manager_state, | ||||||
|  |             &merchant_id, | ||||||
|  |             &event_id, | ||||||
|  |             merchant_key_store, | ||||||
|  |         ) | ||||||
|  |         .await) | ||||||
|  |         .is_ok() | ||||||
|  |     { | ||||||
|  |         logger::debug!( | ||||||
|  |             "Event with idempotent ID `{idempotent_event_id}` already exists in the database" | ||||||
|  |         ); | ||||||
|  |         utils::free_redis_lock( | ||||||
|  |             &state, | ||||||
|  |             &idempotent_event_id, | ||||||
|  |             merchant_account.get_id().to_owned(), | ||||||
|  |             lock_value, | ||||||
|  |         ) | ||||||
|  |         .await?; | ||||||
|  |         return Ok(()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     let event_insert_result = state |     let event_insert_result = state | ||||||
|         .store |         .store | ||||||
|         .insert_event(key_manager_state, new_event, merchant_key_store) |         .insert_event(key_manager_state, new_event, merchant_key_store) | ||||||
| @ -144,18 +179,21 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook( | |||||||
|     let event = match event_insert_result { |     let event = match event_insert_result { | ||||||
|         Ok(event) => Ok(event), |         Ok(event) => Ok(event), | ||||||
|         Err(error) => { |         Err(error) => { | ||||||
|             if error.current_context().is_db_unique_violation() { |             logger::error!(event_insertion_failure=?error); | ||||||
|                 logger::debug!("Event with idempotent ID `{idempotent_event_id}` already exists in the database"); |             Err(error | ||||||
|                 return Ok(()); |                 .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) | ||||||
|             } else { |                 .attach_printable("Failed to insert event in events table")) | ||||||
|                 logger::error!(event_insertion_failure=?error); |  | ||||||
|                 Err(error |  | ||||||
|                     .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) |  | ||||||
|                     .attach_printable("Failed to insert event in events table")) |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
|     }?; |     }?; | ||||||
|  |  | ||||||
|  |     utils::free_redis_lock( | ||||||
|  |         &state, | ||||||
|  |         &idempotent_event_id, | ||||||
|  |         merchant_account.get_id().to_owned(), | ||||||
|  |         lock_value, | ||||||
|  |     ) | ||||||
|  |     .await?; | ||||||
|  |  | ||||||
|     let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker( |     let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker( | ||||||
|         &*state.store, |         &*state.store, | ||||||
|         &business_profile, |         &business_profile, | ||||||
|  | |||||||
| @ -2,6 +2,8 @@ use std::marker::PhantomData; | |||||||
|  |  | ||||||
| use common_utils::{errors::CustomResult, ext_traits::ValueExt}; | use common_utils::{errors::CustomResult, ext_traits::ValueExt}; | ||||||
| use error_stack::ResultExt; | use error_stack::ResultExt; | ||||||
|  | use redis_interface as redis; | ||||||
|  | use router_env::tracing; | ||||||
|  |  | ||||||
| use crate::{ | use crate::{ | ||||||
|     core::{ |     core::{ | ||||||
| @ -9,6 +11,8 @@ use crate::{ | |||||||
|         payments::helpers, |         payments::helpers, | ||||||
|     }, |     }, | ||||||
|     db::{get_and_deserialize_key, StorageInterface}, |     db::{get_and_deserialize_key, StorageInterface}, | ||||||
|  |     errors::RouterResult, | ||||||
|  |     routes::app::SessionStateInfo, | ||||||
|     services::logger, |     services::logger, | ||||||
|     types::{self, api, domain, PaymentAddress}, |     types::{self, api, domain, PaymentAddress}, | ||||||
|     SessionState, |     SessionState, | ||||||
| @ -154,3 +158,106 @@ pub(crate) fn get_idempotent_event_id( | |||||||
| pub(crate) fn generate_event_id() -> String { | pub(crate) fn generate_event_id() -> String { | ||||||
|     common_utils::generate_time_ordered_id("evt") |     common_utils::generate_time_ordered_id("evt") | ||||||
| } | } | ||||||
|  |  | ||||||
|  | const WEBHOOK_LOCK_PREFIX: &str = "WEBHOOK_LOCK"; | ||||||
|  |  | ||||||
|  | pub(super) async fn perform_redis_lock<A>( | ||||||
|  |     state: &A, | ||||||
|  |     unique_locking_key: &str, | ||||||
|  |     merchant_id: common_utils::id_type::MerchantId, | ||||||
|  | ) -> RouterResult<Option<String>> | ||||||
|  | where | ||||||
|  |     A: SessionStateInfo, | ||||||
|  | { | ||||||
|  |     let lock_value: String = uuid::Uuid::new_v4().to_string(); | ||||||
|  |     let redis_conn = state | ||||||
|  |         .store() | ||||||
|  |         .get_redis_conn() | ||||||
|  |         .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |         .attach_printable("Error connecting to redis")?; | ||||||
|  |  | ||||||
|  |     let redis_locking_key = format!( | ||||||
|  |         "{}_{}_{}", | ||||||
|  |         WEBHOOK_LOCK_PREFIX, | ||||||
|  |         merchant_id.get_string_repr(), | ||||||
|  |         unique_locking_key | ||||||
|  |     ); | ||||||
|  |     let redis_lock_expiry_seconds = state.conf().webhooks.redis_lock_expiry_seconds; | ||||||
|  |  | ||||||
|  |     let redis_lock_result = redis_conn | ||||||
|  |         .set_key_if_not_exists_with_expiry( | ||||||
|  |             &redis_locking_key.as_str().into(), | ||||||
|  |             lock_value.clone(), | ||||||
|  |             Some(i64::from(redis_lock_expiry_seconds)), | ||||||
|  |         ) | ||||||
|  |         .await; | ||||||
|  |  | ||||||
|  |     match redis_lock_result { | ||||||
|  |         Ok(redis::SetnxReply::KeySet) => { | ||||||
|  |             logger::info!("Lock acquired for for {redis_locking_key}"); | ||||||
|  |             Ok(Some(lock_value)) | ||||||
|  |         } | ||||||
|  |         Ok(redis::SetnxReply::KeyNotSet) => { | ||||||
|  |             logger::info!("Lock already held for {redis_locking_key}"); | ||||||
|  |             Ok(None) | ||||||
|  |         } | ||||||
|  |         Err(err) => Err(err | ||||||
|  |             .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |             .attach_printable("Error acquiring redis lock")), | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub(super) async fn free_redis_lock<A>( | ||||||
|  |     state: &A, | ||||||
|  |     unique_locking_key: &str, | ||||||
|  |     merchant_id: common_utils::id_type::MerchantId, | ||||||
|  |     lock_value: Option<String>, | ||||||
|  | ) -> RouterResult<()> | ||||||
|  | where | ||||||
|  |     A: SessionStateInfo, | ||||||
|  | { | ||||||
|  |     let redis_conn = state | ||||||
|  |         .store() | ||||||
|  |         .get_redis_conn() | ||||||
|  |         .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |         .attach_printable("Error connecting to redis")?; | ||||||
|  |  | ||||||
|  |     let redis_locking_key = format!( | ||||||
|  |         "{}_{}_{}", | ||||||
|  |         WEBHOOK_LOCK_PREFIX, | ||||||
|  |         merchant_id.get_string_repr(), | ||||||
|  |         unique_locking_key | ||||||
|  |     ); | ||||||
|  |     match redis_conn | ||||||
|  |         .get_key::<Option<String>>(&redis_locking_key.as_str().into()) | ||||||
|  |         .await | ||||||
|  |     { | ||||||
|  |         Ok(val) => { | ||||||
|  |             if val == lock_value { | ||||||
|  |                 match redis_conn | ||||||
|  |                     .delete_key(&redis_locking_key.as_str().into()) | ||||||
|  |                     .await | ||||||
|  |                 { | ||||||
|  |                     Ok(redis::types::DelReply::KeyDeleted) => { | ||||||
|  |                         logger::info!("Lock freed {redis_locking_key}"); | ||||||
|  |                         tracing::Span::current().record("redis_lock_released", redis_locking_key); | ||||||
|  |                         Ok(()) | ||||||
|  |                     } | ||||||
|  |                     Ok(redis::types::DelReply::KeyNotDeleted) => Err( | ||||||
|  |                         errors::ApiErrorResponse::InternalServerError, | ||||||
|  |                     ) | ||||||
|  |                     .attach_printable("Status release lock called but key is not found in redis"), | ||||||
|  |                     Err(error) => Err(error) | ||||||
|  |                         .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |                         .attach_printable("Error while deleting redis key"), | ||||||
|  |                 } | ||||||
|  |             } else { | ||||||
|  |                 Err(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |                             .attach_printable("The redis value which acquired the lock is not equal to the redis value requesting for releasing the lock") | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         Err(error) => Err(error) | ||||||
|  |             .change_context(errors::ApiErrorResponse::InternalServerError) | ||||||
|  |             .attach_printable("Error while deleting redis key"), | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | |||||||
| @ -67,6 +67,7 @@ vault_private_key = "" | |||||||
|  |  | ||||||
| [webhooks] | [webhooks] | ||||||
| outgoing_enabled = true | outgoing_enabled = true | ||||||
|  | redis_lock_expiry_seconds = 180 | ||||||
|  |  | ||||||
| [api_keys] | [api_keys] | ||||||
| hash_key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" | hash_key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Debarati Ghatak
					Debarati Ghatak