feat(webhook): add filter by event class and type (#7275)

Co-authored-by: Aishwariyaa Anand <aishwariyaa.anand@Aishwariyaa-Anand-C3PGW02T6Y.local>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Aishwariyaa Anand
2025-04-11 15:45:23 +05:30
committed by GitHub
parent e677b1389f
commit 989b2c34e1
12 changed files with 346 additions and 219 deletions

View File

@ -1,4 +1,6 @@
use common_utils::{self, fp_utils};
use std::collections::HashSet;
use common_utils::{self, errors::CustomResult, fp_utils};
use error_stack::ResultExt;
use masking::PeekInterface;
use router_env::{instrument, tracing};
@ -40,6 +42,8 @@ pub async fn list_initial_delivery_attempts(
let events_list_begin_time =
(now.date() - time::Duration::days(INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS)).midnight();
let mut updated_event_types: HashSet<common_enums::EventType> = HashSet::new();
let events = match constraints {
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { object_id } => {
match account {
@ -64,6 +68,8 @@ pub async fn list_initial_delivery_attempts(
created_before,
limit,
offset,
event_classes,
event_types,
is_delivered
} => {
let limit = match limit {
@ -80,6 +86,12 @@ pub async fn list_initial_delivery_attempts(
_ => None,
};
let event_classes = event_classes.unwrap_or(HashSet::new());
updated_event_types = event_types.unwrap_or(HashSet::new());
if !event_classes.is_empty() {
updated_event_types = finalize_event_types(event_classes, updated_event_types).await?;
}
fp_utils::when(!created_after.zip(created_before).map(|(created_after,created_before)| created_after<=created_before).unwrap_or(true), || {
Err(errors::ApiErrorResponse::InvalidRequestData { message: "The `created_after` timestamp must be an earlier timestamp compared to the `created_before` timestamp".to_string() })
})?;
@ -115,6 +127,7 @@ pub async fn list_initial_delivery_attempts(
created_before,
limit,
offset,
updated_event_types.clone(),
is_delivered,
&key_store,
)
@ -126,6 +139,7 @@ pub async fn list_initial_delivery_attempts(
created_before,
limit,
offset,
updated_event_types.clone(),
is_delivered,
&key_store,
)
@ -154,6 +168,7 @@ pub async fn list_initial_delivery_attempts(
profile_id,
created_after,
created_before,
updated_event_types,
is_delivered,
)
.await
@ -384,3 +399,48 @@ async fn get_account_and_key_store(
}
}
}
async fn finalize_event_types(
event_classes: HashSet<common_enums::EventClass>,
mut event_types: HashSet<common_enums::EventType>,
) -> CustomResult<HashSet<common_enums::EventType>, errors::ApiErrorResponse> {
// Examples:
// 1. event_classes = ["payments", "refunds"], event_types = ["payment_succeeded"]
// 2. event_classes = ["refunds"], event_types = ["payment_succeeded"]
// Create possible_event_types based on event_classes
// Example 1: possible_event_types = ["payment_*", "refund_*"]
// Example 2: possible_event_types = ["refund_*"]
let possible_event_types = event_classes
.clone()
.into_iter()
.flat_map(common_enums::EventClass::event_types)
.collect::<HashSet<_>>();
if event_types.is_empty() {
return Ok(possible_event_types);
}
// Extend event_types if disjoint with event_classes
// Example 1: event_types = ["payment_succeeded", "refund_*"], is_disjoint is used to extend "refund_*" and ignore "payment_*".
// Example 2: event_types = ["payment_succeeded", "refund_*"], is_disjoint is only used to extend "refund_*".
event_classes.into_iter().for_each(|class| {
let valid_event_types = class.event_types();
if event_types.is_disjoint(&valid_event_types) {
event_types.extend(valid_event_types);
}
});
// Validate event_types is a subset of possible_event_types
// Example 1: event_types is a subset of possible_event_types (valid)
// Example 2: event_types is not a subset of possible_event_types (error due to "payment_succeeded")
if !event_types.is_subset(&possible_event_types) {
return Err(error_stack::report!(
errors::ApiErrorResponse::InvalidRequestData {
message: "`event_types` must be a subset of `event_classes`".to_string(),
}
));
}
Ok(event_types.clone())
}

View File

@ -1,3 +1,5 @@
use std::collections::HashSet;
use common_utils::{ext_traits::AsyncExt, types::keymanager::KeyManagerState};
use error_stack::{report, ResultExt};
use router_env::{instrument, tracing};
@ -53,6 +55,7 @@ where
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
@ -82,6 +85,7 @@ where
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;
@ -101,6 +105,7 @@ where
profile_id: Option<common_utils::id_type::ProfileId>,
created_after: time::PrimitiveDateTime,
created_before: time::PrimitiveDateTime,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError>;
}
@ -196,6 +201,7 @@ impl EventInterface for Store {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
@ -207,6 +213,7 @@ impl EventInterface for Store {
created_before,
limit,
offset,
event_types,
is_delivered,
)
.await
@ -309,6 +316,7 @@ impl EventInterface for Store {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
@ -320,6 +328,7 @@ impl EventInterface for Store {
created_before,
limit,
offset,
event_types,
is_delivered,
)
.await
@ -373,6 +382,7 @@ impl EventInterface for Store {
profile_id: Option<common_utils::id_type::ProfileId>,
created_after: time::PrimitiveDateTime,
created_before: time::PrimitiveDateTime,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
@ -382,6 +392,7 @@ impl EventInterface for Store {
profile_id,
created_after,
created_before,
event_types,
is_delivered,
)
.await
@ -492,6 +503,7 @@ impl EventInterface for MockDb {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
@ -501,6 +513,7 @@ impl EventInterface for MockDb {
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& (event.created_at >= created_after)
&& (event.created_at <= created_before)
&& (event_types.is_empty() || event_types.contains(&event.event_type))
&& (event.is_overall_delivery_successful == is_delivered);
check
@ -626,6 +639,7 @@ impl EventInterface for MockDb {
created_before: time::PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
@ -635,6 +649,7 @@ impl EventInterface for MockDb {
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& (event.created_at >= created_after)
&& (event.created_at <= created_before)
&& (event_types.is_empty() || event_types.contains(&event.event_type))
&& (event.is_overall_delivery_successful == is_delivered);
check
@ -733,6 +748,7 @@ impl EventInterface for MockDb {
profile_id: Option<common_utils::id_type::ProfileId>,
created_after: time::PrimitiveDateTime,
created_before: time::PrimitiveDateTime,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError> {
let locked_events = self.events.lock().await;
@ -743,6 +759,7 @@ impl EventInterface for MockDb {
&& (event.business_profile_id == profile_id)
&& (event.created_at >= created_after)
&& (event.created_at <= created_before)
&& (event_types.is_empty() || event_types.contains(&event.event_type))
&& (event.is_overall_delivery_successful == is_delivered);
check

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use ::payment_methods::state::PaymentMethodsStorageInterface;
use common_enums::enums::MerchantStorageScheme;
@ -772,6 +772,7 @@ impl EventInterface for KafkaStore {
created_before: PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
@ -783,6 +784,7 @@ impl EventInterface for KafkaStore {
created_before,
limit,
offset,
event_types,
is_delivered,
merchant_key_store,
)
@ -831,6 +833,7 @@ impl EventInterface for KafkaStore {
created_before: PrimitiveDateTime,
limit: Option<i64>,
offset: Option<i64>,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
@ -842,6 +845,7 @@ impl EventInterface for KafkaStore {
created_before,
limit,
offset,
event_types,
is_delivered,
merchant_key_store,
)
@ -873,6 +877,7 @@ impl EventInterface for KafkaStore {
profile_id: Option<id_type::ProfileId>,
created_after: PrimitiveDateTime,
created_before: PrimitiveDateTime,
event_types: HashSet<common_enums::EventType>,
is_delivered: Option<bool>,
) -> CustomResult<i64, errors::StorageError> {
self.diesel_store
@ -881,6 +886,7 @@ impl EventInterface for KafkaStore {
profile_id,
created_after,
created_before,
event_types,
is_delivered,
)
.await

View File

@ -2471,12 +2471,12 @@ impl WebhookEvents {
web::scope("/events")
.app_data(web::Data::new(config))
.service(web::scope("/profile/list").service(web::resource("").route(
web::get().to(webhook_events::list_initial_webhook_delivery_attempts_with_jwtauth),
web::post().to(webhook_events::list_initial_webhook_delivery_attempts_with_jwtauth),
)))
.service(
web::scope("/{merchant_id}")
.service(web::resource("").route(
web::get().to(webhook_events::list_initial_webhook_delivery_attempts),
web::post().to(webhook_events::list_initial_webhook_delivery_attempts),
))
.service(
web::scope("/{event_id}")

View File

@ -20,11 +20,11 @@ pub async fn list_initial_webhook_delivery_attempts(
state: web::Data<AppState>,
req: HttpRequest,
path: web::Path<common_utils::id_type::MerchantId>,
query: web::Query<EventListConstraints>,
json_payload: web::Json<EventListConstraints>,
) -> impl Responder {
let flow = Flow::WebhookEventInitialDeliveryAttemptList;
let merchant_id = path.into_inner();
let constraints = query.into_inner();
let constraints = json_payload.into_inner();
let request_internal = EventListRequestInternal {
merchant_id: merchant_id.clone(),
@ -60,10 +60,10 @@ pub async fn list_initial_webhook_delivery_attempts(
pub async fn list_initial_webhook_delivery_attempts_with_jwtauth(
state: web::Data<AppState>,
req: HttpRequest,
query: web::Query<EventListConstraints>,
json_payload: web::Json<EventListConstraints>,
) -> impl Responder {
let flow = Flow::WebhookEventInitialDeliveryAttemptList;
let constraints = query.into_inner();
let constraints = json_payload.into_inner();
let request_internal = EventListRequestInternal {
merchant_id: common_utils::id_type::MerchantId::default(),

View File

@ -1912,12 +1912,14 @@ impl ForeignTryFrom<api_types::webhook_events::EventListConstraints>
&& (item.created_after.is_some()
|| item.created_before.is_some()
|| item.limit.is_some()
|| item.offset.is_some())
|| item.offset.is_some()
|| item.event_classes.is_some()
|| item.event_types.is_some())
{
return Err(report!(errors::ApiErrorResponse::PreconditionFailed {
message:
"Either only `object_id` must be specified, or one or more of \
`created_after`, `created_before`, `limit` and `offset` must be specified"
`created_after`, `created_before`, `limit`, `offset`, `event_classes` and `event_types` must be specified"
.to_string()
}));
}
@ -1929,6 +1931,8 @@ impl ForeignTryFrom<api_types::webhook_events::EventListConstraints>
created_before: item.created_before,
limit: item.limit.map(i64::from),
offset: item.offset.map(i64::from),
event_classes: item.event_classes,
event_types: item.event_types,
is_delivered: item.is_delivered,
}),
}