feat(events): add APIs to list webhook events and webhook delivery attempts (#4131)

This commit is contained in:
Sanchith Hegde
2024-03-21 19:02:17 +05:30
committed by GitHub
parent 4f8461b2a9
commit 14e1bbaf07
54 changed files with 4472 additions and 2705 deletions

View File

@ -1,11 +1,14 @@
pub mod types;
pub mod utils;
#[cfg(feature = "olap")]
pub mod webhook_events;
use std::{str::FromStr, time::Instant};
use actix_web::FromRequest;
use api_models::{
payments::HeaderPayload,
webhook_events::{OutgoingWebhookRequestContent, OutgoingWebhookResponseContent},
webhooks::{self, WebhookResponseTracker},
};
use common_utils::{
@ -669,7 +672,7 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
content: api::OutgoingWebhookContent,
primary_object_created_at: Option<time::PrimitiveDateTime>,
) -> CustomResult<(), errors::ApiErrorResponse> {
let delivery_attempt = types::WebhookDeliveryAttempt::InitialAttempt;
let delivery_attempt = enums::WebhookDeliveryAttempt::InitialAttempt;
let idempotent_event_id =
utils::get_idempotent_event_id(&primary_object_id, event_type, delivery_attempt);
let webhook_url_result = get_webhook_url_from_business_profile(&business_profile);
@ -734,6 +737,7 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
.attach_printable("Failed to encrypt outgoing webhook request content")?,
),
response: None,
delivery_attempt: Some(delivery_attempt),
};
let event_insert_result = state
@ -801,8 +805,8 @@ pub(crate) async fn trigger_webhook_and_raise_event(
business_profile: diesel_models::business_profile::BusinessProfile,
merchant_key_store: &domain::MerchantKeyStore,
event: domain::Event,
request_content: types::OutgoingWebhookRequestContent,
delivery_attempt: types::WebhookDeliveryAttempt,
request_content: OutgoingWebhookRequestContent,
delivery_attempt: enums::WebhookDeliveryAttempt,
content: Option<api::OutgoingWebhookContent>,
process_tracker: Option<storage::ProcessTracker>,
) {
@ -833,8 +837,8 @@ async fn trigger_webhook_to_merchant(
business_profile: diesel_models::business_profile::BusinessProfile,
merchant_key_store: &domain::MerchantKeyStore,
event: domain::Event,
request_content: types::OutgoingWebhookRequestContent,
delivery_attempt: types::WebhookDeliveryAttempt,
request_content: OutgoingWebhookRequestContent,
delivery_attempt: enums::WebhookDeliveryAttempt,
process_tracker: Option<storage::ProcessTracker>,
) -> CustomResult<(), errors::WebhooksFlowError> {
let webhook_url = match (
@ -875,7 +879,7 @@ async fn trigger_webhook_to_merchant(
.attach_default_headers()
.headers(headers)
.set_body(RequestContent::RawBytes(
request_content.payload.expose().into_bytes(),
request_content.body.expose().into_bytes(),
))
.build();
@ -896,7 +900,7 @@ async fn trigger_webhook_to_merchant(
let api_client_error_handler =
|client_error: error_stack::Report<errors::ApiClientError>,
delivery_attempt: types::WebhookDeliveryAttempt| {
delivery_attempt: enums::WebhookDeliveryAttempt| {
let error =
client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed);
logger::error!(
@ -907,6 +911,7 @@ async fn trigger_webhook_to_merchant(
};
let update_event_in_storage = |state: AppState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: String,
event_id: String,
response: reqwest::Response| async move {
let status_code = response.status();
@ -931,7 +936,7 @@ async fn trigger_webhook_to_merchant(
)
})
.collect::<Vec<_>>();
let response_payload = response
let response_body = response
.text()
.await
.map(Secret::from)
@ -939,8 +944,8 @@ async fn trigger_webhook_to_merchant(
logger::warn!("Response contains non-UTF-8 characters: {error:?}");
Secret::from(String::from("Non-UTF-8 response body"))
});
let response_to_store = types::OutgoingWebhookResponseContent {
payload: response_payload,
let response_to_store = OutgoingWebhookResponseContent {
body: response_body,
headers: response_headers,
status_code: status_code.as_u16(),
};
@ -964,7 +969,12 @@ async fn trigger_webhook_to_merchant(
};
state
.store
.update_event(event_id, event_update, &merchant_key_store)
.update_event_by_merchant_id_event_id(
&merchant_id,
&event_id,
event_update,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
};
@ -992,7 +1002,7 @@ async fn trigger_webhook_to_merchant(
}
};
let error_response_handler = |merchant_id: String,
delivery_attempt: types::WebhookDeliveryAttempt,
delivery_attempt: enums::WebhookDeliveryAttempt,
status_code: u16,
log_message: &'static str| {
metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add(
@ -1006,13 +1016,14 @@ async fn trigger_webhook_to_merchant(
};
match delivery_attempt {
types::WebhookDeliveryAttempt::InitialAttempt => match response {
enums::WebhookDeliveryAttempt::InitialAttempt => match response {
Err(client_error) => api_client_error_handler(client_error, delivery_attempt),
Ok(response) => {
let status_code = response.status();
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
response,
)
@ -1036,7 +1047,7 @@ async fn trigger_webhook_to_merchant(
}
}
},
types::WebhookDeliveryAttempt::AutomaticRetry => {
enums::WebhookDeliveryAttempt::AutomaticRetry => {
let process_tracker = process_tracker
.get_required_value("process_tracker")
.change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)
@ -1060,6 +1071,7 @@ async fn trigger_webhook_to_merchant(
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
response,
)
@ -1094,6 +1106,10 @@ async fn trigger_webhook_to_merchant(
}
}
}
enums::WebhookDeliveryAttempt::ManualRetry => {
// Will be updated when manual retry is implemented
Err(errors::WebhooksFlowError::NotReceivedByMerchant).into_report()?
}
}
Ok(())
@ -1717,12 +1733,12 @@ pub(crate) fn get_outgoing_webhook_request(
merchant_account: &domain::MerchantAccount,
outgoing_webhook: api::OutgoingWebhook,
payment_response_hash_key: Option<&str>,
) -> CustomResult<types::OutgoingWebhookRequestContent, errors::WebhooksFlowError> {
) -> CustomResult<OutgoingWebhookRequestContent, errors::WebhooksFlowError> {
#[inline]
fn get_outgoing_webhook_request_inner<WebhookType: types::OutgoingWebhookType>(
outgoing_webhook: api::OutgoingWebhook,
payment_response_hash_key: Option<&str>,
) -> CustomResult<types::OutgoingWebhookRequestContent, errors::WebhooksFlowError> {
) -> CustomResult<OutgoingWebhookRequestContent, errors::WebhooksFlowError> {
let mut headers = vec![(
reqwest::header::CONTENT_TYPE.to_string(),
mime::APPLICATION_JSON.essence_str().into(),
@ -1737,8 +1753,8 @@ pub(crate) fn get_outgoing_webhook_request(
WebhookType::add_webhook_header(&mut headers, signature)
}
Ok(types::OutgoingWebhookRequestContent {
payload: outgoing_webhooks_signature.payload,
Ok(OutgoingWebhookRequestContent {
body: outgoing_webhooks_signature.payload,
headers: headers
.into_iter()
.map(|(name, value)| (name, Secret::new(value.into_inner())))