mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-11-01 02:57:02 +08:00 
			
		
		
		
	feat(router): add outgoing payment webhooks for v2 (#6613)
Co-authored-by: Narayan Bhat <narayan.bhat@juspay.in> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: hrithikesh026 <hrithikesh.vm@juspay.in> Co-authored-by: Aishwariyaa Anand <124241367+Aishwariyaa-Anand@users.noreply.github.com> Co-authored-by: Aishwariyaa Anand <aishwariyaa.anand@Aishwariyaa-Anand-C3PGW02T6Y.local>
This commit is contained in:
		 Sai Harsha Vardhan
					Sai Harsha Vardhan
				
			
				
					committed by
					
						 GitHub
						GitHub
					
				
			
			
				
	
			
			
			 GitHub
						GitHub
					
				
			
						parent
						
							67f38f864e
						
					
				
				
					commit
					aa6ebf8aef
				
			| @ -4,6 +4,8 @@ mod incoming; | ||||
| mod incoming_v2; | ||||
| #[cfg(feature = "v1")] | ||||
| mod outgoing; | ||||
| #[cfg(feature = "v2")] | ||||
| mod outgoing_v2; | ||||
| #[cfg(all(feature = "revenue_recovery", feature = "v2"))] | ||||
| pub mod recovery_incoming; | ||||
| pub mod types; | ||||
| @ -11,8 +13,6 @@ pub mod utils; | ||||
| #[cfg(feature = "olap")] | ||||
| pub mod webhook_events; | ||||
|  | ||||
| #[cfg(feature = "v2")] | ||||
| pub(crate) use self::incoming_v2::incoming_webhooks_wrapper; | ||||
| #[cfg(feature = "v1")] | ||||
| pub(crate) use self::{ | ||||
|     incoming::incoming_webhooks_wrapper, | ||||
| @ -21,5 +21,9 @@ pub(crate) use self::{ | ||||
|         trigger_webhook_and_raise_event, | ||||
|     }, | ||||
| }; | ||||
| #[cfg(feature = "v2")] | ||||
| pub(crate) use self::{ | ||||
|     incoming_v2::incoming_webhooks_wrapper, outgoing_v2::create_event_and_trigger_outgoing_webhook, | ||||
| }; | ||||
|  | ||||
| const MERCHANT_ID: &str = "merchant_id"; | ||||
|  | ||||
| @ -26,7 +26,9 @@ use crate::{ | ||||
|             self, | ||||
|             transformers::{GenerateResponse, ToResponse}, | ||||
|         }, | ||||
|         webhooks::utils::construct_webhook_router_data, | ||||
|         webhooks::{ | ||||
|             create_event_and_trigger_outgoing_webhook, utils::construct_webhook_router_data, | ||||
|         }, | ||||
|     }, | ||||
|     db::StorageInterface, | ||||
|     events::api_logs::ApiEvent, | ||||
| @ -531,22 +533,21 @@ async fn payments_incoming_webhook_flow( | ||||
|             let event_type: Option<enums::EventType> = payments_response.status.foreign_into(); | ||||
|  | ||||
|             // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook | ||||
|             if let Some(_outgoing_event_type) = event_type { | ||||
|                 let _primary_object_created_at = payments_response.created; | ||||
|             if let Some(outgoing_event_type) = event_type { | ||||
|                 let primary_object_created_at = payments_response.created; | ||||
|                 // TODO: trigger an outgoing webhook to merchant | ||||
|                 // Box::pin(super::create_event_and_trigger_outgoing_webhook( | ||||
|                 //     state, | ||||
|                 //     merchant_account, | ||||
|                 //     profile, | ||||
|                 //     &key_store, | ||||
|                 //     outgoing_event_type, | ||||
|                 //     enums::EventClass::Payments, | ||||
|                 //     payment_id.get_string_repr().to_owned(), | ||||
|                 //     enums::EventObjectType::PaymentDetails, | ||||
|                 //     api::OutgoingWebhookContent::PaymentDetails(Box::new(payments_response)), | ||||
|                 //     Some(primary_object_created_at), | ||||
|                 // )) | ||||
|                 // .await?; | ||||
|                 Box::pin(create_event_and_trigger_outgoing_webhook( | ||||
|                     state, | ||||
|                     profile, | ||||
|                     merchant_context.get_merchant_key_store(), | ||||
|                     outgoing_event_type, | ||||
|                     enums::EventClass::Payments, | ||||
|                     payment_id.get_string_repr().to_owned(), | ||||
|                     enums::EventObjectType::PaymentDetails, | ||||
|                     api::OutgoingWebhookContent::PaymentDetails(Box::new(payments_response)), | ||||
|                     primary_object_created_at, | ||||
|                 )) | ||||
|                 .await?; | ||||
|             }; | ||||
|  | ||||
|             let response = WebhookResponseTracker::Payment { payment_id, status }; | ||||
|  | ||||
							
								
								
									
										908
									
								
								crates/router/src/core/webhooks/outgoing_v2.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										908
									
								
								crates/router/src/core/webhooks/outgoing_v2.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,908 @@ | ||||
| use std::collections::HashMap; | ||||
|  | ||||
| use api_models::{webhook_events, webhooks}; | ||||
| use common_utils::{ext_traits, request, type_name, types::keymanager}; | ||||
| use diesel_models::process_tracker::business_status; | ||||
| use error_stack::{report, Report, ResultExt}; | ||||
| use hyperswitch_domain_models::type_encryption::{crypto_operation, CryptoOperation}; | ||||
| use hyperswitch_interfaces::consts; | ||||
| use masking; | ||||
| use router_env::{ | ||||
|     instrument, | ||||
|     tracing::{self, Instrument}, | ||||
| }; | ||||
|  | ||||
| use super::{ | ||||
|     types, | ||||
|     utils::{self, increment_webhook_outgoing_received_count}, | ||||
|     MERCHANT_ID, | ||||
| }; | ||||
| use crate::{ | ||||
|     core::{ | ||||
|         errors::{self, CustomResult}, | ||||
|         metrics, | ||||
|     }, | ||||
|     events::outgoing_webhook_logs, | ||||
|     logger, | ||||
|     routes::{app::SessionStateInfo, SessionState}, | ||||
|     services, | ||||
|     types::{ | ||||
|         api, domain, | ||||
|         storage::{self, enums}, | ||||
|         transformers::ForeignFrom, | ||||
|     }, | ||||
| }; | ||||
|  | ||||
| #[allow(clippy::too_many_arguments)] | ||||
| #[instrument(skip_all)] | ||||
| pub(crate) async fn create_event_and_trigger_outgoing_webhook( | ||||
|     state: SessionState, | ||||
|     business_profile: domain::Profile, | ||||
|     merchant_key_store: &domain::MerchantKeyStore, | ||||
|     event_type: enums::EventType, | ||||
|     event_class: enums::EventClass, | ||||
|     primary_object_id: String, | ||||
|     primary_object_type: enums::EventObjectType, | ||||
|     content: api::OutgoingWebhookContent, | ||||
|     primary_object_created_at: time::PrimitiveDateTime, | ||||
| ) -> CustomResult<(), errors::ApiErrorResponse> { | ||||
|     let delivery_attempt = enums::WebhookDeliveryAttempt::InitialAttempt; | ||||
|     let idempotent_event_id = | ||||
|         utils::get_idempotent_event_id(&primary_object_id, event_type, delivery_attempt); | ||||
|     let webhook_url_result = business_profile | ||||
|         .get_webhook_url_from_profile() | ||||
|         .change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured); | ||||
|  | ||||
|     if utils::is_outgoing_webhook_disabled( | ||||
|         &state, | ||||
|         &webhook_url_result, | ||||
|         &business_profile, | ||||
|         &idempotent_event_id, | ||||
|     ) { | ||||
|         return Ok(()); | ||||
|     } | ||||
|  | ||||
|     let event_id = utils::generate_event_id(); | ||||
|     let merchant_id = business_profile.merchant_id.clone(); | ||||
|     let now = common_utils::date_time::now(); | ||||
|  | ||||
|     let outgoing_webhook = api::OutgoingWebhook { | ||||
|         merchant_id: merchant_id.clone(), | ||||
|         event_id: event_id.clone(), | ||||
|         event_type, | ||||
|         content: content.clone(), | ||||
|         timestamp: now, | ||||
|     }; | ||||
|  | ||||
|     let request_content = get_outgoing_webhook_request(outgoing_webhook, &business_profile) | ||||
|         .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) | ||||
|         .attach_printable("Failed to construct outgoing webhook request content")?; | ||||
|  | ||||
|     let event_metadata = storage::EventMetadata::foreign_from(&content); | ||||
|     let key_manager_state = &(&state).into(); | ||||
|     let new_event = domain::Event { | ||||
|         event_id: event_id.clone(), | ||||
|         event_type, | ||||
|         event_class, | ||||
|         is_webhook_notified: false, | ||||
|         primary_object_id, | ||||
|         primary_object_type, | ||||
|         created_at: now, | ||||
|         merchant_id: Some(business_profile.merchant_id.clone()), | ||||
|         business_profile_id: Some(business_profile.get_id().to_owned()), | ||||
|         primary_object_created_at: Some(primary_object_created_at), | ||||
|         idempotent_event_id: Some(idempotent_event_id.clone()), | ||||
|         initial_attempt_id: Some(event_id.clone()), | ||||
|         request: Some( | ||||
|             crypto_operation( | ||||
|                 key_manager_state, | ||||
|                 type_name!(domain::Event), | ||||
|                 CryptoOperation::Encrypt( | ||||
|                     ext_traits::Encode::encode_to_string_of_json(&request_content) | ||||
|                         .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) | ||||
|                         .attach_printable("Failed to encode outgoing webhook request content") | ||||
|                         .map(masking::Secret::new)?, | ||||
|                 ), | ||||
|                 keymanager::Identifier::Merchant(merchant_key_store.merchant_id.clone()), | ||||
|                 masking::PeekInterface::peek(merchant_key_store.key.get_inner()), | ||||
|             ) | ||||
|             .await | ||||
|             .and_then(|val| val.try_into_operation()) | ||||
|             .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) | ||||
|             .attach_printable("Failed to encrypt outgoing webhook request content")?, | ||||
|         ), | ||||
|         response: None, | ||||
|         delivery_attempt: Some(delivery_attempt), | ||||
|         metadata: Some(event_metadata), | ||||
|         is_overall_delivery_successful: Some(false), | ||||
|     }; | ||||
|  | ||||
|     let event_insert_result = state | ||||
|         .store | ||||
|         .insert_event(key_manager_state, new_event, merchant_key_store) | ||||
|         .await; | ||||
|  | ||||
|     let event = match event_insert_result { | ||||
|         Ok(event) => Ok(event), | ||||
|         Err(error) => { | ||||
|             if error.current_context().is_db_unique_violation() { | ||||
|                 // If the event_id already exists in the database, it indicates that the event for the resource has already been sent, so we skip the flow | ||||
|                 logger::debug!("Event with idempotent ID `{idempotent_event_id}` already exists in the database"); | ||||
|                 return Ok(()); | ||||
|             } else { | ||||
|                 logger::error!(event_insertion_failure=?error); | ||||
|                 Err(error | ||||
|                     .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) | ||||
|                     .attach_printable("Failed to insert event in events table")) | ||||
|             } | ||||
|         } | ||||
|     }?; | ||||
|  | ||||
|     let cloned_key_store = merchant_key_store.clone(); | ||||
|     // Using a tokio spawn here and not arbiter because not all caller of this function | ||||
|     // may have an actix arbiter | ||||
|     tokio::spawn( | ||||
|         async move { | ||||
|             Box::pin(trigger_webhook_and_raise_event( | ||||
|                 state, | ||||
|                 business_profile, | ||||
|                 &cloned_key_store, | ||||
|                 event, | ||||
|                 request_content, | ||||
|                 delivery_attempt, | ||||
|                 Some(content), | ||||
|             )) | ||||
|             .await; | ||||
|         } | ||||
|         .in_current_span(), | ||||
|     ); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| #[allow(clippy::too_many_arguments)] | ||||
| #[instrument(skip_all)] | ||||
| pub(crate) async fn trigger_webhook_and_raise_event( | ||||
|     state: SessionState, | ||||
|     business_profile: domain::Profile, | ||||
|     merchant_key_store: &domain::MerchantKeyStore, | ||||
|     event: domain::Event, | ||||
|     request_content: webhook_events::OutgoingWebhookRequestContent, | ||||
|     delivery_attempt: enums::WebhookDeliveryAttempt, | ||||
|     content: Option<api::OutgoingWebhookContent>, | ||||
| ) { | ||||
|     logger::debug!( | ||||
|         event_id=%event.event_id, | ||||
|         idempotent_event_id=?event.idempotent_event_id, | ||||
|         initial_attempt_id=?event.initial_attempt_id, | ||||
|         "Attempting to send webhook" | ||||
|     ); | ||||
|  | ||||
|     let merchant_id = business_profile.merchant_id.clone(); | ||||
|     let trigger_webhook_result = trigger_webhook_to_merchant( | ||||
|         state.clone(), | ||||
|         business_profile, | ||||
|         merchant_key_store, | ||||
|         event.clone(), | ||||
|         request_content, | ||||
|         delivery_attempt, | ||||
|     ) | ||||
|     .await; | ||||
|  | ||||
|     let _ = | ||||
|         raise_webhooks_analytics_event(state, trigger_webhook_result, content, merchant_id, event) | ||||
|             .await; | ||||
| } | ||||
|  | ||||
| async fn trigger_webhook_to_merchant( | ||||
|     state: SessionState, | ||||
|     business_profile: domain::Profile, | ||||
|     merchant_key_store: &domain::MerchantKeyStore, | ||||
|     event: domain::Event, | ||||
|     request_content: webhook_events::OutgoingWebhookRequestContent, | ||||
|     delivery_attempt: enums::WebhookDeliveryAttempt, | ||||
| ) -> CustomResult< | ||||
|     (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|     errors::WebhooksFlowError, | ||||
| > { | ||||
|     let webhook_url = business_profile | ||||
|         .get_webhook_url_from_profile() | ||||
|         .change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured)?; | ||||
|  | ||||
|     let response = build_and_send_request(&state, request_content, webhook_url).await; | ||||
|  | ||||
|     metrics::WEBHOOK_OUTGOING_COUNT.add( | ||||
|         1, | ||||
|         router_env::metric_attributes!((MERCHANT_ID, business_profile.merchant_id.clone())), | ||||
|     ); | ||||
|     logger::debug!(outgoing_webhook_response=?response); | ||||
|  | ||||
|     match response { | ||||
|         Ok(response) => { | ||||
|             delivery_attempt | ||||
|                 .handle_success_response( | ||||
|                     state, | ||||
|                     merchant_key_store.clone(), | ||||
|                     &business_profile.merchant_id, | ||||
|                     &event.event_id, | ||||
|                     None, | ||||
|                     response, | ||||
|                 ) | ||||
|                 .await | ||||
|         } | ||||
|         Err(client_error) => { | ||||
|             delivery_attempt | ||||
|                 .handle_error_response( | ||||
|                     state, | ||||
|                     merchant_key_store.clone(), | ||||
|                     &business_profile.merchant_id, | ||||
|                     &event.event_id, | ||||
|                     client_error, | ||||
|                 ) | ||||
|                 .await | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn raise_webhooks_analytics_event( | ||||
|     state: SessionState, | ||||
|     trigger_webhook_result: CustomResult< | ||||
|         (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|         errors::WebhooksFlowError, | ||||
|     >, | ||||
|     content: Option<api::OutgoingWebhookContent>, | ||||
|     merchant_id: common_utils::id_type::MerchantId, | ||||
|     fallback_event: domain::Event, | ||||
| ) { | ||||
|     let (updated_event, optional_error) = match trigger_webhook_result { | ||||
|         Ok((updated_event, error)) => (updated_event, error), | ||||
|         Err(error) => (fallback_event, Some(error)), | ||||
|     }; | ||||
|     let error = optional_error.and_then(|error| { | ||||
|         logger::error!(?error, "Failed to send webhook to merchant"); | ||||
|  | ||||
|         serde_json::to_value(error.current_context()) | ||||
|             .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) | ||||
|             .inspect_err(|error| { | ||||
|                 logger::error!(?error, "Failed to serialize outgoing webhook error as JSON"); | ||||
|             }) | ||||
|             .ok() | ||||
|     }); | ||||
|  | ||||
|     let outgoing_webhook_event_content = content | ||||
|         .as_ref() | ||||
|         .and_then( | ||||
|             outgoing_webhook_logs::OutgoingWebhookEventMetric::get_outgoing_webhook_event_content, | ||||
|         ) | ||||
|         .or_else(|| { | ||||
|             updated_event | ||||
|                 .metadata | ||||
|                 .map(outgoing_webhook_logs::OutgoingWebhookEventContent::foreign_from) | ||||
|         }); | ||||
|  | ||||
|     // Get status_code from webhook response | ||||
|     let status_code = { | ||||
|         let webhook_response: Option<webhook_events::OutgoingWebhookResponseContent> = | ||||
|             updated_event.response.and_then(|res| { | ||||
|                 ext_traits::StringExt::parse_struct( | ||||
|                     masking::PeekInterface::peek(res.get_inner()), | ||||
|                     "OutgoingWebhookResponseContent", | ||||
|                 ) | ||||
|                 .map_err(|error| { | ||||
|                     logger::error!(?error, "Error deserializing webhook response"); | ||||
|                     error | ||||
|                 }) | ||||
|                 .ok() | ||||
|             }); | ||||
|         webhook_response.and_then(|res| res.status_code) | ||||
|     }; | ||||
|  | ||||
|     let webhook_event = outgoing_webhook_logs::OutgoingWebhookEvent::new( | ||||
|         state.tenant.tenant_id.clone(), | ||||
|         merchant_id, | ||||
|         updated_event.event_id, | ||||
|         updated_event.event_type, | ||||
|         outgoing_webhook_event_content, | ||||
|         error, | ||||
|         updated_event.initial_attempt_id, | ||||
|         status_code, | ||||
|         updated_event.delivery_attempt, | ||||
|     ); | ||||
|     state.event_handler().log_event(&webhook_event); | ||||
| } | ||||
|  | ||||
| pub(crate) fn get_outgoing_webhook_request( | ||||
|     outgoing_webhook: api::OutgoingWebhook, | ||||
|     business_profile: &domain::Profile, | ||||
| ) -> CustomResult<webhook_events::OutgoingWebhookRequestContent, errors::WebhooksFlowError> { | ||||
|     #[inline] | ||||
|     fn get_outgoing_webhook_request_inner<WebhookType: types::OutgoingWebhookType>( | ||||
|         outgoing_webhook: api::OutgoingWebhook, | ||||
|         business_profile: &domain::Profile, | ||||
|     ) -> CustomResult<webhook_events::OutgoingWebhookRequestContent, errors::WebhooksFlowError> | ||||
|     { | ||||
|         let mut headers = vec![ | ||||
|             ( | ||||
|                 reqwest::header::CONTENT_TYPE.to_string(), | ||||
|                 mime::APPLICATION_JSON.essence_str().into(), | ||||
|             ), | ||||
|             ( | ||||
|                 reqwest::header::USER_AGENT.to_string(), | ||||
|                 consts::USER_AGENT.to_string().into(), | ||||
|             ), | ||||
|         ]; | ||||
|  | ||||
|         let transformed_outgoing_webhook = WebhookType::from(outgoing_webhook); | ||||
|         let payment_response_hash_key = business_profile.payment_response_hash_key.clone(); | ||||
|         let custom_headers = business_profile | ||||
|             .outgoing_webhook_custom_http_headers | ||||
|             .clone() | ||||
|             .map(|headers| { | ||||
|                 ext_traits::ValueExt::parse_value::<HashMap<String, String>>( | ||||
|                     masking::ExposeInterface::expose(headers.into_inner()), | ||||
|                     "HashMap<String,String>", | ||||
|                 ) | ||||
|                 .change_context(errors::WebhooksFlowError::OutgoingWebhookEncodingFailed) | ||||
|                 .attach_printable("Failed to deserialize outgoing webhook custom HTTP headers") | ||||
|             }) | ||||
|             .transpose()?; | ||||
|         if let Some(ref map) = custom_headers { | ||||
|             headers.extend( | ||||
|                 map.iter() | ||||
|                     .map(|(key, value)| (key.clone(), masking::Mask::into_masked(value.clone()))), | ||||
|             ); | ||||
|         }; | ||||
|         let outgoing_webhooks_signature = transformed_outgoing_webhook | ||||
|             .get_outgoing_webhooks_signature(payment_response_hash_key)?; | ||||
|  | ||||
|         if let Some(signature) = outgoing_webhooks_signature.signature { | ||||
|             WebhookType::add_webhook_header(&mut headers, signature) | ||||
|         } | ||||
|  | ||||
|         Ok(webhook_events::OutgoingWebhookRequestContent { | ||||
|             body: outgoing_webhooks_signature.payload, | ||||
|             headers: headers | ||||
|                 .into_iter() | ||||
|                 .map(|(name, value)| (name, masking::Secret::new(value.into_inner()))) | ||||
|                 .collect(), | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     get_outgoing_webhook_request_inner::<webhooks::OutgoingWebhook>( | ||||
|         outgoing_webhook, | ||||
|         business_profile, | ||||
|     ) | ||||
| } | ||||
|  | ||||
| async fn build_and_send_request( | ||||
|     state: &SessionState, | ||||
|     request_content: webhook_events::OutgoingWebhookRequestContent, | ||||
|     webhook_url: String, | ||||
| ) -> Result<reqwest::Response, Report<common_enums::ApiClientError>> { | ||||
|     let headers = request_content | ||||
|         .headers | ||||
|         .into_iter() | ||||
|         .map(|(name, value)| (name, masking::Mask::into_masked(value))) | ||||
|         .collect(); | ||||
|     let request = services::RequestBuilder::new() | ||||
|         .method(services::Method::Post) | ||||
|         .url(&webhook_url) | ||||
|         .attach_default_headers() | ||||
|         .headers(headers) | ||||
|         .set_body(request::RequestContent::RawBytes( | ||||
|             masking::ExposeInterface::expose(request_content.body).into_bytes(), | ||||
|         )) | ||||
|         .build(); | ||||
|  | ||||
|     state | ||||
|         .api_client | ||||
|         .send_request( | ||||
|             state, | ||||
|             request, | ||||
|             Some(types::OUTGOING_WEBHOOK_TIMEOUT_SECS), | ||||
|             false, | ||||
|         ) | ||||
|         .await | ||||
| } | ||||
|  | ||||
| async fn api_client_error_handler( | ||||
|     state: SessionState, | ||||
|     merchant_key_store: domain::MerchantKeyStore, | ||||
|     merchant_id: &common_utils::id_type::MerchantId, | ||||
|     event_id: &str, | ||||
|     client_error: Report<errors::ApiClientError>, | ||||
|     delivery_attempt: enums::WebhookDeliveryAttempt, | ||||
|     _schedule_webhook_retry: types::ScheduleWebhookRetry, | ||||
| ) -> CustomResult< | ||||
|     (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|     errors::WebhooksFlowError, | ||||
| > { | ||||
|     // Not including detailed error message in response information since it contains too | ||||
|     // much of diagnostic information to be exposed to the merchant. | ||||
|     let is_webhook_notified = false; | ||||
|     let response_to_store = webhook_events::OutgoingWebhookResponseContent { | ||||
|         body: None, | ||||
|         headers: None, | ||||
|         status_code: None, | ||||
|         error_message: Some("Unable to send request to merchant server".to_string()), | ||||
|     }; | ||||
|     let updated_event = update_event_in_storage( | ||||
|         state, | ||||
|         is_webhook_notified, | ||||
|         response_to_store, | ||||
|         merchant_key_store, | ||||
|         merchant_id, | ||||
|         event_id, | ||||
|     ) | ||||
|     .await?; | ||||
|  | ||||
|     let error = client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed); | ||||
|     logger::error!( | ||||
|         ?error, | ||||
|         ?delivery_attempt, | ||||
|         "An error occurred when sending webhook to merchant" | ||||
|     ); | ||||
|  | ||||
|     //TODO: add outgoing webhook retries support | ||||
|     // if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry { | ||||
|     //     // Schedule a retry attempt for webhook delivery | ||||
|     //     outgoing_webhook_retry::retry_webhook_delivery_task( | ||||
|     //         &*state.store, | ||||
|     //         merchant_id, | ||||
|     //         *process_tracker, | ||||
|     //     ) | ||||
|     //     .await | ||||
|     //     .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?; | ||||
|     // } | ||||
|  | ||||
|     Ok((updated_event, Some(error))) | ||||
| } | ||||
|  | ||||
| async fn update_event_in_storage( | ||||
|     state: SessionState, | ||||
|     is_webhook_notified: bool, | ||||
|     outgoing_webhook_response: webhook_events::OutgoingWebhookResponseContent, | ||||
|     merchant_key_store: domain::MerchantKeyStore, | ||||
|     merchant_id: &common_utils::id_type::MerchantId, | ||||
|     event_id: &str, | ||||
| ) -> CustomResult<domain::Event, errors::WebhooksFlowError> { | ||||
|     let key_manager_state = &(&state).into(); | ||||
|     let event_update = domain::EventUpdate::UpdateResponse { | ||||
|         is_webhook_notified, | ||||
|         response: Some( | ||||
|             crypto_operation( | ||||
|                 key_manager_state, | ||||
|                 type_name!(domain::Event), | ||||
|                 CryptoOperation::Encrypt( | ||||
|                     ext_traits::Encode::encode_to_string_of_json(&outgoing_webhook_response) | ||||
|                         .change_context( | ||||
|                             errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, | ||||
|                         ) | ||||
|                         .map(masking::Secret::new)?, | ||||
|                 ), | ||||
|                 keymanager::Identifier::Merchant(merchant_key_store.merchant_id.clone()), | ||||
|                 masking::PeekInterface::peek(merchant_key_store.key.get_inner()), | ||||
|             ) | ||||
|             .await | ||||
|             .and_then(|val| val.try_into_operation()) | ||||
|             .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) | ||||
|             .attach_printable("Failed to encrypt outgoing webhook response content")?, | ||||
|         ), | ||||
|     }; | ||||
|     state | ||||
|         .store | ||||
|         .update_event_by_merchant_id_event_id( | ||||
|             key_manager_state, | ||||
|             merchant_id, | ||||
|             event_id, | ||||
|             event_update, | ||||
|             &merchant_key_store, | ||||
|         ) | ||||
|         .await | ||||
|         .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) | ||||
| } | ||||
|  | ||||
| async fn update_overall_delivery_status_in_storage( | ||||
|     state: SessionState, | ||||
|     merchant_key_store: domain::MerchantKeyStore, | ||||
|     merchant_id: &common_utils::id_type::MerchantId, | ||||
|     updated_event: &domain::Event, | ||||
| ) -> CustomResult<(), errors::WebhooksFlowError> { | ||||
|     let key_manager_state = &(&state).into(); | ||||
|  | ||||
|     let update_overall_delivery_status = domain::EventUpdate::OverallDeliveryStatusUpdate { | ||||
|         is_overall_delivery_successful: true, | ||||
|     }; | ||||
|  | ||||
|     let initial_attempt_id = updated_event.initial_attempt_id.as_ref(); | ||||
|     let delivery_attempt = updated_event.delivery_attempt; | ||||
|  | ||||
|     if let Some(( | ||||
|         initial_attempt_id, | ||||
|         enums::WebhookDeliveryAttempt::InitialAttempt | ||||
|         | enums::WebhookDeliveryAttempt::AutomaticRetry, | ||||
|     )) = initial_attempt_id.zip(delivery_attempt) | ||||
|     { | ||||
|         state | ||||
|             .store | ||||
|             .update_event_by_merchant_id_event_id( | ||||
|                 key_manager_state, | ||||
|                 merchant_id, | ||||
|                 initial_attempt_id.as_str(), | ||||
|                 update_overall_delivery_status, | ||||
|                 &merchant_key_store, | ||||
|             ) | ||||
|             .await | ||||
|             .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) | ||||
|             .attach_printable("Failed to update initial delivery attempt")?; | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| async fn handle_successful_delivery( | ||||
|     state: SessionState, | ||||
|     merchant_key_store: domain::MerchantKeyStore, | ||||
|     updated_event: &domain::Event, | ||||
|     merchant_id: &common_utils::id_type::MerchantId, | ||||
|     process_tracker: Option<storage::ProcessTracker>, | ||||
|     business_status: &'static str, | ||||
| ) -> CustomResult<(), errors::WebhooksFlowError> { | ||||
|     update_overall_delivery_status_in_storage( | ||||
|         state.clone(), | ||||
|         merchant_key_store.clone(), | ||||
|         merchant_id, | ||||
|         updated_event, | ||||
|     ) | ||||
|     .await?; | ||||
|  | ||||
|     increment_webhook_outgoing_received_count(merchant_id); | ||||
|  | ||||
|     match process_tracker { | ||||
|         Some(process_tracker) => state | ||||
|             .store | ||||
|             .as_scheduler() | ||||
|             .finish_process_with_business_status(process_tracker, business_status) | ||||
|             .await | ||||
|             .change_context( | ||||
|                 errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, | ||||
|             ), | ||||
|         None => Ok(()), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn handle_failed_delivery( | ||||
|     _state: SessionState, | ||||
|     merchant_id: &common_utils::id_type::MerchantId, | ||||
|     delivery_attempt: enums::WebhookDeliveryAttempt, | ||||
|     status_code: u16, | ||||
|     log_message: &'static str, | ||||
|     _schedule_webhook_retry: types::ScheduleWebhookRetry, | ||||
| ) -> CustomResult<(), errors::WebhooksFlowError> { | ||||
|     utils::increment_webhook_outgoing_not_received_count(merchant_id); | ||||
|  | ||||
|     let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant); | ||||
|     logger::warn!(?error, ?delivery_attempt, status_code, %log_message); | ||||
|  | ||||
|     //TODO: add outgoing webhook retries support | ||||
|     // if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry { | ||||
|     //     // Schedule a retry attempt for webhook delivery | ||||
|     //     outgoing_webhook_retry::retry_webhook_delivery_task( | ||||
|     //         &*state.store, | ||||
|     //         merchant_id, | ||||
|     //         *process_tracker, | ||||
|     //     ) | ||||
|     //     .await | ||||
|     //     .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?; | ||||
|     // } | ||||
|  | ||||
|     Err(error) | ||||
| } | ||||
|  | ||||
| impl ForeignFrom<&api::OutgoingWebhookContent> for storage::EventMetadata { | ||||
|     fn foreign_from(content: &api::OutgoingWebhookContent) -> Self { | ||||
|         match content { | ||||
|             webhooks::OutgoingWebhookContent::PaymentDetails(payments_response) => Self::Payment { | ||||
|                 payment_id: payments_response.id.clone(), | ||||
|             }, | ||||
|             webhooks::OutgoingWebhookContent::RefundDetails(refund_response) => Self::Refund { | ||||
|                 payment_id: refund_response.payment_id.clone(), | ||||
|                 refund_id: refund_response.id.clone(), | ||||
|             }, | ||||
|             webhooks::OutgoingWebhookContent::DisputeDetails(dispute_response) => { | ||||
|                 //TODO: add support for dispute outgoing webhook | ||||
|                 todo!() | ||||
|             } | ||||
|             webhooks::OutgoingWebhookContent::MandateDetails(mandate_response) => Self::Mandate { | ||||
|                 payment_method_id: mandate_response.payment_method_id.clone(), | ||||
|                 mandate_id: mandate_response.mandate_id.clone(), | ||||
|             }, | ||||
|             #[cfg(feature = "payouts")] | ||||
|             webhooks::OutgoingWebhookContent::PayoutDetails(payout_response) => Self::Payout { | ||||
|                 payout_id: payout_response.payout_id.clone(), | ||||
|             }, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl ForeignFrom<storage::EventMetadata> for outgoing_webhook_logs::OutgoingWebhookEventContent { | ||||
|     fn foreign_from(event_metadata: storage::EventMetadata) -> Self { | ||||
|         match event_metadata { | ||||
|             diesel_models::EventMetadata::Payment { payment_id } => Self::Payment { | ||||
|                 payment_id, | ||||
|                 content: serde_json::Value::Null, | ||||
|             }, | ||||
|             diesel_models::EventMetadata::Payout { payout_id } => Self::Payout { | ||||
|                 payout_id, | ||||
|                 content: serde_json::Value::Null, | ||||
|             }, | ||||
|             diesel_models::EventMetadata::Refund { | ||||
|                 payment_id, | ||||
|                 refund_id, | ||||
|             } => Self::Refund { | ||||
|                 payment_id, | ||||
|                 refund_id, | ||||
|                 content: serde_json::Value::Null, | ||||
|             }, | ||||
|             diesel_models::EventMetadata::Dispute { | ||||
|                 payment_id, | ||||
|                 attempt_id, | ||||
|                 dispute_id, | ||||
|             } => Self::Dispute { | ||||
|                 payment_id, | ||||
|                 attempt_id, | ||||
|                 dispute_id, | ||||
|                 content: serde_json::Value::Null, | ||||
|             }, | ||||
|             diesel_models::EventMetadata::Mandate { | ||||
|                 payment_method_id, | ||||
|                 mandate_id, | ||||
|             } => Self::Mandate { | ||||
|                 payment_method_id, | ||||
|                 mandate_id, | ||||
|                 content: serde_json::Value::Null, | ||||
|             }, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| trait OutgoingWebhookResponseHandler { | ||||
|     async fn handle_error_response( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_key_store: domain::MerchantKeyStore, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         event_id: &str, | ||||
|         client_error: Report<errors::ApiClientError>, | ||||
|     ) -> CustomResult< | ||||
|         (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|         errors::WebhooksFlowError, | ||||
|     >; | ||||
|  | ||||
|     async fn handle_success_response( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_key_store: domain::MerchantKeyStore, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         event_id: &str, | ||||
|         process_tracker: Option<storage::ProcessTracker>, | ||||
|         response: reqwest::Response, | ||||
|     ) -> CustomResult< | ||||
|         (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|         errors::WebhooksFlowError, | ||||
|     >; | ||||
| } | ||||
|  | ||||
| impl OutgoingWebhookResponseHandler for enums::WebhookDeliveryAttempt { | ||||
|     async fn handle_error_response( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_key_store: domain::MerchantKeyStore, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         event_id: &str, | ||||
|         client_error: Report<errors::ApiClientError>, | ||||
|     ) -> CustomResult< | ||||
|         (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|         errors::WebhooksFlowError, | ||||
|     > { | ||||
|         let schedule_webhook_retry = match self { | ||||
|             Self::InitialAttempt | Self::ManualRetry => types::ScheduleWebhookRetry::NoSchedule, | ||||
|             Self::AutomaticRetry => { | ||||
|                 // ScheduleWebhookRetry::WithProcessTracker(Box::new(process_tracker)) | ||||
|                 todo!() | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         api_client_error_handler( | ||||
|             state, | ||||
|             merchant_key_store, | ||||
|             merchant_id, | ||||
|             event_id, | ||||
|             client_error, | ||||
|             *self, | ||||
|             schedule_webhook_retry, | ||||
|         ) | ||||
|         .await | ||||
|     } | ||||
|  | ||||
|     async fn handle_success_response( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_key_store: domain::MerchantKeyStore, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         event_id: &str, | ||||
|         process_tracker: Option<storage::ProcessTracker>, | ||||
|         response: reqwest::Response, | ||||
|     ) -> CustomResult< | ||||
|         (domain::Event, Option<Report<errors::WebhooksFlowError>>), | ||||
|         errors::WebhooksFlowError, | ||||
|     > { | ||||
|         let status_code = response.status(); | ||||
|         let is_webhook_notified = status_code.is_success(); | ||||
|         let response_struct = types::WebhookResponse { response }; | ||||
|         let outgoing_webhook_response = response_struct | ||||
|             .get_outgoing_webhook_response_content() | ||||
|             .await; | ||||
|         let updated_event = update_event_in_storage( | ||||
|             state.clone(), | ||||
|             is_webhook_notified, | ||||
|             outgoing_webhook_response, | ||||
|             merchant_key_store.clone(), | ||||
|             merchant_id, | ||||
|             event_id, | ||||
|         ) | ||||
|         .await?; | ||||
|  | ||||
|         let webhook_action_handler = get_action_handler(*self); | ||||
|         let result = if is_webhook_notified { | ||||
|             webhook_action_handler | ||||
|                 .notified_action( | ||||
|                     state, | ||||
|                     merchant_key_store, | ||||
|                     &updated_event, | ||||
|                     merchant_id, | ||||
|                     process_tracker, | ||||
|                 ) | ||||
|                 .await | ||||
|         } else { | ||||
|             webhook_action_handler | ||||
|                 .not_notified_action(state, merchant_id, status_code.as_u16()) | ||||
|                 .await | ||||
|         }; | ||||
|  | ||||
|         Ok((updated_event, result)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[async_trait::async_trait] | ||||
| trait WebhookNotificationHandler: Send + Sync { | ||||
|     async fn notified_action( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_key_store: domain::MerchantKeyStore, | ||||
|         updated_event: &domain::Event, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         process_tracker: Option<storage::ProcessTracker>, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>>; | ||||
|  | ||||
|     async fn not_notified_action( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         status_code: u16, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>>; | ||||
| } | ||||
|  | ||||
| struct InitialAttempt; | ||||
| struct AutomaticRetry; | ||||
| struct ManualRetry; | ||||
|  | ||||
| #[async_trait::async_trait] | ||||
| impl WebhookNotificationHandler for InitialAttempt { | ||||
|     async fn notified_action( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_key_store: domain::MerchantKeyStore, | ||||
|         updated_event: &domain::Event, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         process_tracker: Option<storage::ProcessTracker>, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>> { | ||||
|         handle_successful_delivery( | ||||
|             state, | ||||
|             merchant_key_store, | ||||
|             updated_event, | ||||
|             merchant_id, | ||||
|             process_tracker, | ||||
|             business_status::INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL, | ||||
|         ) | ||||
|         .await | ||||
|         .err() | ||||
|         .map(|error: Report<errors::WebhooksFlowError>| report!(error)) | ||||
|     } | ||||
|  | ||||
|     async fn not_notified_action( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         status_code: u16, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>> { | ||||
|         handle_failed_delivery( | ||||
|             state.clone(), | ||||
|             merchant_id, | ||||
|             enums::WebhookDeliveryAttempt::InitialAttempt, | ||||
|             status_code, | ||||
|             "Ignoring error when sending webhook to merchant", | ||||
|             types::ScheduleWebhookRetry::NoSchedule, | ||||
|         ) | ||||
|         .await | ||||
|         .err() | ||||
|         .map(|error| report!(error)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[async_trait::async_trait] | ||||
| impl WebhookNotificationHandler for AutomaticRetry { | ||||
|     async fn notified_action( | ||||
|         &self, | ||||
|         _state: SessionState, | ||||
|         _merchant_key_store: domain::MerchantKeyStore, | ||||
|         _updated_event: &domain::Event, | ||||
|         _merchant_id: &common_utils::id_type::MerchantId, | ||||
|         _process_tracker: Option<storage::ProcessTracker>, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>> { | ||||
|         todo!() | ||||
|     } | ||||
|  | ||||
|     async fn not_notified_action( | ||||
|         &self, | ||||
|         _state: SessionState, | ||||
|         _merchant_id: &common_utils::id_type::MerchantId, | ||||
|         _status_code: u16, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>> { | ||||
|         todo!() | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[async_trait::async_trait] | ||||
| impl WebhookNotificationHandler for ManualRetry { | ||||
|     async fn notified_action( | ||||
|         &self, | ||||
|         _state: SessionState, | ||||
|         _merchant_key_store: domain::MerchantKeyStore, | ||||
|         _updated_event: &domain::Event, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         _process_tracker: Option<storage::ProcessTracker>, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>> { | ||||
|         increment_webhook_outgoing_received_count(merchant_id); | ||||
|         None | ||||
|     } | ||||
|  | ||||
|     async fn not_notified_action( | ||||
|         &self, | ||||
|         state: SessionState, | ||||
|         merchant_id: &common_utils::id_type::MerchantId, | ||||
|         status_code: u16, | ||||
|     ) -> Option<Report<errors::WebhooksFlowError>> { | ||||
|         handle_failed_delivery( | ||||
|             state.clone(), | ||||
|             merchant_id, | ||||
|             enums::WebhookDeliveryAttempt::ManualRetry, | ||||
|             status_code, | ||||
|             "Ignoring error when sending webhook to merchant", | ||||
|             types::ScheduleWebhookRetry::NoSchedule, | ||||
|         ) | ||||
|         .await | ||||
|         .err() | ||||
|         .map(|error| report!(error)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn get_action_handler( | ||||
|     attempt: enums::WebhookDeliveryAttempt, | ||||
| ) -> Box<dyn WebhookNotificationHandler> { | ||||
|     match attempt { | ||||
|         enums::WebhookDeliveryAttempt::InitialAttempt => Box::new(InitialAttempt), | ||||
|         enums::WebhookDeliveryAttempt::AutomaticRetry => Box::new(AutomaticRetry), | ||||
|         enums::WebhookDeliveryAttempt::ManualRetry => Box::new(ManualRetry), | ||||
|     } | ||||
| } | ||||
| @ -1,10 +1,23 @@ | ||||
| use api_models::webhooks; | ||||
| use api_models::{webhook_events, webhooks}; | ||||
| use common_utils::{crypto::SignMessage, ext_traits::Encode}; | ||||
| use error_stack::ResultExt; | ||||
| use masking::Secret; | ||||
| use serde::Serialize; | ||||
|  | ||||
| use crate::{core::errors, headers, services::request::Maskable, types::storage::enums}; | ||||
| use crate::{ | ||||
|     core::errors, | ||||
|     headers, logger, | ||||
|     services::request::Maskable, | ||||
|     types::storage::{self, enums}, | ||||
| }; | ||||
|  | ||||
| pub const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub enum ScheduleWebhookRetry { | ||||
|     WithProcessTracker(Box<storage::ProcessTracker>), | ||||
|     NoSchedule, | ||||
| } | ||||
|  | ||||
| pub struct OutgoingWebhookPayloadWithSignature { | ||||
|     pub payload: Secret<String>, | ||||
| @ -66,3 +79,50 @@ pub(crate) struct OutgoingWebhookTrackingData { | ||||
|     pub(crate) primary_object_type: enums::EventObjectType, | ||||
|     pub(crate) initial_attempt_id: Option<String>, | ||||
| } | ||||
|  | ||||
| pub struct WebhookResponse { | ||||
|     pub response: reqwest::Response, | ||||
| } | ||||
|  | ||||
| impl WebhookResponse { | ||||
|     pub async fn get_outgoing_webhook_response_content( | ||||
|         self, | ||||
|     ) -> webhook_events::OutgoingWebhookResponseContent { | ||||
|         let status_code = self.response.status(); | ||||
|         let response_headers = self | ||||
|             .response | ||||
|             .headers() | ||||
|             .iter() | ||||
|             .map(|(name, value)| { | ||||
|                 ( | ||||
|                     name.as_str().to_owned(), | ||||
|                     value | ||||
|                         .to_str() | ||||
|                         .map(|s| Secret::from(String::from(s))) | ||||
|                         .unwrap_or_else(|error| { | ||||
|                             logger::warn!( | ||||
|                                 "Response header {} contains non-UTF-8 characters: {error:?}", | ||||
|                                 name.as_str() | ||||
|                             ); | ||||
|                             Secret::from(String::from("Non-UTF-8 header value")) | ||||
|                         }), | ||||
|                 ) | ||||
|             }) | ||||
|             .collect::<Vec<_>>(); | ||||
|         let response_body = self | ||||
|             .response | ||||
|             .text() | ||||
|             .await | ||||
|             .map(Secret::from) | ||||
|             .unwrap_or_else(|error| { | ||||
|                 logger::warn!("Response contains non-UTF-8 characters: {error:?}"); | ||||
|                 Secret::from(String::from("Non-UTF-8 response body")) | ||||
|             }); | ||||
|         webhook_events::OutgoingWebhookResponseContent { | ||||
|             body: Some(response_body), | ||||
|             headers: Some(response_headers), | ||||
|             status_code: Some(status_code.as_u16()), | ||||
|             error_message: None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,13 +1,15 @@ | ||||
| use std::marker::PhantomData; | ||||
|  | ||||
| use common_utils::{errors::CustomResult, ext_traits::ValueExt}; | ||||
| use error_stack::ResultExt; | ||||
| use error_stack::{Report, ResultExt}; | ||||
| use redis_interface as redis; | ||||
| use router_env::tracing; | ||||
|  | ||||
| use super::MERCHANT_ID; | ||||
| use crate::{ | ||||
|     core::{ | ||||
|         errors::{self}, | ||||
|         metrics, | ||||
|         payments::helpers, | ||||
|     }, | ||||
|     db::{get_and_deserialize_key, StorageInterface}, | ||||
| @ -159,6 +161,43 @@ pub(crate) fn generate_event_id() -> String { | ||||
|     common_utils::generate_time_ordered_id("evt") | ||||
| } | ||||
|  | ||||
| pub fn increment_webhook_outgoing_received_count(merchant_id: &common_utils::id_type::MerchantId) { | ||||
|     metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add( | ||||
|         1, | ||||
|         router_env::metric_attributes!((MERCHANT_ID, merchant_id.clone())), | ||||
|     ) | ||||
| } | ||||
|  | ||||
| pub fn increment_webhook_outgoing_not_received_count( | ||||
|     merchant_id: &common_utils::id_type::MerchantId, | ||||
| ) { | ||||
|     metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add( | ||||
|         1, | ||||
|         router_env::metric_attributes!((MERCHANT_ID, merchant_id.clone())), | ||||
|     ); | ||||
| } | ||||
|  | ||||
| pub fn is_outgoing_webhook_disabled( | ||||
|     state: &SessionState, | ||||
|     webhook_url_result: &Result<String, Report<errors::WebhooksFlowError>>, | ||||
|     business_profile: &domain::Profile, | ||||
|     idempotent_event_id: &str, | ||||
| ) -> bool { | ||||
|     if !state.conf.webhooks.outgoing_enabled | ||||
|         || webhook_url_result.is_err() | ||||
|         || webhook_url_result.as_ref().is_ok_and(String::is_empty) | ||||
|     { | ||||
|         logger::debug!( | ||||
|             business_profile_id=?business_profile.get_id(), | ||||
|             %idempotent_event_id, | ||||
|             "Outgoing webhooks are disabled in application configuration, or merchant webhook URL \ | ||||
|              could not be obtained; skipping outgoing webhooks for event" | ||||
|         ); | ||||
|         return true; | ||||
|     } | ||||
|     false | ||||
| } | ||||
|  | ||||
| const WEBHOOK_LOCK_PREFIX: &str = "WEBHOOK_LOCK"; | ||||
|  | ||||
| pub(super) async fn perform_redis_lock<A>( | ||||
|  | ||||
| @ -796,6 +796,7 @@ mod tests { | ||||
|  | ||||
|     #[allow(clippy::unwrap_used)] | ||||
|     #[tokio::test] | ||||
|     #[cfg(feature = "v1")] | ||||
|     async fn test_mockdb_event_interface() { | ||||
|         #[allow(clippy::expect_used)] | ||||
|         let mockdb = MockDb::new(&redis_interface::RedisSettings::default()) | ||||
| @ -909,4 +910,121 @@ mod tests { | ||||
|         assert_eq!(updated_event.primary_object_id, payment_id); | ||||
|         assert_eq!(updated_event.event_id, event_id); | ||||
|     } | ||||
|  | ||||
|     #[allow(clippy::unwrap_used)] | ||||
|     #[tokio::test] | ||||
|     #[cfg(feature = "v2")] | ||||
|     async fn test_mockdb_event_interface() { | ||||
|         #[allow(clippy::expect_used)] | ||||
|         let mockdb = MockDb::new(&redis_interface::RedisSettings::default()) | ||||
|             .await | ||||
|             .expect("Failed to create Mock store"); | ||||
|         let event_id = "test_event_id"; | ||||
|         let (tx, _) = tokio::sync::oneshot::channel(); | ||||
|         let app_state = Box::pin(routes::AppState::with_storage( | ||||
|             Settings::default(), | ||||
|             StorageImpl::PostgresqlTest, | ||||
|             tx, | ||||
|             Box::new(services::MockApiClient), | ||||
|         )) | ||||
|         .await; | ||||
|         let state = &Arc::new(app_state) | ||||
|             .get_session_state( | ||||
|                 &common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(), | ||||
|                 None, | ||||
|                 || {}, | ||||
|             ) | ||||
|             .unwrap(); | ||||
|         let merchant_id = | ||||
|             common_utils::id_type::MerchantId::try_from(std::borrow::Cow::from("merchant_1")) | ||||
|                 .unwrap(); | ||||
|         let business_profile_id = | ||||
|             common_utils::id_type::ProfileId::try_from(std::borrow::Cow::from("profile1")).unwrap(); | ||||
|         let payment_id = "test_payment_id"; | ||||
|         let key_manager_state = &state.into(); | ||||
|         let master_key = mockdb.get_master_key(); | ||||
|         mockdb | ||||
|             .insert_merchant_key_store( | ||||
|                 key_manager_state, | ||||
|                 domain::MerchantKeyStore { | ||||
|                     merchant_id: merchant_id.clone(), | ||||
|                     key: domain::types::crypto_operation( | ||||
|                         key_manager_state, | ||||
|                         type_name!(domain::MerchantKeyStore), | ||||
|                         domain::types::CryptoOperation::Encrypt( | ||||
|                             services::generate_aes256_key().unwrap().to_vec().into(), | ||||
|                         ), | ||||
|                         Identifier::Merchant(merchant_id.to_owned()), | ||||
|                         master_key, | ||||
|                     ) | ||||
|                     .await | ||||
|                     .and_then(|val| val.try_into_operation()) | ||||
|                     .unwrap(), | ||||
|                     created_at: datetime!(2023-02-01 0:00), | ||||
|                 }, | ||||
|                 &master_key.to_vec().into(), | ||||
|             ) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         let merchant_key_store = mockdb | ||||
|             .get_merchant_key_store_by_merchant_id( | ||||
|                 key_manager_state, | ||||
|                 &merchant_id, | ||||
|                 &master_key.to_vec().into(), | ||||
|             ) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|  | ||||
|         let event1 = mockdb | ||||
|             .insert_event( | ||||
|                 key_manager_state, | ||||
|                 domain::Event { | ||||
|                     event_id: event_id.into(), | ||||
|                     event_type: enums::EventType::PaymentSucceeded, | ||||
|                     event_class: enums::EventClass::Payments, | ||||
|                     is_webhook_notified: false, | ||||
|                     primary_object_id: payment_id.into(), | ||||
|                     primary_object_type: enums::EventObjectType::PaymentDetails, | ||||
|                     created_at: common_utils::date_time::now(), | ||||
|                     merchant_id: Some(merchant_id.to_owned()), | ||||
|                     business_profile_id: Some(business_profile_id.to_owned()), | ||||
|                     primary_object_created_at: Some(common_utils::date_time::now()), | ||||
|                     idempotent_event_id: Some(event_id.into()), | ||||
|                     initial_attempt_id: Some(event_id.into()), | ||||
|                     request: None, | ||||
|                     response: None, | ||||
|                     delivery_attempt: Some(enums::WebhookDeliveryAttempt::InitialAttempt), | ||||
|                     metadata: Some(EventMetadata::Payment { | ||||
|                         payment_id: common_utils::id_type::GlobalPaymentId::try_from( | ||||
|                             std::borrow::Cow::Borrowed(payment_id), | ||||
|                         ) | ||||
|                         .unwrap(), | ||||
|                     }), | ||||
|                     is_overall_delivery_successful: Some(false), | ||||
|                 }, | ||||
|                 &merchant_key_store, | ||||
|             ) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|  | ||||
|         assert_eq!(event1.event_id, event_id); | ||||
|  | ||||
|         let updated_event = mockdb | ||||
|             .update_event_by_merchant_id_event_id( | ||||
|                 key_manager_state, | ||||
|                 &merchant_id, | ||||
|                 event_id, | ||||
|                 domain::EventUpdate::UpdateResponse { | ||||
|                     is_webhook_notified: true, | ||||
|                     response: None, | ||||
|                 }, | ||||
|                 &merchant_key_store, | ||||
|             ) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|  | ||||
|         assert!(updated_event.is_webhook_notified); | ||||
|         assert_eq!(updated_event.primary_object_id, payment_id); | ||||
|         assert_eq!(updated_event.event_id, event_id); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -50,15 +50,23 @@ pub enum OutgoingWebhookEventContent { | ||||
|     #[cfg(feature = "v2")] | ||||
|     Refund { | ||||
|         payment_id: common_utils::id_type::GlobalPaymentId, | ||||
|         refund_id: String, | ||||
|         refund_id: common_utils::id_type::GlobalRefundId, | ||||
|         content: Value, | ||||
|     }, | ||||
|     #[cfg(feature = "v1")] | ||||
|     Dispute { | ||||
|         payment_id: common_utils::id_type::PaymentId, | ||||
|         attempt_id: String, | ||||
|         dispute_id: String, | ||||
|         content: Value, | ||||
|     }, | ||||
|     #[cfg(feature = "v2")] | ||||
|     Dispute { | ||||
|         payment_id: common_utils::id_type::GlobalPaymentId, | ||||
|         attempt_id: String, | ||||
|         dispute_id: String, | ||||
|         content: Value, | ||||
|     }, | ||||
|     Mandate { | ||||
|         payment_method_id: String, | ||||
|         mandate_id: String, | ||||
| @ -118,17 +126,14 @@ impl OutgoingWebhookEventMetric for OutgoingWebhookContent { | ||||
|             }), | ||||
|             Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund { | ||||
|                 payment_id: refund_payload.payment_id.clone(), | ||||
|                 refund_id: refund_payload.get_refund_id_as_string(), | ||||
|                 refund_id: refund_payload.id.clone(), | ||||
|                 content: masking::masked_serialize(&refund_payload) | ||||
|                     .unwrap_or(serde_json::json!({"error":"failed to serialize"})), | ||||
|             }), | ||||
|             Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute { | ||||
|                 payment_id: dispute_payload.payment_id.clone(), | ||||
|                 attempt_id: dispute_payload.attempt_id.clone(), | ||||
|                 dispute_id: dispute_payload.dispute_id.clone(), | ||||
|                 content: masking::masked_serialize(&dispute_payload) | ||||
|                     .unwrap_or(serde_json::json!({"error":"failed to serialize"})), | ||||
|             }), | ||||
|             Self::DisputeDetails(dispute_payload) => { | ||||
|                 //TODO: add support for dispute outgoing webhook | ||||
|                 todo!() | ||||
|             } | ||||
|             Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate { | ||||
|                 payment_method_id: mandate_payload.payment_method_id.clone(), | ||||
|                 mandate_id: mandate_payload.mandate_id.clone(), | ||||
|  | ||||
		Reference in New Issue
	
	Block a user