feat(webhooks): allow manually retrying delivery of outgoing webhooks (#4176)

This commit is contained in:
Sanchith Hegde
2024-04-04 14:37:51 +05:30
committed by GitHub
parent 21e2d78117
commit 63d2b6855a
16 changed files with 449 additions and 64 deletions

View File

@ -1,19 +1,21 @@
use error_stack::ResultExt;
use masking::PeekInterface;
use router_env::{instrument, tracing};
use crate::{
core::errors::{self, RouterResponse, StorageErrorExt},
routes::AppState,
services::ApplicationResponse,
types::{api, domain, transformers::ForeignTryFrom},
types::{api, domain, storage, transformers::ForeignTryFrom},
utils::{OptionExt, StringExt},
};
const INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_LIMIT: i64 = 100;
#[derive(Debug)]
enum MerchantIdOrProfileId {
MerchantId(String),
ProfileId(String),
enum MerchantAccountOrBusinessProfile {
MerchantAccount(domain::MerchantAccount),
BusinessProfile(storage::BusinessProfile),
}
#[instrument(skip(state))]
@ -27,22 +29,22 @@ pub async fn list_initial_delivery_attempts(
let store = state.store.as_ref();
let (identifier, key_store) =
let (account, key_store) =
determine_identifier_and_get_key_store(state.clone(), merchant_id_or_profile_id).await?;
let events = match constraints {
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { object_id } => {
match identifier {
MerchantIdOrProfileId::MerchantId(merchant_id) => store
match account {
MerchantAccountOrBusinessProfile::MerchantAccount(merchant_account) => store
.list_initial_events_by_merchant_id_primary_object_id(
&merchant_id,
&merchant_account.merchant_id,
&object_id,
&key_store,
)
.await,
MerchantIdOrProfileId::ProfileId(profile_id) => store
MerchantAccountOrBusinessProfile::BusinessProfile(business_profile) => store
.list_initial_events_by_profile_id_primary_object_id(
&profile_id,
&business_profile.profile_id,
&object_id,
&key_store,
)
@ -69,10 +71,10 @@ pub async fn list_initial_delivery_attempts(
_ => None,
};
match identifier {
MerchantIdOrProfileId::MerchantId(merchant_id) => store
match account {
MerchantAccountOrBusinessProfile::MerchantAccount(merchant_account) => store
.list_initial_events_by_merchant_id_constraints(
&merchant_id,
&merchant_account.merchant_id,
created_after,
created_before,
limit,
@ -80,9 +82,9 @@ pub async fn list_initial_delivery_attempts(
&key_store,
)
.await,
MerchantIdOrProfileId::ProfileId(profile_id) => store
MerchantAccountOrBusinessProfile::BusinessProfile(business_profile) => store
.list_initial_events_by_profile_id_constraints(
&profile_id,
&business_profile.profile_id,
created_after,
created_before,
limit,
@ -112,23 +114,23 @@ pub async fn list_delivery_attempts(
) -> RouterResponse<Vec<api::webhook_events::EventRetrieveResponse>> {
let store = state.store.as_ref();
let (identifier, key_store) =
let (account, key_store) =
determine_identifier_and_get_key_store(state.clone(), merchant_id_or_profile_id).await?;
let events = match identifier {
MerchantIdOrProfileId::MerchantId(merchant_id) => {
let events = match account {
MerchantAccountOrBusinessProfile::MerchantAccount(merchant_account) => {
store
.list_events_by_merchant_id_initial_attempt_id(
&merchant_id,
&merchant_account.merchant_id,
&initial_attempt_id,
&key_store,
)
.await
}
MerchantIdOrProfileId::ProfileId(profile_id) => {
MerchantAccountOrBusinessProfile::BusinessProfile(business_profile) => {
store
.list_events_by_profile_id_initial_attempt_id(
&profile_id,
&business_profile.profile_id,
&initial_attempt_id,
&key_store,
)
@ -153,10 +155,108 @@ pub async fn list_delivery_attempts(
}
}
#[instrument(skip(state))]
pub async fn retry_delivery_attempt(
state: AppState,
merchant_id_or_profile_id: String,
event_id: String,
) -> RouterResponse<api::webhook_events::EventRetrieveResponse> {
let store = state.store.as_ref();
let (account, key_store) =
determine_identifier_and_get_key_store(state.clone(), merchant_id_or_profile_id).await?;
let event_to_retry = store
.find_event_by_merchant_id_event_id(&key_store.merchant_id, &event_id, &key_store)
.await
.to_not_found_response(errors::ApiErrorResponse::EventNotFound)?;
let business_profile = match account {
MerchantAccountOrBusinessProfile::MerchantAccount(_) => {
let business_profile_id = event_to_retry
.business_profile_id
.get_required_value("business_profile_id")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to read business profile ID from event to retry")?;
store
.find_business_profile_by_profile_id(&business_profile_id)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to find business profile")
}
MerchantAccountOrBusinessProfile::BusinessProfile(business_profile) => Ok(business_profile),
}?;
let delivery_attempt = storage::enums::WebhookDeliveryAttempt::ManualRetry;
let new_event_id = super::utils::generate_event_id();
let idempotent_event_id = super::utils::get_idempotent_event_id(
&event_to_retry.primary_object_id,
event_to_retry.event_type,
delivery_attempt,
);
let now = common_utils::date_time::now();
let new_event = domain::Event {
event_id: new_event_id.clone(),
event_type: event_to_retry.event_type,
event_class: event_to_retry.event_class,
is_webhook_notified: false,
primary_object_id: event_to_retry.primary_object_id,
primary_object_type: event_to_retry.primary_object_type,
created_at: now,
merchant_id: Some(business_profile.merchant_id.clone()),
business_profile_id: Some(business_profile.profile_id.clone()),
primary_object_created_at: event_to_retry.primary_object_created_at,
idempotent_event_id: Some(idempotent_event_id),
initial_attempt_id: event_to_retry.initial_attempt_id,
request: event_to_retry.request,
response: None,
delivery_attempt: Some(delivery_attempt),
};
let event = store
.insert_event(new_event, &key_store)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to insert event")?;
// We only allow retrying deliveries for events with `request` populated.
let request_content = event
.request
.as_ref()
.get_required_value("request")
.change_context(errors::ApiErrorResponse::InternalServerError)?
.peek()
.parse_struct("OutgoingWebhookRequestContent")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to parse webhook event request information")?;
super::trigger_webhook_and_raise_event(
state.clone(),
business_profile,
&key_store,
event,
request_content,
delivery_attempt,
None,
None,
)
.await;
let updated_event = store
.find_event_by_merchant_id_event_id(&key_store.merchant_id, &new_event_id, &key_store)
.await
.to_not_found_response(errors::ApiErrorResponse::EventNotFound)?;
Ok(ApplicationResponse::Json(
api::webhook_events::EventRetrieveResponse::try_from(updated_event)?,
))
}
async fn determine_identifier_and_get_key_store(
state: AppState,
merchant_id_or_profile_id: String,
) -> errors::RouterResult<(MerchantIdOrProfileId, domain::MerchantKeyStore)> {
) -> errors::RouterResult<(MerchantAccountOrBusinessProfile, domain::MerchantKeyStore)> {
let store = state.store.as_ref();
match store
.get_merchant_key_store_by_merchant_id(
@ -165,13 +265,25 @@ async fn determine_identifier_and_get_key_store(
)
.await
{
// Valid merchant ID
Ok(key_store) => Ok((
MerchantIdOrProfileId::MerchantId(merchant_id_or_profile_id),
key_store,
)),
// Since a merchant key store was found with `merchant_id` = `merchant_id_or_profile_id`,
// `merchant_id_or_profile_id` is a valid merchant ID.
// Find a merchant account having `merchant_id` = `merchant_id_or_profile_id`.
Ok(key_store) => {
let merchant_account = store
.find_merchant_account_by_merchant_id(&merchant_id_or_profile_id, &key_store)
.await
.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?;
// Invalid merchant ID, check if we can find a business profile with the identifier
Ok((
MerchantAccountOrBusinessProfile::MerchantAccount(merchant_account),
key_store,
))
}
// Since no merchant key store was found with `merchant_id` = `merchant_id_or_profile_id`,
// `merchant_id_or_profile_id` is not a valid merchant ID.
// Assuming that `merchant_id_or_profile_id` is a business profile ID, try to find a
// business profile having `profile_id` = `merchant_id_or_profile_id`.
Err(error) if error.current_context().is_db_not_found() => {
router_env::logger::debug!(
?error,
@ -195,7 +307,7 @@ async fn determine_identifier_and_get_key_store(
.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?;
Ok((
MerchantIdOrProfileId::ProfileId(business_profile.profile_id),
MerchantAccountOrBusinessProfile::BusinessProfile(business_profile),
key_store,
))
}