feat(webhook): Add is_delivered filter to list initial attempts endpoint (#7344)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Amey Wale
2025-03-28 15:59:31 +05:30
committed by GitHub
parent 4b39dc85d7
commit 55d27ce15f
17 changed files with 194 additions and 12 deletions

View File

@ -133,6 +133,7 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
response: None,
delivery_attempt: Some(delivery_attempt),
metadata: Some(event_metadata),
is_overall_delivery_successful: Some(false),
};
let event_insert_result = state
@ -312,7 +313,7 @@ async fn trigger_webhook_to_merchant(
}
Ok(response) => {
let status_code = response.status();
let _updated_event = update_event_in_storage(
let updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
&business_profile.merchant_id,
@ -322,6 +323,14 @@ async fn trigger_webhook_to_merchant(
.await?;
if status_code.is_success() {
update_overall_delivery_status_in_storage(
state.clone(),
merchant_key_store.clone(),
&business_profile.merchant_id,
updated_event,
)
.await?;
success_response_handler(
state.clone(),
&business_profile.merchant_id,
@ -362,7 +371,7 @@ async fn trigger_webhook_to_merchant(
}
Ok(response) => {
let status_code = response.status();
let _updated_event = update_event_in_storage(
let updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
&business_profile.merchant_id,
@ -372,6 +381,14 @@ async fn trigger_webhook_to_merchant(
.await?;
if status_code.is_success() {
update_overall_delivery_status_in_storage(
state.clone(),
merchant_key_store.clone(),
&business_profile.merchant_id,
updated_event,
)
.await?;
success_response_handler(
state.clone(),
&business_profile.merchant_id,
@ -837,6 +854,44 @@ async fn update_event_in_storage(
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
}
async fn update_overall_delivery_status_in_storage(
state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: &common_utils::id_type::MerchantId,
updated_event: domain::Event,
) -> CustomResult<(), errors::WebhooksFlowError> {
let key_manager_state = &(&state).into();
let update_overall_delivery_status = domain::EventUpdate::OverallDeliveryStatusUpdate {
is_overall_delivery_successful: true,
};
let initial_attempt_id = updated_event.initial_attempt_id.as_ref();
let delivery_attempt = updated_event.delivery_attempt;
if let Some((
initial_attempt_id,
enums::WebhookDeliveryAttempt::InitialAttempt
| enums::WebhookDeliveryAttempt::AutomaticRetry,
)) = initial_attempt_id.zip(delivery_attempt)
{
state
.store
.update_event_by_merchant_id_event_id(
key_manager_state,
merchant_id,
initial_attempt_id.as_str(),
update_overall_delivery_status,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to update initial delivery attempt")?;
}
Ok(())
}
fn increment_webhook_outgoing_received_count(merchant_id: &common_utils::id_type::MerchantId) {
metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add(
1,

View File

@ -64,6 +64,7 @@ pub async fn list_initial_delivery_attempts(
created_before,
limit,
offset,
is_delivered
} => {
let limit = match limit {
Some(limit) if limit <= INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_LIMIT => Ok(Some(limit)),
@ -114,6 +115,7 @@ pub async fn list_initial_delivery_attempts(
created_before,
limit,
offset,
is_delivered,
&key_store,
)
.await,
@ -124,6 +126,7 @@ pub async fn list_initial_delivery_attempts(
created_before,
limit,
offset,
is_delivered,
&key_store,
)
.await,
@ -143,12 +146,15 @@ pub async fn list_initial_delivery_attempts(
.unwrap_or(events_list_begin_time);
let created_before = api_constraints.created_before.unwrap_or(now);
let is_delivered = api_constraints.is_delivered;
let total_count = store
.count_initial_events_by_constraints(
&merchant_id,
profile_id,
created_after,
created_before,
is_delivered,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
@ -269,6 +275,7 @@ pub async fn retry_delivery_attempt(
response: None,
delivery_attempt: Some(delivery_attempt),
metadata: event_to_retry.metadata,
is_overall_delivery_successful: Some(false),
};
let event = store

View File

@ -53,6 +53,7 @@ where
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
@ -81,6 +82,7 @@ where
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
@ -99,6 +101,7 @@ where
profile_id: Option<common_utils::id_type::ProfileId>,
created_after: time::PrimitiveDateTime,
created_before: time::PrimitiveDateTime,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError>;
}
@ -193,6 +196,7 @@ impl EventInterface for Store {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
@ -203,6 +207,7 @@ impl EventInterface for Store {
created_before,
limit,
offset,
is_delivered,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
@ -304,6 +309,7 @@ impl EventInterface for Store {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
@ -314,6 +320,7 @@ impl EventInterface for Store {
created_before,
limit,
offset,
is_delivered,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
@ -366,6 +373,7 @@ impl EventInterface for Store {
profile_id: Option<common_utils::id_type::ProfileId>,
created_after: time::PrimitiveDateTime,
created_before: time::PrimitiveDateTime,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Event::count_initial_attempts_by_constraints(
@ -374,6 +382,7 @@ impl EventInterface for Store {
profile_id,
created_after,
created_before,
is_delivered,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
@ -483,6 +492,7 @@ impl EventInterface for MockDb {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let locked_events = self.events.lock().await;
@ -490,10 +500,12 @@ impl EventInterface for MockDb {
let check = event.merchant_id == Some(merchant_id.to_owned())
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& (event.created_at >= created_after)
&& (event.created_at <= created_before);
&& (event.created_at <= created_before)
&& (event.is_overall_delivery_successful == is_delivered);
check
});
let offset: usize = if let Some(offset) = offset {
if offset < 0 {
Err(errors::StorageError::MockDbError)?;
@ -614,6 +626,7 @@ impl EventInterface for MockDb {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let locked_events = self.events.lock().await;
@ -621,7 +634,8 @@ impl EventInterface for MockDb {
let check = event.business_profile_id == Some(profile_id.to_owned())
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& (event.created_at >= created_after)
&& (event.created_at <= created_before);
&& (event.created_at <= created_before)
&& (event.is_overall_delivery_successful == is_delivered);
check
});
@ -694,6 +708,12 @@ impl EventInterface for MockDb {
event_to_update.is_webhook_notified = is_webhook_notified;
event_to_update.response = response.map(Into::into);
}
domain::EventUpdate::OverallDeliveryStatusUpdate {
is_overall_delivery_successful,
} => {
event_to_update.is_overall_delivery_successful =
Some(is_overall_delivery_successful)
}
}
event_to_update
@ -713,6 +733,7 @@ impl EventInterface for MockDb {
profile_id: Option<common_utils::id_type::ProfileId>,
created_after: time::PrimitiveDateTime,
created_before: time::PrimitiveDateTime,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError> {
let locked_events = self.events.lock().await;
@ -721,7 +742,8 @@ impl EventInterface for MockDb {
&& (event.merchant_id == Some(merchant_id.to_owned()))
&& (event.business_profile_id == profile_id)
&& (event.created_at >= created_after)
&& (event.created_at <= created_before);
&& (event.created_at <= created_before)
&& (event.is_overall_delivery_successful == is_delivered);
check
});
@ -843,6 +865,7 @@ mod tests {
)
.unwrap(),
}),
is_overall_delivery_successful: Some(false),
},
&merchant_key_store,
)

View File

@ -772,6 +772,7 @@ impl EventInterface for KafkaStore {
created_before: PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
self.diesel_store
@ -782,6 +783,7 @@ impl EventInterface for KafkaStore {
created_before,
limit,
offset,
is_delivered,
merchant_key_store,
)
.await
@ -829,6 +831,7 @@ impl EventInterface for KafkaStore {
created_before: PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
self.diesel_store
@ -839,6 +842,7 @@ impl EventInterface for KafkaStore {
created_before,
limit,
offset,
is_delivered,
merchant_key_store,
)
.await
@ -869,6 +873,7 @@ impl EventInterface for KafkaStore {
profile_id: Option<id_type::ProfileId>,
created_after: PrimitiveDateTime,
created_before: PrimitiveDateTime,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError> {
self.diesel_store
.count_initial_events_by_constraints(
@ -876,6 +881,7 @@ impl EventInterface for KafkaStore {
profile_id,
created_after,
created_before,
is_delivered,
)
.await
}

View File

@ -19,24 +19,58 @@ use crate::{
#[derive(Clone, Debug, router_derive::ToEncryption)]
pub struct Event {
/// A string that uniquely identifies the event.
pub event_id: String,
/// Represents the type of event for the webhook.
pub event_type: EventType,
/// Represents the class of event for the webhook.
pub event_class: EventClass,
/// Indicates whether the current webhook delivery was successful.
pub is_webhook_notified: bool,
/// Reference to the object for which the webhook was created.
pub primary_object_id: String,
/// Type of the object type for which the webhook was created.
pub primary_object_type: EventObjectType,
/// The timestamp when the webhook was created.
pub created_at: time::PrimitiveDateTime,
/// Merchant Account identifier to which the object is associated with.
pub merchant_id: Option<common_utils::id_type::MerchantId>,
/// Business Profile identifier to which the object is associated with.
pub business_profile_id: Option<common_utils::id_type::ProfileId>,
/// The timestamp when the primary object was created.
pub primary_object_created_at: Option<time::PrimitiveDateTime>,
/// This allows the event to be uniquely identified to prevent multiple processing.
pub idempotent_event_id: Option<String>,
/// Links to the initial attempt of the event.
pub initial_attempt_id: Option<String>,
/// This field contains the encrypted request data sent as part of the event.
#[encrypt]
pub request: Option<Encryptable<Secret<String>>>,
/// This field contains the encrypted response data received as part of the event.
#[encrypt]
pub response: Option<Encryptable<Secret<String>>>,
/// Represents the event delivery type.
pub delivery_attempt: Option<WebhookDeliveryAttempt>,
/// Holds any additional data related to the event.
pub metadata: Option<EventMetadata>,
/// Indicates whether the event was ultimately delivered.
pub is_overall_delivery_successful: Option<bool>,
}
#[derive(Debug)]
@ -45,6 +79,9 @@ pub enum EventUpdate {
is_webhook_notified: bool,
response: OptionalEncryptableSecretString,
},
OverallDeliveryStatusUpdate {
is_overall_delivery_successful: bool,
},
}
impl From<EventUpdate> for EventUpdateInternal {
@ -56,6 +93,14 @@ impl From<EventUpdate> for EventUpdateInternal {
} => Self {
is_webhook_notified: Some(is_webhook_notified),
response: response.map(Into::into),
is_overall_delivery_successful: None,
},
EventUpdate::OverallDeliveryStatusUpdate {
is_overall_delivery_successful,
} => Self {
is_webhook_notified: None,
response: None,
is_overall_delivery_successful: Some(is_overall_delivery_successful),
},
}
}
@ -84,6 +129,7 @@ impl super::behaviour::Conversion for Event {
response: self.response.map(Into::into),
delivery_attempt: self.delivery_attempt,
metadata: self.metadata,
is_overall_delivery_successful: self.is_overall_delivery_successful,
})
}
@ -133,6 +179,7 @@ impl super::behaviour::Conversion for Event {
response: encryptable_event.response,
delivery_attempt: item.delivery_attempt,
metadata: item.metadata,
is_overall_delivery_successful: item.is_overall_delivery_successful,
})
}
@ -154,6 +201,7 @@ impl super::behaviour::Conversion for Event {
response: self.response.map(Into::into),
delivery_attempt: self.delivery_attempt,
metadata: self.metadata,
is_overall_delivery_successful: self.is_overall_delivery_successful,
})
}
}

View File

@ -1936,6 +1936,7 @@ impl ForeignTryFrom<api_types::webhook_events::EventListConstraints>
created_before: item.created_before,
limit: item.limit.map(i64::from),
offset: item.offset.map(i64::from),
is_delivered: item.is_delivered,
}),
}
}
@ -1971,7 +1972,7 @@ impl TryFrom<domain::Event> for api_models::webhook_events::EventListItemRespons
object_id: item.primary_object_id,
event_type: item.event_type,
event_class: item.event_class,
is_delivery_successful: item.is_webhook_notified,
is_delivery_successful: item.is_overall_delivery_successful,
initial_attempt_id,
created: item.created_at,
})

View File

@ -118,6 +118,7 @@ impl ProcessTrackerWorkflow<SessionState> for OutgoingWebhookRetryWorkflow {
response: None,
delivery_attempt: Some(delivery_attempt),
metadata: initial_event.metadata,
is_overall_delivery_successful: Some(false),
};
let event = db