feat(webhooks): Adding event search option in the webhooks page (#9907)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Venu Madhav Bandarupalli
2025-10-24 13:32:04 +05:30
committed by GitHub
parent 8fd8884d5c
commit b0d5a1b046
7 changed files with 98 additions and 45 deletions

View File

@ -16103,6 +16103,11 @@
"description": "Filter all events associated with the specified object identifier (Payment Intent ID,\nRefund ID, etc.)",
"nullable": true
},
"event_id": {
"type": "string",
"description": "Filter all events associated with the specified Event_id",
"nullable": true
},
"profile_id": {
"type": "string",
"description": "Filter all events associated with the specified business profile ID.",

View File

@ -27,6 +27,9 @@ pub struct EventListConstraints {
/// Refund ID, etc.)
pub object_id: Option<String>,
/// Filter all events associated with the specified Event_id
pub event_id: Option<String>,
/// Filter all events associated with the specified business profile ID.
#[schema(value_type = Option<String>)]
pub profile_id: Option<common_utils::id_type::ProfileId>,
@ -53,6 +56,7 @@ pub enum EventListConstraintsInternal {
},
ObjectIdFilter {
object_id: String,
event_id: String,
},
}

View File

@ -46,10 +46,11 @@ impl Event {
.await
}
pub async fn list_initial_attempts_by_merchant_id_primary_object_id(
pub async fn list_initial_attempts_by_merchant_id_primary_object_id_or_initial_attempt_id(
conn: &PgPooledConn,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<<Self as HasTable>::Table, _, _, _>(
conn,
@ -57,7 +58,11 @@ impl Event {
.nullable()
.eq(dsl::initial_attempt_id) // Filter initial attempts only
.and(dsl::merchant_id.eq(merchant_id.to_owned()))
.and(dsl::primary_object_id.eq(primary_object_id.to_owned())),
.and(
dsl::primary_object_id
.eq(primary_object_id.to_owned())
.or(dsl::initial_attempt_id.eq(initial_attempt_id.to_owned())),
),
None,
None,
Some(dsl::created_at.desc()),
@ -129,10 +134,11 @@ impl Event {
.await
}
pub async fn list_initial_attempts_by_profile_id_primary_object_id(
pub async fn list_initial_attempts_by_profile_id_primary_object_id_or_initial_attempt_id(
conn: &PgPooledConn,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<<Self as HasTable>::Table, _, _, _>(
conn,
@ -140,7 +146,11 @@ impl Event {
.nullable()
.eq(dsl::initial_attempt_id) // Filter initial attempts only
.and(dsl::business_profile_id.eq(profile_id.to_owned()))
.and(dsl::primary_object_id.eq(primary_object_id.to_owned())),
.and(
dsl::primary_object_id
.eq(primary_object_id.to_owned())
.or(dsl::initial_attempt_id.eq(initial_attempt_id.to_owned())),
),
None,
None,
Some(dsl::created_at.desc()),

View File

@ -43,31 +43,35 @@ pub async fn list_initial_delivery_attempts(
(now.date() - time::Duration::days(INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS)).midnight();
let (events, total_count) = match constraints {
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { object_id } => {
let events = match account {
MerchantAccountOrProfile::MerchantAccount(merchant_account) => {
store
.list_initial_events_by_merchant_id_primary_object_id(
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter {
object_id,
event_id,
} => {
let events =
match account {
MerchantAccountOrProfile::MerchantAccount(merchant_account) => store
.list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
key_manager_state,
merchant_account.get_id(),
&object_id,
&event_id,
&key_store,
)
.await
.await,
MerchantAccountOrProfile::Profile(business_profile) => {
store
.list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
key_manager_state,
business_profile.get_id(),
&object_id,
&event_id,
&key_store,
)
.await
}
}
MerchantAccountOrProfile::Profile(business_profile) => {
store
.list_initial_events_by_profile_id_primary_object_id(
key_manager_state,
business_profile.get_id(),
&object_id,
&key_store,
)
.await
}
}
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to list events with specified constraints")?;
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to list events with specified constraints")?;
let total_count = i64::try_from(events.len())
.change_context(errors::ApiErrorResponse::InternalServerError)

View File

@ -46,11 +46,12 @@ where
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::Event, errors::StorageError>;
async fn list_initial_events_by_merchant_id_primary_object_id(
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
@ -76,11 +77,12 @@ where
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
async fn list_initial_events_by_profile_id_primary_object_id(
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
@ -191,18 +193,20 @@ impl EventInterface for Store {
}
#[instrument(skip_all)]
async fn list_initial_events_by_merchant_id_primary_object_id(
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Event::list_initial_attempts_by_merchant_id_primary_object_id(
storage::Event::list_initial_attempts_by_merchant_id_primary_object_id_or_initial_attempt_id(
&conn,
merchant_id,
primary_object_id,
initial_attempt_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
@ -306,18 +310,20 @@ impl EventInterface for Store {
}
#[instrument(skip_all)]
async fn list_initial_events_by_profile_id_primary_object_id(
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Event::list_initial_attempts_by_profile_id_primary_object_id(
storage::Event::list_initial_attempts_by_profile_id_primary_object_id_or_initial_attempt_id(
&conn,
profile_id,
primary_object_id,
initial_attempt_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
@ -527,11 +533,12 @@ impl EventInterface for MockDb {
)
}
async fn list_initial_events_by_merchant_id_primary_object_id(
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let locked_events = self.events.lock().await;
@ -539,8 +546,9 @@ impl EventInterface for MockDb {
.iter()
.filter(|event| {
event.merchant_id == Some(merchant_id.to_owned())
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& event.primary_object_id == primary_object_id
&& event.initial_attempt_id.as_deref() == Some(&event.event_id)
&& (event.primary_object_id == primary_object_id
|| event.initial_attempt_id.as_deref() == Some(initial_attempt_id))
})
.cloned()
.collect::<Vec<_>>();
@ -663,11 +671,12 @@ impl EventInterface for MockDb {
Ok(domain_events)
}
async fn list_initial_events_by_profile_id_primary_object_id(
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let locked_events = self.events.lock().await;
@ -676,7 +685,8 @@ impl EventInterface for MockDb {
.filter(|event| {
event.business_profile_id == Some(profile_id.to_owned())
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& event.primary_object_id == primary_object_id
&& (event.primary_object_id == primary_object_id
|| event.initial_attempt_id.as_deref() == Some(initial_attempt_id))
})
.cloned()
.collect::<Vec<_>>();
@ -1310,6 +1320,7 @@ mod tests {
let event_type = enums::EventType::PaymentSucceeded;
let event_class = enums::EventClass::Payments;
let primary_object_id = Arc::new("concurrent_payment_id".to_string());
let initial_attempt_id = Arc::new("initial_attempt_id".to_string());
let primary_object_type = enums::EventObjectType::PaymentDetails;
let payment_id = common_utils::id_type::PaymentId::try_from(std::borrow::Cow::Borrowed(
"pay_mbabizu24mvu3mela5njyhpit10",
@ -1462,10 +1473,11 @@ mod tests {
// Collect all initial-attempt events for this payment
let events = state
.store
.list_initial_events_by_merchant_id_primary_object_id(
.list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
key_manager_state,
&business_profile.merchant_id,
&primary_object_id.clone(),
&initial_attempt_id.clone(),
merchant_context.get_merchant_key_store(),
)
.await?;

View File

@ -782,18 +782,20 @@ impl EventInterface for KafkaStore {
.await
}
async fn list_initial_events_by_merchant_id_primary_object_id(
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
merchant_id: &id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
self.diesel_store
.list_initial_events_by_merchant_id_primary_object_id(
.list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
state,
merchant_id,
primary_object_id,
initial_attempt_id,
merchant_key_store,
)
.await
@ -843,18 +845,20 @@ impl EventInterface for KafkaStore {
.await
}
async fn list_initial_events_by_profile_id_primary_object_id(
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
state: &KeyManagerState,
profile_id: &id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
self.diesel_store
.list_initial_events_by_profile_id_primary_object_id(
.list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
state,
profile_id,
primary_object_id,
initial_attempt_id,
merchant_key_store,
)
.await

View File

@ -1798,7 +1798,7 @@ impl ForeignTryFrom<api_types::webhook_events::EventListConstraints>
fn foreign_try_from(
item: api_types::webhook_events::EventListConstraints,
) -> Result<Self, Self::Error> {
if item.object_id.is_some()
if (item.object_id.is_some() || item.event_id.is_some())
&& (item.created_after.is_some()
|| item.created_before.is_some()
|| item.limit.is_some()
@ -1808,15 +1808,29 @@ impl ForeignTryFrom<api_types::webhook_events::EventListConstraints>
{
return Err(report!(errors::ApiErrorResponse::PreconditionFailed {
message:
"Either only `object_id` must be specified, or one or more of \
`created_after`, `created_before`, `limit`, `offset`, `event_classes` and `event_types` must be specified"
"Either only `object_id` or `event_id` must be specified, or one or more of \
`created_after`, `created_before`, `limit`, `offset`, `event_classes` and `event_types` must be specified"
.to_string()
}));
}
match item.object_id {
Some(object_id) => Ok(Self::ObjectIdFilter { object_id }),
None => Ok(Self::GenericFilter {
match (item.object_id.clone(), item.event_id.clone()) {
(Some(object_id), Some(event_id)) => Ok(Self::ObjectIdFilter {
object_id,
event_id,
}),
(Some(object_id), None) => Ok(Self::ObjectIdFilter {
event_id: object_id.clone(),
object_id,
}),
(None, Some(event_id)) => Ok(Self::ObjectIdFilter {
object_id: event_id.clone(),
event_id,
}),
(None, None) => Ok(Self::GenericFilter {
created_after: item.created_after,
created_before: item.created_before,
limit: item.limit.map(i64::from),