From b0d5a1b0463048e4d90b60dcb9674d24e2c0179c Mon Sep 17 00:00:00 2001 From: Venu Madhav Bandarupalli <110053886+VenuMadhav2541@users.noreply.github.com> Date: Fri, 24 Oct 2025 13:32:04 +0530 Subject: [PATCH] feat(webhooks): Adding event search option in the webhooks page (#9907) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- api-reference/v1/openapi_spec_v1.json | 5 +++ crates/api_models/src/webhook_events.rs | 4 ++ crates/diesel_models/src/query/events.rs | 18 ++++++-- .../src/core/webhooks/webhook_events.rs | 42 ++++++++++--------- crates/router/src/db/events.rs | 36 ++++++++++------ crates/router/src/db/kafka_store.rs | 12 ++++-- crates/router/src/types/transformers.rs | 26 +++++++++--- 7 files changed, 98 insertions(+), 45 deletions(-) diff --git a/api-reference/v1/openapi_spec_v1.json b/api-reference/v1/openapi_spec_v1.json index 617dea4b44..5f12cbb1b2 100644 --- a/api-reference/v1/openapi_spec_v1.json +++ b/api-reference/v1/openapi_spec_v1.json @@ -16103,6 +16103,11 @@ "description": "Filter all events associated with the specified object identifier (Payment Intent ID,\nRefund ID, etc.)", "nullable": true }, + "event_id": { + "type": "string", + "description": "Filter all events associated with the specified Event_id", + "nullable": true + }, "profile_id": { "type": "string", "description": "Filter all events associated with the specified business profile ID.", diff --git a/crates/api_models/src/webhook_events.rs b/crates/api_models/src/webhook_events.rs index 4c216dd66d..42a3010c2b 100644 --- a/crates/api_models/src/webhook_events.rs +++ b/crates/api_models/src/webhook_events.rs @@ -27,6 +27,9 @@ pub struct EventListConstraints { /// Refund ID, etc.) pub object_id: Option, + /// Filter all events associated with the specified Event_id + pub event_id: Option, + /// Filter all events associated with the specified business profile ID. #[schema(value_type = Option)] pub profile_id: Option, @@ -53,6 +56,7 @@ pub enum EventListConstraintsInternal { }, ObjectIdFilter { object_id: String, + event_id: String, }, } diff --git a/crates/diesel_models/src/query/events.rs b/crates/diesel_models/src/query/events.rs index e9a0b76045..f2283e17f3 100644 --- a/crates/diesel_models/src/query/events.rs +++ b/crates/diesel_models/src/query/events.rs @@ -46,10 +46,11 @@ impl Event { .await } - pub async fn list_initial_attempts_by_merchant_id_primary_object_id( + pub async fn list_initial_attempts_by_merchant_id_primary_object_id_or_initial_attempt_id( conn: &PgPooledConn, merchant_id: &common_utils::id_type::MerchantId, primary_object_id: &str, + initial_attempt_id: &str, ) -> StorageResult> { generics::generic_filter::<::Table, _, _, _>( conn, @@ -57,7 +58,11 @@ impl Event { .nullable() .eq(dsl::initial_attempt_id) // Filter initial attempts only .and(dsl::merchant_id.eq(merchant_id.to_owned())) - .and(dsl::primary_object_id.eq(primary_object_id.to_owned())), + .and( + dsl::primary_object_id + .eq(primary_object_id.to_owned()) + .or(dsl::initial_attempt_id.eq(initial_attempt_id.to_owned())), + ), None, None, Some(dsl::created_at.desc()), @@ -129,10 +134,11 @@ impl Event { .await } - pub async fn list_initial_attempts_by_profile_id_primary_object_id( + pub async fn list_initial_attempts_by_profile_id_primary_object_id_or_initial_attempt_id( conn: &PgPooledConn, profile_id: &common_utils::id_type::ProfileId, primary_object_id: &str, + initial_attempt_id: &str, ) -> StorageResult> { generics::generic_filter::<::Table, _, _, _>( conn, @@ -140,7 +146,11 @@ impl Event { .nullable() .eq(dsl::initial_attempt_id) // Filter initial attempts only .and(dsl::business_profile_id.eq(profile_id.to_owned())) - .and(dsl::primary_object_id.eq(primary_object_id.to_owned())), + .and( + dsl::primary_object_id + .eq(primary_object_id.to_owned()) + .or(dsl::initial_attempt_id.eq(initial_attempt_id.to_owned())), + ), None, None, Some(dsl::created_at.desc()), diff --git a/crates/router/src/core/webhooks/webhook_events.rs b/crates/router/src/core/webhooks/webhook_events.rs index 700295af0e..0df2921998 100644 --- a/crates/router/src/core/webhooks/webhook_events.rs +++ b/crates/router/src/core/webhooks/webhook_events.rs @@ -43,31 +43,35 @@ pub async fn list_initial_delivery_attempts( (now.date() - time::Duration::days(INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS)).midnight(); let (events, total_count) = match constraints { - api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { object_id } => { - let events = match account { - MerchantAccountOrProfile::MerchantAccount(merchant_account) => { - store - .list_initial_events_by_merchant_id_primary_object_id( + api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { + object_id, + event_id, + } => { + let events = + match account { + MerchantAccountOrProfile::MerchantAccount(merchant_account) => store + .list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( key_manager_state, merchant_account.get_id(), &object_id, + &event_id, &key_store, ) - .await + .await, + MerchantAccountOrProfile::Profile(business_profile) => { + store + .list_initial_events_by_profile_id_primary_object_or_initial_attempt_id( + key_manager_state, + business_profile.get_id(), + &object_id, + &event_id, + &key_store, + ) + .await + } } - MerchantAccountOrProfile::Profile(business_profile) => { - store - .list_initial_events_by_profile_id_primary_object_id( - key_manager_state, - business_profile.get_id(), - &object_id, - &key_store, - ) - .await - } - } - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to list events with specified constraints")?; + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to list events with specified constraints")?; let total_count = i64::try_from(events.len()) .change_context(errors::ApiErrorResponse::InternalServerError) diff --git a/crates/router/src/db/events.rs b/crates/router/src/db/events.rs index aab325fc8b..70b9c6b6f8 100644 --- a/crates/router/src/db/events.rs +++ b/crates/router/src/db/events.rs @@ -46,11 +46,12 @@ where merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult; - async fn list_initial_events_by_merchant_id_primary_object_id( + async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, merchant_id: &common_utils::id_type::MerchantId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError>; @@ -76,11 +77,12 @@ where merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError>; - async fn list_initial_events_by_profile_id_primary_object_id( + async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, profile_id: &common_utils::id_type::ProfileId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError>; @@ -191,18 +193,20 @@ impl EventInterface for Store { } #[instrument(skip_all)] - async fn list_initial_events_by_merchant_id_primary_object_id( + async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, merchant_id: &common_utils::id_type::MerchantId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let conn = connection::pg_connection_read(self).await?; - storage::Event::list_initial_attempts_by_merchant_id_primary_object_id( + storage::Event::list_initial_attempts_by_merchant_id_primary_object_id_or_initial_attempt_id( &conn, merchant_id, primary_object_id, + initial_attempt_id, ) .await .map_err(|error| report!(errors::StorageError::from(error))) @@ -306,18 +310,20 @@ impl EventInterface for Store { } #[instrument(skip_all)] - async fn list_initial_events_by_profile_id_primary_object_id( + async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, profile_id: &common_utils::id_type::ProfileId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let conn = connection::pg_connection_read(self).await?; - storage::Event::list_initial_attempts_by_profile_id_primary_object_id( + storage::Event::list_initial_attempts_by_profile_id_primary_object_id_or_initial_attempt_id( &conn, profile_id, primary_object_id, + initial_attempt_id, ) .await .map_err(|error| report!(errors::StorageError::from(error))) @@ -527,11 +533,12 @@ impl EventInterface for MockDb { ) } - async fn list_initial_events_by_merchant_id_primary_object_id( + async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, merchant_id: &common_utils::id_type::MerchantId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let locked_events = self.events.lock().await; @@ -539,8 +546,9 @@ impl EventInterface for MockDb { .iter() .filter(|event| { event.merchant_id == Some(merchant_id.to_owned()) - && event.initial_attempt_id.as_ref() == Some(&event.event_id) - && event.primary_object_id == primary_object_id + && event.initial_attempt_id.as_deref() == Some(&event.event_id) + && (event.primary_object_id == primary_object_id + || event.initial_attempt_id.as_deref() == Some(initial_attempt_id)) }) .cloned() .collect::>(); @@ -663,11 +671,12 @@ impl EventInterface for MockDb { Ok(domain_events) } - async fn list_initial_events_by_profile_id_primary_object_id( + async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, profile_id: &common_utils::id_type::ProfileId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { let locked_events = self.events.lock().await; @@ -676,7 +685,8 @@ impl EventInterface for MockDb { .filter(|event| { event.business_profile_id == Some(profile_id.to_owned()) && event.initial_attempt_id.as_ref() == Some(&event.event_id) - && event.primary_object_id == primary_object_id + && (event.primary_object_id == primary_object_id + || event.initial_attempt_id.as_deref() == Some(initial_attempt_id)) }) .cloned() .collect::>(); @@ -1310,6 +1320,7 @@ mod tests { let event_type = enums::EventType::PaymentSucceeded; let event_class = enums::EventClass::Payments; let primary_object_id = Arc::new("concurrent_payment_id".to_string()); + let initial_attempt_id = Arc::new("initial_attempt_id".to_string()); let primary_object_type = enums::EventObjectType::PaymentDetails; let payment_id = common_utils::id_type::PaymentId::try_from(std::borrow::Cow::Borrowed( "pay_mbabizu24mvu3mela5njyhpit10", @@ -1462,10 +1473,11 @@ mod tests { // Collect all initial-attempt events for this payment let events = state .store - .list_initial_events_by_merchant_id_primary_object_id( + .list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( key_manager_state, &business_profile.merchant_id, &primary_object_id.clone(), + &initial_attempt_id.clone(), merchant_context.get_merchant_key_store(), ) .await?; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 063a660531..1ea7da228b 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -782,18 +782,20 @@ impl EventInterface for KafkaStore { .await } - async fn list_initial_events_by_merchant_id_primary_object_id( + async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, merchant_id: &id_type::MerchantId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { self.diesel_store - .list_initial_events_by_merchant_id_primary_object_id( + .list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id( state, merchant_id, primary_object_id, + initial_attempt_id, merchant_key_store, ) .await @@ -843,18 +845,20 @@ impl EventInterface for KafkaStore { .await } - async fn list_initial_events_by_profile_id_primary_object_id( + async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id( &self, state: &KeyManagerState, profile_id: &id_type::ProfileId, primary_object_id: &str, + initial_attempt_id: &str, merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult, errors::StorageError> { self.diesel_store - .list_initial_events_by_profile_id_primary_object_id( + .list_initial_events_by_profile_id_primary_object_or_initial_attempt_id( state, profile_id, primary_object_id, + initial_attempt_id, merchant_key_store, ) .await diff --git a/crates/router/src/types/transformers.rs b/crates/router/src/types/transformers.rs index d13735633f..579dbb3eeb 100644 --- a/crates/router/src/types/transformers.rs +++ b/crates/router/src/types/transformers.rs @@ -1798,7 +1798,7 @@ impl ForeignTryFrom fn foreign_try_from( item: api_types::webhook_events::EventListConstraints, ) -> Result { - if item.object_id.is_some() + if (item.object_id.is_some() || item.event_id.is_some()) && (item.created_after.is_some() || item.created_before.is_some() || item.limit.is_some() @@ -1808,15 +1808,29 @@ impl ForeignTryFrom { return Err(report!(errors::ApiErrorResponse::PreconditionFailed { message: - "Either only `object_id` must be specified, or one or more of \ - `created_after`, `created_before`, `limit`, `offset`, `event_classes` and `event_types` must be specified" + "Either only `object_id` or `event_id` must be specified, or one or more of \ + `created_after`, `created_before`, `limit`, `offset`, `event_classes` and `event_types` must be specified" .to_string() })); } - match item.object_id { - Some(object_id) => Ok(Self::ObjectIdFilter { object_id }), - None => Ok(Self::GenericFilter { + match (item.object_id.clone(), item.event_id.clone()) { + (Some(object_id), Some(event_id)) => Ok(Self::ObjectIdFilter { + object_id, + event_id, + }), + + (Some(object_id), None) => Ok(Self::ObjectIdFilter { + event_id: object_id.clone(), + object_id, + }), + + (None, Some(event_id)) => Ok(Self::ObjectIdFilter { + object_id: event_id.clone(), + event_id, + }), + + (None, None) => Ok(Self::GenericFilter { created_after: item.created_after, created_before: item.created_before, limit: item.limit.map(i64::from),