From ce0ac3d0297da5372772efe19167f0d2f62e82eb Mon Sep 17 00:00:00 2001 From: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com> Date: Thu, 7 Mar 2024 18:36:43 +0530 Subject: [PATCH] fix(webhooks): abort outgoing webhook retry task if webhook URL is not available in business profile (#3997) --- crates/router/src/core/errors.rs | 41 ++++---- crates/router/src/core/webhooks.rs | 159 ++++++++++++++++++----------- 2 files changed, 120 insertions(+), 80 deletions(-) diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 968181061a..609337744d 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -259,45 +259,44 @@ pub enum WebhooksFlowError { #[error("Webhook details for merchant not configured")] MerchantWebhookDetailsNotFound, #[error("Merchant does not have a webhook URL configured")] - MerchantWebhookURLNotConfigured, - #[error("Payments core flow failed")] - PaymentsCoreFailed, - #[error("Refunds core flow failed")] - RefundsCoreFailed, - #[error("Dispuste core flow failed")] - DisputeCoreFailed, - #[error("Webhook event creation failed")] - WebhookEventCreationFailed, + MerchantWebhookUrlNotConfigured, #[error("Webhook event updation failed")] WebhookEventUpdationFailed, #[error("Outgoing webhook body signing failed")] OutgoingWebhookSigningFailed, - #[error("Unable to fork webhooks flow for outgoing webhooks")] - ForkFlowFailed, #[error("Webhook api call to merchant failed")] CallToMerchantFailed, #[error("Webhook not received by merchant")] NotReceivedByMerchant, - #[error("Resource not found")] - ResourceNotFound, - #[error("Webhook source verification failed")] - WebhookSourceVerificationFailed, - #[error("Webhook event object creation failed")] - WebhookEventObjectCreationFailed, - #[error("Not implemented")] - NotImplemented, #[error("Dispute webhook status validation failed")] DisputeWebhookValidationFailed, #[error("Outgoing webhook body encoding failed")] OutgoingWebhookEncodingFailed, - #[error("Missing required field: {field_name}")] - MissingRequiredField { field_name: &'static str }, #[error("Failed to update outgoing webhook process tracker task")] OutgoingWebhookProcessTrackerTaskUpdateFailed, #[error("Failed to schedule retry attempt for outgoing webhook")] OutgoingWebhookRetrySchedulingFailed, } +impl WebhooksFlowError { + pub(crate) fn is_webhook_delivery_retryable_error(&self) -> bool { + match self { + Self::MerchantConfigNotFound + | Self::MerchantWebhookDetailsNotFound + | Self::MerchantWebhookUrlNotConfigured => false, + + Self::WebhookEventUpdationFailed + | Self::OutgoingWebhookSigningFailed + | Self::CallToMerchantFailed + | Self::NotReceivedByMerchant + | Self::DisputeWebhookValidationFailed + | Self::OutgoingWebhookEncodingFailed + | Self::OutgoingWebhookProcessTrackerTaskUpdateFailed + | Self::OutgoingWebhookRetrySchedulingFailed => true, + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ApplePayDecryptionError { #[error("Failed to base64 decode input data")] diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index b3e04e72f3..8d03bea06c 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -659,8 +659,21 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook( primary_object_type: enums::EventObjectType, content: api::OutgoingWebhookContent, ) -> CustomResult<(), errors::ApiErrorResponse> { - let merchant_id = business_profile.merchant_id.clone(); let event_id = format!("{primary_object_id}_{event_type}"); + + if !state.conf.webhooks.outgoing_enabled + || get_webhook_url_from_business_profile(&business_profile).is_err() + { + logger::debug!( + business_profile_id=%business_profile.profile_id, + %event_id, + "Outgoing webhooks are disabled in application configuration, or merchant webhook URL \ + could not be obtained; skipping outgoing webhooks for event" + ); + return Ok(()); + } + + let merchant_id = business_profile.merchant_id.clone(); let new_event = storage::EventNew { event_id: event_id.clone(), event_type, @@ -688,50 +701,48 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook( } }?; - if state.conf.webhooks.outgoing_enabled { - let outgoing_webhook = api::OutgoingWebhook { - merchant_id: merchant_id.clone(), - event_id: event.event_id.clone(), - event_type: event.event_type, - content: content.clone(), - timestamp: event.created_at, - }; + let outgoing_webhook = api::OutgoingWebhook { + merchant_id: merchant_id.clone(), + event_id: event.event_id.clone(), + event_type: event.event_type, + content: content.clone(), + timestamp: event.created_at, + }; - let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker( - &*state.store, - &business_profile, - &event, - ) - .await - .map_err(|error| { - logger::error!( - ?error, - "Failed to add outgoing webhook retry task to process tracker" - ); - error - }) - .ok(); - - // Using a tokio spawn here and not arbiter because not all caller of this function - // may have an actix arbiter - tokio::spawn( - async move { - trigger_appropriate_webhook_and_raise_event( - state, - merchant_account, - business_profile, - outgoing_webhook, - types::WebhookDeliveryAttempt::InitialAttempt, - content, - event.event_id, - event_type, - process_tracker, - ) - .await; - } - .in_current_span(), + let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker( + &*state.store, + &business_profile, + &event, + ) + .await + .map_err(|error| { + logger::error!( + ?error, + "Failed to add outgoing webhook retry task to process tracker" ); - } + error + }) + .ok(); + + // Using a tokio spawn here and not arbiter because not all caller of this function + // may have an actix arbiter + tokio::spawn( + async move { + trigger_appropriate_webhook_and_raise_event( + state, + merchant_account, + business_profile, + outgoing_webhook, + types::WebhookDeliveryAttempt::InitialAttempt, + content, + event.event_id, + event_type, + process_tracker, + ) + .await; + } + .in_current_span(), + ); Ok(()) } @@ -817,21 +828,30 @@ async fn trigger_webhook_to_merchant( delivery_attempt: types::WebhookDeliveryAttempt, process_tracker: Option, ) -> CustomResult<(), errors::WebhooksFlowError> { - let webhook_details_json = business_profile - .webhook_details - .get_required_value("webhook_details") - .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; - - let webhook_details: api::WebhookDetails = - webhook_details_json - .parse_value("WebhookDetails") - .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; - - let webhook_url = webhook_details - .webhook_url - .get_required_value("webhook_url") - .change_context(errors::WebhooksFlowError::MerchantWebhookURLNotConfigured) - .map(ExposeInterface::expose)?; + let webhook_url = match ( + get_webhook_url_from_business_profile(&business_profile), + process_tracker.clone(), + ) { + (Ok(webhook_url), _) => Ok(webhook_url), + (Err(error), Some(process_tracker)) => { + if !error + .current_context() + .is_webhook_delivery_retryable_error() + { + logger::debug!("Failed to obtain merchant webhook URL, aborting retries"); + state + .store + .as_scheduler() + .finish_process_with_business_status(process_tracker, "FAILURE".into()) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, + )?; + } + Err(error) + } + (Err(error), None) => Err(error), + }?; let outgoing_webhook_event_id = webhook.event_id.clone(); @@ -1579,7 +1599,7 @@ pub async fn add_outgoing_webhook_retry_task_to_process_tracker( let process_tracker_id = scheduler::utils::get_process_tracker_id( runner, task, - &event.primary_object_id, + &event.event_id, &business_profile.merchant_id, ); let process_tracker_entry = storage::ProcessTrackerNew::new( @@ -1611,3 +1631,24 @@ pub async fn add_outgoing_webhook_retry_task_to_process_tracker( } } } + +fn get_webhook_url_from_business_profile( + business_profile: &diesel_models::business_profile::BusinessProfile, +) -> CustomResult { + let webhook_details_json = business_profile + .webhook_details + .clone() + .get_required_value("webhook_details") + .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; + + let webhook_details: api::WebhookDetails = + webhook_details_json + .parse_value("WebhookDetails") + .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; + + webhook_details + .webhook_url + .get_required_value("webhook_url") + .change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured) + .map(ExposeInterface::expose) +}