feat(webhooks): store request and response payloads in events table (#4029)

This commit is contained in:
Sanchith Hegde
2024-03-14 12:30:37 +05:30
committed by GitHub
parent ad17cc7383
commit fd67a6c225
32 changed files with 1267 additions and 644 deletions

View File

@ -8,9 +8,11 @@ use api_models::{
payments::HeaderPayload,
webhooks::{self, WebhookResponseTracker},
};
use common_utils::{errors::ReportSwitchExt, events::ApiEventsType, request::RequestContent};
use common_utils::{
errors::ReportSwitchExt, events::ApiEventsType, ext_traits::Encode, request::RequestContent,
};
use error_stack::{report, IntoReport, ResultExt};
use masking::ExposeInterface;
use masking::{ExposeInterface, Mask, PeekInterface, Secret};
use router_env::{
instrument,
tracing::{self, Instrument},
@ -39,7 +41,7 @@ use crate::{
services::{self, authentication as auth},
types::{
api::{self, mandates::MandateResponseExt},
domain,
domain::{self, types as domain_types},
storage::{self, enums},
transformers::{ForeignInto, ForeignTryInto},
},
@ -96,7 +98,7 @@ pub async fn payments_incoming_webhook_flow<Ctx: PaymentMethodRetrieve>(
>(
state.clone(),
merchant_account.clone(),
key_store,
key_store.clone(),
payments::operations::PaymentStatus,
api::PaymentsRetrieveRequest {
resource_id: id,
@ -168,16 +170,18 @@ pub async fn payments_incoming_webhook_flow<Ctx: PaymentMethodRetrieve>(
// If event is NOT an UnsupportedEvent, trigger Outgoing Webhook
if let Some(outgoing_event_type) = event_type {
let primary_object_created_at = payments_response.created;
create_event_and_trigger_outgoing_webhook(
state,
merchant_account,
business_profile,
&key_store,
outgoing_event_type,
enums::EventClass::Payments,
None,
payment_id.clone(),
enums::EventObjectType::PaymentDetails,
api::OutgoingWebhookContent::PaymentDetails(payments_response),
primary_object_created_at,
)
.await?;
};
@ -258,7 +262,7 @@ pub async fn refunds_incoming_webhook_flow(
Box::pin(refunds::refund_retrieve_core(
state.clone(),
merchant_account.clone(),
key_store,
key_store.clone(),
api_models::refunds::RefundsRetrieveRequest {
refund_id: refund_id.to_owned(),
force_sync: Some(true),
@ -278,12 +282,13 @@ pub async fn refunds_incoming_webhook_flow(
state,
merchant_account,
business_profile,
&key_store,
outgoing_event_type,
enums::EventClass::Refunds,
None,
refund_id,
enums::EventObjectType::RefundDetails,
api::OutgoingWebhookContent::RefundDetails(refund_response),
Some(updated_refund.created_at),
)
.await?;
}
@ -463,7 +468,7 @@ pub async fn mandates_incoming_webhook_flow(
let mandates_response = Box::new(
api::mandates::MandateResponse::from_db_mandate(
&state,
key_store,
key_store.clone(),
updated_mandate.clone(),
)
.await?,
@ -474,12 +479,13 @@ pub async fn mandates_incoming_webhook_flow(
state,
merchant_account,
business_profile,
&key_store,
outgoing_event_type,
enums::EventClass::Mandates,
None,
updated_mandate.mandate_id.clone(),
enums::EventObjectType::MandateDetails,
api::OutgoingWebhookContent::MandateDetails(mandates_response),
Some(updated_mandate.created_at),
)
.await?;
}
@ -499,6 +505,7 @@ pub async fn disputes_incoming_webhook_flow(
state: AppState,
merchant_account: domain::MerchantAccount,
business_profile: diesel_models::business_profile::BusinessProfile,
key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
connector: &(dyn api::Connector + Sync),
@ -541,12 +548,13 @@ pub async fn disputes_incoming_webhook_flow(
state,
merchant_account,
business_profile,
&key_store,
event_type,
enums::EventClass::Disputes,
None,
dispute_object.dispute_id.clone(),
enums::EventObjectType::DisputeDetails,
api::OutgoingWebhookContent::DisputeDetails(disputes_response),
Some(dispute_object.created_at),
)
.await?;
metrics::INCOMING_DISPUTE_WEBHOOK_MERCHANT_NOTIFIED_METRIC.add(&metrics::CONTEXT, 1, &[]);
@ -594,7 +602,7 @@ async fn bank_transfer_webhook_flow<Ctx: PaymentMethodRetrieve>(
>(
state.clone(),
merchant_account.to_owned(),
key_store,
key_store.clone(),
payments::PaymentConfirm,
request,
services::api::AuthFlow::Merchant,
@ -623,16 +631,18 @@ async fn bank_transfer_webhook_flow<Ctx: PaymentMethodRetrieve>(
// If event is NOT an UnsupportedEvent, trigger Outgoing Webhook
if let Some(outgoing_event_type) = event_type {
let primary_object_created_at = payments_response.created;
create_event_and_trigger_outgoing_webhook(
state,
merchant_account,
business_profile,
&key_store,
outgoing_event_type,
enums::EventClass::Payments,
None,
payment_id.clone(),
enums::EventObjectType::PaymentDetails,
api::OutgoingWebhookContent::PaymentDetails(payments_response),
primary_object_created_at,
)
.await?;
}
@ -652,45 +662,91 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
state: AppState,
merchant_account: domain::MerchantAccount,
business_profile: diesel_models::business_profile::BusinessProfile,
merchant_key_store: &domain::MerchantKeyStore,
event_type: enums::EventType,
event_class: enums::EventClass,
intent_reference_id: Option<String>,
primary_object_id: String,
primary_object_type: enums::EventObjectType,
content: api::OutgoingWebhookContent,
primary_object_created_at: Option<time::PrimitiveDateTime>,
) -> CustomResult<(), errors::ApiErrorResponse> {
let event_id = format!("{primary_object_id}_{event_type}");
let delivery_attempt = types::WebhookDeliveryAttempt::InitialAttempt;
let idempotent_event_id =
utils::get_idempotent_event_id(&primary_object_id, event_type, delivery_attempt);
let webhook_url_result = get_webhook_url_from_business_profile(&business_profile);
if !state.conf.webhooks.outgoing_enabled
|| get_webhook_url_from_business_profile(&business_profile).is_err()
|| webhook_url_result.is_err()
|| webhook_url_result.as_ref().is_ok_and(String::is_empty)
{
logger::debug!(
business_profile_id=%business_profile.profile_id,
%event_id,
%idempotent_event_id,
"Outgoing webhooks are disabled in application configuration, or merchant webhook URL \
could not be obtained; skipping outgoing webhooks for event"
);
return Ok(());
}
let event_id = utils::generate_event_id();
let merchant_id = business_profile.merchant_id.clone();
let new_event = storage::EventNew {
let now = common_utils::date_time::now();
let outgoing_webhook = api::OutgoingWebhook {
merchant_id: merchant_id.clone(),
event_id: event_id.clone(),
event_type,
content: content.clone(),
timestamp: now,
};
let request_content = get_outgoing_webhook_request(
&merchant_account,
outgoing_webhook,
business_profile.payment_response_hash_key.as_deref(),
)
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("Failed to construct outgoing webhook request content")?;
let new_event = domain::Event {
event_id: event_id.clone(),
event_type,
event_class,
is_webhook_notified: false,
intent_reference_id,
primary_object_id,
primary_object_type,
created_at: now,
merchant_id: Some(business_profile.merchant_id.clone()),
business_profile_id: Some(business_profile.profile_id.clone()),
primary_object_created_at,
idempotent_event_id: Some(idempotent_event_id.clone()),
initial_attempt_id: Some(event_id.clone()),
request: Some(
domain_types::encrypt(
request_content
.encode_to_string_of_json()
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("Failed to encode outgoing webhook request content")
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("Failed to encrypt outgoing webhook request content")?,
),
response: None,
};
let event_insert_result = state.store.insert_event(new_event).await;
let event_insert_result = state
.store
.insert_event(new_event, merchant_key_store)
.await;
let event = match event_insert_result {
Ok(event) => Ok(event),
Err(error) => {
if error.current_context().is_db_unique_violation() {
logger::debug!("Event `{event_id}` already exists in the database");
logger::debug!("Event with idempotent ID `{idempotent_event_id}` already exists in the database");
return Ok(());
} else {
logger::error!(event_insertion_failure=?error);
@ -701,14 +757,6 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
}
}?;
let outgoing_webhook = api::OutgoingWebhook {
merchant_id: merchant_id.clone(),
event_id: event.event_id.clone(),
event_type: event.event_type,
content: content.clone(),
timestamp: event.created_at,
};
let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker(
&*state.store,
&business_profile,
@ -724,19 +772,19 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
})
.ok();
let cloned_key_store = merchant_key_store.clone();
// Using a tokio spawn here and not arbiter because not all caller of this function
// may have an actix arbiter
tokio::spawn(
async move {
trigger_appropriate_webhook_and_raise_event(
trigger_webhook_and_raise_event(
state,
merchant_account,
business_profile,
outgoing_webhook,
types::WebhookDeliveryAttempt::InitialAttempt,
content,
event.event_id,
event_type,
&cloned_key_store,
event,
request_content,
delivery_attempt,
Some(content),
process_tracker,
)
.await;
@ -748,83 +796,45 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn trigger_appropriate_webhook_and_raise_event(
#[instrument(skip_all)]
pub(crate) async fn trigger_webhook_and_raise_event(
state: AppState,
merchant_account: domain::MerchantAccount,
business_profile: diesel_models::business_profile::BusinessProfile,
outgoing_webhook: api::OutgoingWebhook,
merchant_key_store: &domain::MerchantKeyStore,
event: domain::Event,
request_content: types::OutgoingWebhookRequestContent,
delivery_attempt: types::WebhookDeliveryAttempt,
content: api::OutgoingWebhookContent,
event_id: String,
event_type: enums::EventType,
content: Option<api::OutgoingWebhookContent>,
process_tracker: Option<storage::ProcessTracker>,
) {
match merchant_account.get_compatible_connector() {
#[cfg(feature = "stripe")]
Some(api_models::enums::Connector::Stripe) => {
trigger_webhook_and_raise_event::<stripe_webhooks::StripeOutgoingWebhook>(
state,
business_profile,
outgoing_webhook,
delivery_attempt,
content,
event_id,
event_type,
process_tracker,
)
.await
}
_ => {
trigger_webhook_and_raise_event::<api_models::webhooks::OutgoingWebhook>(
state,
business_profile,
outgoing_webhook,
delivery_attempt,
content,
event_id,
event_type,
process_tracker,
)
.await
}
}
}
logger::debug!(
event_id=%event.event_id,
idempotent_event_id=?event.idempotent_event_id,
initial_attempt_id=?event.initial_attempt_id,
"Attempting to send webhook"
);
#[allow(clippy::too_many_arguments)]
async fn trigger_webhook_and_raise_event<W: types::OutgoingWebhookType>(
state: AppState,
business_profile: diesel_models::business_profile::BusinessProfile,
outgoing_webhook: api::OutgoingWebhook,
delivery_attempt: types::WebhookDeliveryAttempt,
content: api::OutgoingWebhookContent,
event_id: String,
event_type: enums::EventType,
process_tracker: Option<storage::ProcessTracker>,
) {
let merchant_id = business_profile.merchant_id.clone();
let trigger_webhook_result = trigger_webhook_to_merchant::<W>(
let trigger_webhook_result = trigger_webhook_to_merchant(
state.clone(),
business_profile,
outgoing_webhook,
merchant_key_store,
event.clone(),
request_content,
delivery_attempt,
process_tracker,
)
.await;
raise_webhooks_analytics_event(
state,
trigger_webhook_result,
content,
&merchant_id,
&event_id,
event_type,
);
raise_webhooks_analytics_event(state, trigger_webhook_result, content, merchant_id, event);
}
async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
async fn trigger_webhook_to_merchant(
state: AppState,
business_profile: diesel_models::business_profile::BusinessProfile,
webhook: api::OutgoingWebhook,
merchant_key_store: &domain::MerchantKeyStore,
event: domain::Event,
request_content: types::OutgoingWebhookRequestContent,
delivery_attempt: types::WebhookDeliveryAttempt,
process_tracker: Option<storage::ProcessTracker>,
) -> CustomResult<(), errors::WebhooksFlowError> {
@ -853,28 +863,21 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
(Err(error), None) => Err(error),
}?;
let outgoing_webhook_event_id = webhook.event_id.clone();
let transformed_outgoing_webhook = W::from(webhook);
let outgoing_webhooks_signature = transformed_outgoing_webhook
.get_outgoing_webhooks_signature(business_profile.payment_response_hash_key.clone())?;
let mut header = vec![(
reqwest::header::CONTENT_TYPE.to_string(),
mime::APPLICATION_JSON.essence_str().into(),
)];
if let Some(signature) = outgoing_webhooks_signature {
W::add_webhook_header(&mut header, signature)
}
let event_id = event.event_id;
let headers = request_content
.headers
.into_iter()
.map(|(name, value)| (name, value.into_masked()))
.collect();
let request = services::RequestBuilder::new()
.method(services::Method::Post)
.url(&webhook_url)
.attach_default_headers()
.headers(header)
.set_body(RequestContent::Json(Box::new(transformed_outgoing_webhook)))
.headers(headers)
.set_body(RequestContent::RawBytes(
request_content.payload.expose().into_bytes(),
))
.build();
let response = state
@ -903,10 +906,72 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
"An error occurred when sending webhook to merchant"
);
};
let update_event_in_storage = |state: AppState,
merchant_key_store: domain::MerchantKeyStore,
event_id: String,
response: reqwest::Response| async move {
let status_code = response.status();
let is_webhook_notified = status_code.is_success();
let response_headers = response
.headers()
.iter()
.map(|(name, value)| {
(
name.as_str().to_owned(),
value
.to_str()
.map(|s| Secret::from(String::from(s)))
.unwrap_or_else(|error| {
logger::warn!(
"Response header {} contains non-UTF-8 characters: {error:?}",
name.as_str()
);
Secret::from(String::from("Non-UTF-8 header value"))
}),
)
})
.collect::<Vec<_>>();
let response_payload = response
.text()
.await
.map(Secret::from)
.unwrap_or_else(|error| {
logger::warn!("Response contains non-UTF-8 characters: {error:?}");
Secret::from(String::from("Non-UTF-8 response body"))
});
let response_to_store = types::OutgoingWebhookResponseContent {
payload: response_payload,
headers: response_headers,
status_code: status_code.as_u16(),
};
let event_update = domain::EventUpdate::UpdateResponse {
is_webhook_notified,
response: Some(
domain_types::encrypt(
response_to_store
.encode_to_string_of_json()
.change_context(
errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed,
)
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook request content")?,
),
};
state
.store
.update_event(event_id, event_update, &merchant_key_store)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
};
let success_response_handler =
|state: AppState,
merchant_id: String,
outgoing_webhook_event_id: String,
process_tracker: Option<storage::ProcessTracker>,
business_status: &'static str| async move {
metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add(
@ -915,15 +980,6 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id)],
);
let update_event = storage::EventUpdate::UpdateWebhookNotified {
is_webhook_notified: Some(true),
};
state
.store
.update_event(outgoing_webhook_event_id, update_event)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)?;
match process_tracker {
Some(process_tracker) => state
.store
@ -954,11 +1010,19 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
types::WebhookDeliveryAttempt::InitialAttempt => match response {
Err(client_error) => api_client_error_handler(client_error, delivery_attempt),
Ok(response) => {
if response.status().is_success() {
let status_code = response.status();
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
event_id.clone(),
response,
)
.await?;
if status_code.is_success() {
success_response_handler(
state.clone(),
business_profile.merchant_id,
outgoing_webhook_event_id,
process_tracker,
"INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL",
)
@ -967,7 +1031,7 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
error_response_handler(
business_profile.merchant_id,
delivery_attempt,
response.status().as_u16(),
status_code.as_u16(),
"Ignoring error when sending webhook to merchant",
);
}
@ -993,11 +1057,19 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
)?;
}
Ok(response) => {
if response.status().is_success() {
let status_code = response.status();
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
event_id.clone(),
response,
)
.await?;
if status_code.is_success() {
success_response_handler(
state.clone(),
business_profile.merchant_id,
outgoing_webhook_event_id,
Some(process_tracker),
"COMPLETED_BY_PT",
)
@ -1006,7 +1078,7 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
error_response_handler(
business_profile.merchant_id.clone(),
delivery_attempt,
response.status().as_u16(),
status_code.as_u16(),
"An error occurred when sending webhook to merchant",
);
// Schedule a retry attempt for webhook delivery
@ -1031,10 +1103,9 @@ async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
fn raise_webhooks_analytics_event(
state: AppState,
trigger_webhook_result: CustomResult<(), errors::WebhooksFlowError>,
content: api::OutgoingWebhookContent,
merchant_id: &str,
event_id: &str,
event_type: enums::EventType,
content: Option<api::OutgoingWebhookContent>,
merchant_id: String,
event: domain::Event,
) {
let error = if let Err(error) = trigger_webhook_result {
logger::error!(?error, "Failed to send webhook to merchant");
@ -1051,13 +1122,16 @@ fn raise_webhooks_analytics_event(
None
};
let outgoing_webhook_event_content = content.get_outgoing_webhook_event_content();
let outgoing_webhook_event_content = content
.as_ref()
.and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content);
let webhook_event = OutgoingWebhookEvent::new(
merchant_id.to_owned(),
event_id.to_owned(),
event_type,
merchant_id,
event.event_id,
event.event_type,
outgoing_webhook_event_content,
error,
event.initial_attempt_id,
);
match RawEvent::try_from(webhook_event.clone()) {
@ -1410,6 +1484,7 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
state.clone(),
merchant_account,
business_profile,
key_store,
webhook_details,
source_verified,
*connector,
@ -1570,7 +1645,7 @@ async fn fetch_optional_mca_and_connector(
pub async fn add_outgoing_webhook_retry_task_to_process_tracker(
db: &dyn StorageInterface,
business_profile: &diesel_models::business_profile::BusinessProfile,
event: &storage::Event,
event: &domain::Event,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let schedule_time = outgoing_webhook_retry::get_webhook_delivery_retry_schedule_time(
db,
@ -1591,6 +1666,7 @@ pub async fn add_outgoing_webhook_retry_task_to_process_tracker(
event_class: event.event_class,
primary_object_id: event.primary_object_id.clone(),
primary_object_type: event.primary_object_type,
initial_attempt_id: event.initial_attempt_id.clone(),
};
let runner = storage::ProcessTrackerRunner::OutgoingWebhookRetryWorkflow;
@ -1652,3 +1728,50 @@ fn get_webhook_url_from_business_profile(
.change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured)
.map(ExposeInterface::expose)
}
pub(crate) fn get_outgoing_webhook_request(
merchant_account: &domain::MerchantAccount,
outgoing_webhook: api::OutgoingWebhook,
payment_response_hash_key: Option<&str>,
) -> CustomResult<types::OutgoingWebhookRequestContent, errors::WebhooksFlowError> {
#[inline]
fn get_outgoing_webhook_request_inner<WebhookType: types::OutgoingWebhookType>(
outgoing_webhook: api::OutgoingWebhook,
payment_response_hash_key: Option<&str>,
) -> CustomResult<types::OutgoingWebhookRequestContent, errors::WebhooksFlowError> {
let mut headers = vec![(
reqwest::header::CONTENT_TYPE.to_string(),
mime::APPLICATION_JSON.essence_str().into(),
)];
let transformed_outgoing_webhook = WebhookType::from(outgoing_webhook);
let outgoing_webhooks_signature = transformed_outgoing_webhook
.get_outgoing_webhooks_signature(payment_response_hash_key)?;
if let Some(signature) = outgoing_webhooks_signature.signature {
WebhookType::add_webhook_header(&mut headers, signature)
}
Ok(types::OutgoingWebhookRequestContent {
payload: outgoing_webhooks_signature.payload,
headers: headers
.into_iter()
.map(|(name, value)| (name, Secret::new(value.into_inner())))
.collect(),
})
}
match merchant_account.get_compatible_connector() {
#[cfg(feature = "stripe")]
Some(api_models::enums::Connector::Stripe) => get_outgoing_webhook_request_inner::<
stripe_webhooks::StripeOutgoingWebhook,
>(
outgoing_webhook, payment_response_hash_key
),
_ => get_outgoing_webhook_request_inner::<api_models::webhooks::OutgoingWebhook>(
outgoing_webhook,
payment_response_hash_key,
),
}
}