mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 12:15:40 +08:00
feat(webhook): Return events list and total_count on list initial delivery attempt call (#7243)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -5395,10 +5395,7 @@
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/components/schemas/EventListItemResponse"
|
||||
}
|
||||
"$ref": "#/components/schemas/TotalEventsResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -26727,6 +26724,28 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"TotalEventsResponse": {
|
||||
"type": "object",
|
||||
"description": "The response body of list initial delivery attempts api call.",
|
||||
"required": [
|
||||
"events",
|
||||
"total_count"
|
||||
],
|
||||
"properties": {
|
||||
"events": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/components/schemas/EventListItemResponse"
|
||||
},
|
||||
"description": "The list of events"
|
||||
},
|
||||
"total_count": {
|
||||
"type": "integer",
|
||||
"format": "int64",
|
||||
"description": "Count of total events"
|
||||
}
|
||||
}
|
||||
},
|
||||
"TouchNGoRedirection": {
|
||||
"type": "object"
|
||||
},
|
||||
|
||||
@ -5,7 +5,7 @@ use time::PrimitiveDateTime;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
/// The constraints to apply when filtering events.
|
||||
#[derive(Debug, Serialize, Deserialize, ToSchema)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
|
||||
pub struct EventListConstraints {
|
||||
/// Filter events created after the specified time.
|
||||
#[serde(default, with = "common_utils::custom_serde::iso8601::option")]
|
||||
@ -82,6 +82,32 @@ pub struct EventListItemResponse {
|
||||
pub created: PrimitiveDateTime,
|
||||
}
|
||||
|
||||
/// The response body of list initial delivery attempts api call.
|
||||
#[derive(Debug, Serialize, ToSchema)]
|
||||
pub struct TotalEventsResponse {
|
||||
/// The list of events
|
||||
pub events: Vec<EventListItemResponse>,
|
||||
/// Count of total events
|
||||
pub total_count: i64,
|
||||
}
|
||||
|
||||
impl TotalEventsResponse {
|
||||
pub fn new(total_count: i64, events: Vec<EventListItemResponse>) -> Self {
|
||||
Self {
|
||||
events,
|
||||
total_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl common_utils::events::ApiEventMetric for TotalEventsResponse {
|
||||
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
|
||||
Some(common_utils::events::ApiEventsType::Events {
|
||||
merchant_id: self.events.first().map(|event| event.merchant_id.clone())?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The response body for retrieving an event.
|
||||
#[derive(Debug, Serialize, ToSchema)]
|
||||
pub struct EventRetrieveResponse {
|
||||
|
||||
@ -52,8 +52,8 @@ impl Event {
|
||||
pub async fn list_initial_attempts_by_merchant_id_constraints(
|
||||
conn: &PgPooledConn,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
) -> StorageResult<Vec<Self>> {
|
||||
@ -75,21 +75,13 @@ impl Event {
|
||||
.order(dsl::created_at.desc())
|
||||
.into_boxed();
|
||||
|
||||
if let Some(created_after) = created_after {
|
||||
query = query.filter(dsl::created_at.ge(created_after));
|
||||
}
|
||||
|
||||
if let Some(created_before) = created_before {
|
||||
query = query.filter(dsl::created_at.le(created_before));
|
||||
}
|
||||
|
||||
if let Some(limit) = limit {
|
||||
query = query.limit(limit);
|
||||
}
|
||||
|
||||
if let Some(offset) = offset {
|
||||
query = query.offset(offset);
|
||||
}
|
||||
query = Self::apply_filters(
|
||||
query,
|
||||
None,
|
||||
(dsl::created_at, created_after, created_before),
|
||||
limit,
|
||||
offset,
|
||||
);
|
||||
|
||||
logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
|
||||
|
||||
@ -138,8 +130,8 @@ impl Event {
|
||||
pub async fn list_initial_attempts_by_profile_id_constraints(
|
||||
conn: &PgPooledConn,
|
||||
profile_id: &common_utils::id_type::ProfileId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
) -> StorageResult<Vec<Self>> {
|
||||
@ -161,21 +153,13 @@ impl Event {
|
||||
.order(dsl::created_at.desc())
|
||||
.into_boxed();
|
||||
|
||||
if let Some(created_after) = created_after {
|
||||
query = query.filter(dsl::created_at.ge(created_after));
|
||||
}
|
||||
|
||||
if let Some(created_before) = created_before {
|
||||
query = query.filter(dsl::created_at.le(created_before));
|
||||
}
|
||||
|
||||
if let Some(limit) = limit {
|
||||
query = query.limit(limit);
|
||||
}
|
||||
|
||||
if let Some(offset) = offset {
|
||||
query = query.offset(offset);
|
||||
}
|
||||
query = Self::apply_filters(
|
||||
query,
|
||||
None,
|
||||
(dsl::created_at, created_after, created_before),
|
||||
limit,
|
||||
offset,
|
||||
);
|
||||
|
||||
logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
|
||||
|
||||
@ -222,4 +206,94 @@ impl Event {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn apply_filters<T>(
|
||||
mut query: T,
|
||||
profile_id: Option<common_utils::id_type::ProfileId>,
|
||||
(column, created_after, created_before): (
|
||||
dsl::created_at,
|
||||
time::PrimitiveDateTime,
|
||||
time::PrimitiveDateTime,
|
||||
),
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
) -> T
|
||||
where
|
||||
T: diesel::query_dsl::methods::LimitDsl<Output = T>
|
||||
+ diesel::query_dsl::methods::OffsetDsl<Output = T>,
|
||||
T: diesel::query_dsl::methods::FilterDsl<
|
||||
diesel::dsl::GtEq<dsl::created_at, time::PrimitiveDateTime>,
|
||||
Output = T,
|
||||
>,
|
||||
T: diesel::query_dsl::methods::FilterDsl<
|
||||
diesel::dsl::LtEq<dsl::created_at, time::PrimitiveDateTime>,
|
||||
Output = T,
|
||||
>,
|
||||
T: diesel::query_dsl::methods::FilterDsl<
|
||||
diesel::dsl::Eq<dsl::business_profile_id, common_utils::id_type::ProfileId>,
|
||||
Output = T,
|
||||
>,
|
||||
{
|
||||
if let Some(profile_id) = profile_id {
|
||||
query = query.filter(dsl::business_profile_id.eq(profile_id));
|
||||
}
|
||||
|
||||
query = query
|
||||
.filter(column.ge(created_after))
|
||||
.filter(column.le(created_before));
|
||||
|
||||
if let Some(limit) = limit {
|
||||
query = query.limit(limit);
|
||||
}
|
||||
|
||||
if let Some(offset) = offset {
|
||||
query = query.offset(offset);
|
||||
}
|
||||
|
||||
query
|
||||
}
|
||||
|
||||
pub async fn count_initial_attempts_by_constraints(
|
||||
conn: &PgPooledConn,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
profile_id: Option<common_utils::id_type::ProfileId>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
) -> StorageResult<i64> {
|
||||
use async_bb8_diesel::AsyncRunQueryDsl;
|
||||
use diesel::{debug_query, pg::Pg, QueryDsl};
|
||||
use error_stack::ResultExt;
|
||||
use router_env::logger;
|
||||
|
||||
use super::generics::db_metrics::{track_database_call, DatabaseOperation};
|
||||
use crate::errors::DatabaseError;
|
||||
|
||||
let mut query = Self::table()
|
||||
.count()
|
||||
.filter(
|
||||
dsl::event_id
|
||||
.nullable()
|
||||
.eq(dsl::initial_attempt_id) // Filter initial attempts only
|
||||
.and(dsl::merchant_id.eq(merchant_id.to_owned())),
|
||||
)
|
||||
.into_boxed();
|
||||
|
||||
query = Self::apply_filters(
|
||||
query,
|
||||
profile_id,
|
||||
(dsl::created_at, created_after, created_before),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());
|
||||
|
||||
track_database_call::<Self, _, _>(
|
||||
query.get_result_async::<i64>(conn),
|
||||
DatabaseOperation::Count,
|
||||
)
|
||||
.await
|
||||
.change_context(DatabaseError::Others)
|
||||
.attach_printable("Error counting events by constraints")
|
||||
}
|
||||
}
|
||||
|
||||
@ -678,6 +678,7 @@ Never share your secret api keys. Keep them guarded and secure.
|
||||
api_models::webhook_events::EventRetrieveResponse,
|
||||
api_models::webhook_events::OutgoingWebhookRequestContent,
|
||||
api_models::webhook_events::OutgoingWebhookResponseContent,
|
||||
api_models::webhook_events::TotalEventsResponse,
|
||||
api_models::enums::WebhookDeliveryAttempt,
|
||||
api_models::enums::PaymentChargeType,
|
||||
api_models::enums::StripeChargeType,
|
||||
|
||||
@ -47,7 +47,7 @@
|
||||
),
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "List of Events retrieved successfully", body = Vec<EventListItemResponse>),
|
||||
(status = 200, description = "List of Events retrieved successfully", body = TotalEventsResponse),
|
||||
),
|
||||
tag = "Event",
|
||||
operation_id = "List all Events associated with a Merchant Account or Profile",
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use common_utils::{self, fp_utils};
|
||||
use error_stack::ResultExt;
|
||||
use masking::PeekInterface;
|
||||
use router_env::{instrument, tracing};
|
||||
@ -11,6 +12,7 @@ use crate::{
|
||||
};
|
||||
|
||||
const INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_LIMIT: i64 = 100;
|
||||
const INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS: i64 = 90;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum MerchantAccountOrProfile {
|
||||
@ -22,16 +24,21 @@ enum MerchantAccountOrProfile {
|
||||
pub async fn list_initial_delivery_attempts(
|
||||
state: SessionState,
|
||||
merchant_id: common_utils::id_type::MerchantId,
|
||||
constraints: api::webhook_events::EventListConstraints,
|
||||
) -> RouterResponse<Vec<api::webhook_events::EventListItemResponse>> {
|
||||
let profile_id = constraints.profile_id.clone();
|
||||
let constraints =
|
||||
api::webhook_events::EventListConstraintsInternal::foreign_try_from(constraints)?;
|
||||
api_constraints: api::webhook_events::EventListConstraints,
|
||||
) -> RouterResponse<api::webhook_events::TotalEventsResponse> {
|
||||
let profile_id = api_constraints.profile_id.clone();
|
||||
let constraints = api::webhook_events::EventListConstraintsInternal::foreign_try_from(
|
||||
api_constraints.clone(),
|
||||
)?;
|
||||
|
||||
let store = state.store.as_ref();
|
||||
let key_manager_state = &(&state).into();
|
||||
let (account, key_store) =
|
||||
get_account_and_key_store(state.clone(), merchant_id, profile_id).await?;
|
||||
get_account_and_key_store(state.clone(), merchant_id.clone(), profile_id.clone()).await?;
|
||||
|
||||
let now = common_utils::date_time::now();
|
||||
let events_list_begin_time =
|
||||
(now.date() - time::Duration::days(INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS)).midnight();
|
||||
|
||||
let events = match constraints {
|
||||
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { object_id } => {
|
||||
@ -72,6 +79,33 @@ pub async fn list_initial_delivery_attempts(
|
||||
_ => None,
|
||||
};
|
||||
|
||||
fp_utils::when(!created_after.zip(created_before).map(|(created_after,created_before)| created_after<=created_before).unwrap_or(true), || {
|
||||
Err(errors::ApiErrorResponse::InvalidRequestData { message: "The `created_after` timestamp must be an earlier timestamp compared to the `created_before` timestamp".to_string() })
|
||||
})?;
|
||||
|
||||
let created_after = match created_after {
|
||||
Some(created_after) => {
|
||||
if created_after < events_list_begin_time {
|
||||
Err(errors::ApiErrorResponse::InvalidRequestData { message: format!("`created_after` must be a timestamp within the past {INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS} days.") })
|
||||
}else{
|
||||
Ok(created_after)
|
||||
}
|
||||
},
|
||||
None => Ok(events_list_begin_time)
|
||||
}?;
|
||||
|
||||
let created_before = match created_before{
|
||||
Some(created_before) => {
|
||||
if created_before < events_list_begin_time{
|
||||
Err(errors::ApiErrorResponse::InvalidRequestData { message: format!("`created_before` must be a timestamp within the past {INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS} days.") })
|
||||
}
|
||||
else{
|
||||
Ok(created_before)
|
||||
}
|
||||
},
|
||||
None => Ok(now)
|
||||
}?;
|
||||
|
||||
match account {
|
||||
MerchantAccountOrProfile::MerchantAccount(merchant_account) => store
|
||||
.list_initial_events_by_merchant_id_constraints(key_manager_state,
|
||||
@ -99,11 +133,29 @@ pub async fn list_initial_delivery_attempts(
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to list events with specified constraints")?;
|
||||
|
||||
let events = events
|
||||
.into_iter()
|
||||
.map(api::webhook_events::EventListItemResponse::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let created_after = api_constraints
|
||||
.created_after
|
||||
.unwrap_or(events_list_begin_time);
|
||||
let created_before = api_constraints.created_before.unwrap_or(now);
|
||||
|
||||
let total_count = store
|
||||
.count_initial_events_by_constraints(
|
||||
&merchant_id,
|
||||
profile_id,
|
||||
created_after,
|
||||
created_before,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to get total events count")?;
|
||||
|
||||
Ok(ApplicationResponse::Json(
|
||||
events
|
||||
.into_iter()
|
||||
.map(api::webhook_events::EventListItemResponse::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?,
|
||||
api::webhook_events::TotalEventsResponse::new(total_count, events),
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@ -49,8 +49,8 @@ where
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
@ -77,8 +77,8 @@ where
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
profile_id: &common_utils::id_type::ProfileId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
@ -92,6 +92,14 @@ where
|
||||
event: domain::EventUpdate,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
) -> CustomResult<domain::Event, errors::StorageError>;
|
||||
|
||||
async fn count_initial_events_by_constraints(
|
||||
&self,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
profile_id: Option<common_utils::id_type::ProfileId>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
) -> CustomResult<i64, errors::StorageError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@ -181,8 +189,8 @@ impl EventInterface for Store {
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
@ -292,8 +300,8 @@ impl EventInterface for Store {
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
profile_id: &common_utils::id_type::ProfileId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
@ -351,6 +359,25 @@ impl EventInterface for Store {
|
||||
.await
|
||||
.change_context(errors::StorageError::DecryptionError)
|
||||
}
|
||||
|
||||
async fn count_initial_events_by_constraints(
|
||||
&self,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
profile_id: Option<common_utils::id_type::ProfileId>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
) -> CustomResult<i64, errors::StorageError> {
|
||||
let conn = connection::pg_connection_read(self).await?;
|
||||
storage::Event::count_initial_attempts_by_constraints(
|
||||
&conn,
|
||||
merchant_id,
|
||||
profile_id,
|
||||
created_after,
|
||||
created_before,
|
||||
)
|
||||
.await
|
||||
.map_err(|error| report!(errors::StorageError::from(error)))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@ -452,28 +479,21 @@ impl EventInterface for MockDb {
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
|
||||
let locked_events = self.events.lock().await;
|
||||
let events_iter = locked_events.iter().filter(|event| {
|
||||
let mut check = event.merchant_id == Some(merchant_id.to_owned())
|
||||
&& event.initial_attempt_id.as_ref() == Some(&event.event_id);
|
||||
|
||||
if let Some(created_after) = created_after {
|
||||
check = check && (event.created_at >= created_after);
|
||||
}
|
||||
|
||||
if let Some(created_before) = created_before {
|
||||
check = check && (event.created_at <= created_before);
|
||||
}
|
||||
let check = event.merchant_id == Some(merchant_id.to_owned())
|
||||
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
|
||||
&& (event.created_at >= created_after)
|
||||
&& (event.created_at <= created_before);
|
||||
|
||||
check
|
||||
});
|
||||
|
||||
let offset: usize = if let Some(offset) = offset {
|
||||
if offset < 0 {
|
||||
Err(errors::StorageError::MockDbError)?;
|
||||
@ -590,24 +610,18 @@ impl EventInterface for MockDb {
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
profile_id: &common_utils::id_type::ProfileId,
|
||||
created_after: Option<time::PrimitiveDateTime>,
|
||||
created_before: Option<time::PrimitiveDateTime>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
|
||||
let locked_events = self.events.lock().await;
|
||||
let events_iter = locked_events.iter().filter(|event| {
|
||||
let mut check = event.business_profile_id == Some(profile_id.to_owned())
|
||||
&& event.initial_attempt_id.as_ref() == Some(&event.event_id);
|
||||
|
||||
if let Some(created_after) = created_after {
|
||||
check = check && (event.created_at >= created_after);
|
||||
}
|
||||
|
||||
if let Some(created_before) = created_before {
|
||||
check = check && (event.created_at <= created_before);
|
||||
}
|
||||
let check = event.business_profile_id == Some(profile_id.to_owned())
|
||||
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
|
||||
&& (event.created_at >= created_after)
|
||||
&& (event.created_at <= created_before);
|
||||
|
||||
check
|
||||
});
|
||||
@ -692,6 +706,32 @@ impl EventInterface for MockDb {
|
||||
.await
|
||||
.change_context(errors::StorageError::DecryptionError)
|
||||
}
|
||||
|
||||
async fn count_initial_events_by_constraints(
|
||||
&self,
|
||||
merchant_id: &common_utils::id_type::MerchantId,
|
||||
profile_id: Option<common_utils::id_type::ProfileId>,
|
||||
created_after: time::PrimitiveDateTime,
|
||||
created_before: time::PrimitiveDateTime,
|
||||
) -> CustomResult<i64, errors::StorageError> {
|
||||
let locked_events = self.events.lock().await;
|
||||
|
||||
let iter_events = locked_events.iter().filter(|event| {
|
||||
let check = event.initial_attempt_id.as_ref() == Some(&event.event_id)
|
||||
&& (event.merchant_id == Some(merchant_id.to_owned()))
|
||||
&& (event.business_profile_id == profile_id)
|
||||
&& (event.created_at >= created_after)
|
||||
&& (event.created_at <= created_before);
|
||||
|
||||
check
|
||||
});
|
||||
|
||||
let events = iter_events.cloned().collect::<Vec<_>>();
|
||||
|
||||
i64::try_from(events.len())
|
||||
.change_context(errors::StorageError::MockDbError)
|
||||
.attach_printable("Failed to convert usize to i64")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -768,8 +768,8 @@ impl EventInterface for KafkaStore {
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
merchant_id: &id_type::MerchantId,
|
||||
created_after: Option<PrimitiveDateTime>,
|
||||
created_before: Option<PrimitiveDateTime>,
|
||||
created_after: PrimitiveDateTime,
|
||||
created_before: PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
@ -825,8 +825,8 @@ impl EventInterface for KafkaStore {
|
||||
&self,
|
||||
state: &KeyManagerState,
|
||||
profile_id: &id_type::ProfileId,
|
||||
created_after: Option<PrimitiveDateTime>,
|
||||
created_before: Option<PrimitiveDateTime>,
|
||||
created_after: PrimitiveDateTime,
|
||||
created_before: PrimitiveDateTime,
|
||||
limit: Option<i64>,
|
||||
offset: Option<i64>,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
@ -862,6 +862,23 @@ impl EventInterface for KafkaStore {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn count_initial_events_by_constraints(
|
||||
&self,
|
||||
merchant_id: &id_type::MerchantId,
|
||||
profile_id: Option<id_type::ProfileId>,
|
||||
created_after: PrimitiveDateTime,
|
||||
created_before: PrimitiveDateTime,
|
||||
) -> CustomResult<i64, errors::StorageError> {
|
||||
self.diesel_store
|
||||
.count_initial_events_by_constraints(
|
||||
merchant_id,
|
||||
profile_id,
|
||||
created_after,
|
||||
created_before,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
pub use api_models::webhook_events::{
|
||||
EventListConstraints, EventListConstraintsInternal, EventListItemResponse,
|
||||
EventListRequestInternal, EventRetrieveResponse, OutgoingWebhookRequestContent,
|
||||
OutgoingWebhookResponseContent, WebhookDeliveryAttemptListRequestInternal,
|
||||
OutgoingWebhookResponseContent, TotalEventsResponse, WebhookDeliveryAttemptListRequestInternal,
|
||||
WebhookDeliveryRetryRequestInternal,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user