mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-02 12:06:56 +08:00
fix: Update events table after notifying merchant (#871)
This commit is contained in:
@ -430,6 +430,8 @@ pub enum WebhooksFlowError {
|
|||||||
DisputeCoreFailed,
|
DisputeCoreFailed,
|
||||||
#[error("Webhook event creation failed")]
|
#[error("Webhook event creation failed")]
|
||||||
WebhookEventCreationFailed,
|
WebhookEventCreationFailed,
|
||||||
|
#[error("Webhook event updation failed")]
|
||||||
|
WebhookEventUpdationFailed,
|
||||||
#[error("Unable to fork webhooks flow for outgoing webhooks")]
|
#[error("Unable to fork webhooks flow for outgoing webhooks")]
|
||||||
ForkFlowFailed,
|
ForkFlowFailed,
|
||||||
#[error("Webhook api call to merchant failed")]
|
#[error("Webhook api call to merchant failed")]
|
||||||
|
|||||||
@ -416,7 +416,7 @@ async fn create_event_and_trigger_outgoing_webhook<W: api::OutgoingWebhookType>(
|
|||||||
async fn trigger_webhook_to_merchant<W: api::OutgoingWebhookType>(
|
async fn trigger_webhook_to_merchant<W: api::OutgoingWebhookType>(
|
||||||
merchant_account: storage::MerchantAccount,
|
merchant_account: storage::MerchantAccount,
|
||||||
webhook: api::OutgoingWebhook,
|
webhook: api::OutgoingWebhook,
|
||||||
_db: Box<dyn StorageInterface>,
|
db: Box<dyn StorageInterface>,
|
||||||
) -> CustomResult<(), errors::WebhooksFlowError> {
|
) -> CustomResult<(), errors::WebhooksFlowError> {
|
||||||
let webhook_details_json = merchant_account
|
let webhook_details_json = merchant_account
|
||||||
.webhook_details
|
.webhook_details
|
||||||
@ -434,6 +434,8 @@ async fn trigger_webhook_to_merchant<W: api::OutgoingWebhookType>(
|
|||||||
.change_context(errors::WebhooksFlowError::MerchantWebhookURLNotConfigured)
|
.change_context(errors::WebhooksFlowError::MerchantWebhookURLNotConfigured)
|
||||||
.map(ExposeInterface::expose)?;
|
.map(ExposeInterface::expose)?;
|
||||||
|
|
||||||
|
let outgoing_webhook_event_id = webhook.event_id.clone();
|
||||||
|
|
||||||
let transformed_outgoing_webhook = W::from(webhook);
|
let transformed_outgoing_webhook = W::from(webhook);
|
||||||
|
|
||||||
let response = reqwest::Client::new()
|
let response = reqwest::Client::new()
|
||||||
@ -454,7 +456,14 @@ async fn trigger_webhook_to_merchant<W: api::OutgoingWebhookType>(
|
|||||||
.change_context(errors::WebhooksFlowError::CallToMerchantFailed)?;
|
.change_context(errors::WebhooksFlowError::CallToMerchantFailed)?;
|
||||||
}
|
}
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
if !res.status().is_success() {
|
if res.status().is_success() {
|
||||||
|
let update_event = storage::EventUpdate::UpdateWebhookNotified {
|
||||||
|
is_webhook_notified: Some(true),
|
||||||
|
};
|
||||||
|
db.update_event(outgoing_webhook_event_id, update_event)
|
||||||
|
.await
|
||||||
|
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)?;
|
||||||
|
} else {
|
||||||
// [#217]: Schedule webhook for retry.
|
// [#217]: Schedule webhook for retry.
|
||||||
Err(errors::WebhooksFlowError::NotReceivedByMerchant).into_report()?;
|
Err(errors::WebhooksFlowError::NotReceivedByMerchant).into_report()?;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -13,6 +13,11 @@ pub trait EventInterface {
|
|||||||
&self,
|
&self,
|
||||||
event: storage::EventNew,
|
event: storage::EventNew,
|
||||||
) -> CustomResult<storage::Event, errors::StorageError>;
|
) -> CustomResult<storage::Event, errors::StorageError>;
|
||||||
|
async fn update_event(
|
||||||
|
&self,
|
||||||
|
event_id: String,
|
||||||
|
event: storage::EventUpdate,
|
||||||
|
) -> CustomResult<storage::Event, errors::StorageError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
@ -24,6 +29,17 @@ impl EventInterface for Store {
|
|||||||
let conn = connection::pg_connection_write(self).await?;
|
let conn = connection::pg_connection_write(self).await?;
|
||||||
event.insert(&conn).await.map_err(Into::into).into_report()
|
event.insert(&conn).await.map_err(Into::into).into_report()
|
||||||
}
|
}
|
||||||
|
async fn update_event(
|
||||||
|
&self,
|
||||||
|
event_id: String,
|
||||||
|
event: storage::EventUpdate,
|
||||||
|
) -> CustomResult<storage::Event, errors::StorageError> {
|
||||||
|
let conn = connection::pg_connection_write(self).await?;
|
||||||
|
storage::Event::update(&conn, &event_id, event)
|
||||||
|
.await
|
||||||
|
.map_err(Into::into)
|
||||||
|
.into_report()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
@ -35,4 +51,11 @@ impl EventInterface for MockDb {
|
|||||||
// [#172]: Implement function for `MockDb`
|
// [#172]: Implement function for `MockDb`
|
||||||
Err(errors::StorageError::MockDbError)?
|
Err(errors::StorageError::MockDbError)?
|
||||||
}
|
}
|
||||||
|
async fn update_event(
|
||||||
|
&self,
|
||||||
|
_event_id: String,
|
||||||
|
_event: storage::EventUpdate,
|
||||||
|
) -> CustomResult<storage::Event, errors::StorageError> {
|
||||||
|
Err(errors::StorageError::MockDbError)?
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1 +1 @@
|
|||||||
pub use storage_models::events::{Event, EventNew};
|
pub use storage_models::events::{Event, EventNew, EventUpdate};
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
use common_utils::custom_serde;
|
use common_utils::custom_serde;
|
||||||
use diesel::{Identifiable, Insertable, Queryable};
|
use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use time::PrimitiveDateTime;
|
use time::PrimitiveDateTime;
|
||||||
|
|
||||||
@ -18,6 +18,17 @@ pub struct EventNew {
|
|||||||
pub primary_object_type: storage_enums::EventObjectType,
|
pub primary_object_type: storage_enums::EventObjectType,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum EventUpdate {
|
||||||
|
UpdateWebhookNotified { is_webhook_notified: Option<bool> },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)]
|
||||||
|
#[diesel(table_name = events)]
|
||||||
|
pub struct EventUpdateInternal {
|
||||||
|
pub is_webhook_notified: Option<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, Serialize, Identifiable, Queryable)]
|
#[derive(Clone, Debug, Deserialize, Serialize, Identifiable, Queryable)]
|
||||||
#[diesel(table_name = events)]
|
#[diesel(table_name = events)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
@ -33,3 +44,15 @@ pub struct Event {
|
|||||||
#[serde(with = "custom_serde::iso8601")]
|
#[serde(with = "custom_serde::iso8601")]
|
||||||
pub created_at: PrimitiveDateTime,
|
pub created_at: PrimitiveDateTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<EventUpdate> for EventUpdateInternal {
|
||||||
|
fn from(event_update: EventUpdate) -> Self {
|
||||||
|
match event_update {
|
||||||
|
EventUpdate::UpdateWebhookNotified {
|
||||||
|
is_webhook_notified,
|
||||||
|
} => Self {
|
||||||
|
is_webhook_notified,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1,8 +1,10 @@
|
|||||||
|
use diesel::{associations::HasTable, ExpressionMethods};
|
||||||
use router_env::{instrument, tracing};
|
use router_env::{instrument, tracing};
|
||||||
|
|
||||||
use super::generics;
|
use super::generics;
|
||||||
use crate::{
|
use crate::{
|
||||||
events::{Event, EventNew},
|
events::{Event, EventNew, EventUpdate, EventUpdateInternal},
|
||||||
|
schema::events::dsl,
|
||||||
PgPooledConn, StorageResult,
|
PgPooledConn, StorageResult,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -12,3 +14,24 @@ impl EventNew {
|
|||||||
generics::generic_insert(conn, self).await
|
generics::generic_insert(conn, self).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Event {
|
||||||
|
#[instrument(skip(conn))]
|
||||||
|
pub async fn update(
|
||||||
|
conn: &PgPooledConn,
|
||||||
|
event_id: &str,
|
||||||
|
event: EventUpdate,
|
||||||
|
) -> StorageResult<Self> {
|
||||||
|
generics::generic_update_with_unique_predicate_get_result::<
|
||||||
|
<Self as HasTable>::Table,
|
||||||
|
_,
|
||||||
|
_,
|
||||||
|
_,
|
||||||
|
>(
|
||||||
|
conn,
|
||||||
|
dsl::event_id.eq(event_id.to_owned()),
|
||||||
|
EventUpdateInternal::from(event),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user