From d850f17b87e4eedc66836925136ffbd513d09124 Mon Sep 17 00:00:00 2001 From: Shankar Singh C <83439957+ShankarSinghC@users.noreply.github.com> Date: Fri, 10 Jan 2025 13:08:36 +0530 Subject: [PATCH] feat(router): add support for relay refund incoming webhooks (#6974) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- crates/api_models/src/webhooks.rs | 6 + crates/common_utils/src/id_type/relay.rs | 11 + crates/diesel_models/src/query/relay.rs | 16 +- crates/hyperswitch_domain_models/src/relay.rs | 2 +- crates/router/src/core/webhooks/incoming.rs | 363 +++++++++++++----- .../router/src/core/webhooks/incoming_v2.rs | 3 + crates/router/src/db/relay.rs | 59 +++ crates/router/src/lib.rs | 1 + crates/router/src/routes.rs | 2 +- crates/router/src/routes/app.rs | 14 + crates/router/src/routes/lock_utils.rs | 1 + crates/router/src/routes/webhooks.rs | 85 ++++ crates/router/src/types/transformers.rs | 16 + crates/router_env/src/logger/types.rs | 2 + .../down.sql | 2 + .../up.sql | 2 + 16 files changed, 488 insertions(+), 97 deletions(-) create mode 100644 migrations/2025-01-07-105739_create_index_for_relay/down.sql create mode 100644 migrations/2025-01-07-105739_create_index_for_relay/up.sql diff --git a/crates/api_models/src/webhooks.rs b/crates/api_models/src/webhooks.rs index e6f4065eb7..f99e3a450b 100644 --- a/crates/api_models/src/webhooks.rs +++ b/crates/api_models/src/webhooks.rs @@ -120,6 +120,10 @@ pub enum WebhookResponseTracker { status: common_enums::MandateStatus, }, NoEffect, + Relay { + relay_id: common_utils::id_type::RelayId, + status: common_enums::RelayStatus, + }, } impl WebhookResponseTracker { @@ -132,6 +136,7 @@ impl WebhookResponseTracker { Self::NoEffect | Self::Mandate { .. } => None, #[cfg(feature = "payouts")] Self::Payout { .. } => None, + Self::Relay { .. } => None, } } @@ -144,6 +149,7 @@ impl WebhookResponseTracker { Self::NoEffect | Self::Mandate { .. } => None, #[cfg(feature = "payouts")] Self::Payout { .. } => None, + Self::Relay { .. } => None, } } } diff --git a/crates/common_utils/src/id_type/relay.rs b/crates/common_utils/src/id_type/relay.rs index 3ad64729fb..c818671e0e 100644 --- a/crates/common_utils/src/id_type/relay.rs +++ b/crates/common_utils/src/id_type/relay.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + crate::id_type!( RelayId, "A type for relay_id that can be used for relay ids" @@ -11,3 +13,12 @@ crate::impl_queryable_id_type!(RelayId); crate::impl_to_sql_from_sql_id_type!(RelayId); crate::impl_debug_id_type!(RelayId); + +impl FromStr for RelayId { + type Err = error_stack::Report; + + fn from_str(s: &str) -> Result { + let cow_string = std::borrow::Cow::Owned(s.to_string()); + Self::try_from(cow_string) + } +} diff --git a/crates/diesel_models/src/query/relay.rs b/crates/diesel_models/src/query/relay.rs index 034446fe6b..217f6fd734 100644 --- a/crates/diesel_models/src/query/relay.rs +++ b/crates/diesel_models/src/query/relay.rs @@ -1,4 +1,4 @@ -use diesel::{associations::HasTable, ExpressionMethods}; +use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use super::generics; use crate::{ @@ -46,4 +46,18 @@ impl Relay { ) .await } + + pub async fn find_by_profile_id_connector_reference_id( + conn: &PgPooledConn, + profile_id: &common_utils::id_type::ProfileId, + connector_reference_id: &str, + ) -> StorageResult { + generics::generic_find_one::<::Table, _, _>( + conn, + dsl::profile_id + .eq(profile_id.to_owned()) + .and(dsl::connector_reference_id.eq(connector_reference_id.to_owned())), + ) + .await + } } diff --git a/crates/hyperswitch_domain_models/src/relay.rs b/crates/hyperswitch_domain_models/src/relay.rs index 959ac8e7f6..8af58265c3 100644 --- a/crates/hyperswitch_domain_models/src/relay.rs +++ b/crates/hyperswitch_domain_models/src/relay.rs @@ -81,7 +81,7 @@ impl RelayUpdate { match response { Err(error) => Self::ErrorUpdate { error_code: error.code, - error_message: error.message, + error_message: error.reason.unwrap_or(error.message), status: common_enums::RelayStatus::Failure, }, Ok(response) => Self::StatusUpdate { diff --git a/crates/router/src/core/webhooks/incoming.rs b/crates/router/src/core/webhooks/incoming.rs index 474a0c628e..4c1a5aeb78 100644 --- a/crates/router/src/core/webhooks/incoming.rs +++ b/crates/router/src/core/webhooks/incoming.rs @@ -22,9 +22,9 @@ use crate::{ core::{ api_locking, errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt}, - metrics, payments, - payments::tokenization, - refunds, utils as core_utils, + metrics, + payments::{self, tokenization}, + refunds, relay, utils as core_utils, webhooks::utils::construct_webhook_router_data, }, db::StorageInterface, @@ -62,6 +62,7 @@ pub async fn incoming_webhooks_wrapper( key_store: domain::MerchantKeyStore, connector_name_or_mca_id: &str, body: actix_web::web::Bytes, + is_relay_webhook: bool, ) -> RouterResponse { let start_instant = Instant::now(); let (application_response, webhooks_response_tracker, serialized_req) = @@ -73,6 +74,7 @@ pub async fn incoming_webhooks_wrapper( key_store, connector_name_or_mca_id, body.clone(), + is_relay_webhook, )) .await?; @@ -118,6 +120,7 @@ pub async fn incoming_webhooks_wrapper( Ok(application_response) } +#[allow(clippy::too_many_arguments)] #[instrument(skip_all)] async fn incoming_webhooks_core( state: SessionState, @@ -127,6 +130,7 @@ async fn incoming_webhooks_core( key_store: domain::MerchantKeyStore, connector_name_or_mca_id: &str, body: actix_web::web::Bytes, + is_relay_webhook: bool, ) -> errors::RouterResult<( services::ApplicationResponse, WebhookResponseTracker, @@ -361,120 +365,162 @@ async fn incoming_webhooks_core( id: profile_id.get_string_repr().to_owned(), })?; - let result_response = match flow_type { - api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow( + // If the incoming webhook is a relay webhook, then we need to trigger the relay webhook flow + let result_response = if is_relay_webhook { + let relay_webhook_response = Box::pin(relay_incoming_webhook_flow( state.clone(), - req_state, merchant_account, business_profile, key_store, webhook_details, - source_verified, - &connector, - &request_details, event_type, - )) - .await - .attach_printable("Incoming webhook flow for payments failed"), - - api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - connector_name.as_str(), - source_verified, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for refunds failed"), - - api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - source_verified, - &connector, - &request_details, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for disputes failed"), - - api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow( - state.clone(), - req_state, - merchant_account, - business_profile, - key_store, - webhook_details, source_verified, )) .await - .attach_printable("Incoming bank-transfer webhook flow failed"), + .attach_printable("Incoming webhook flow for relay failed"); - api::WebhookFlow::ReturnResponse => Ok(WebhookResponseTracker::NoEffect), + // Using early return ensures unsupported webhooks are acknowledged to the connector + if let Some(errors::ApiErrorResponse::NotSupported { .. }) = relay_webhook_response + .as_ref() + .err() + .map(|a| a.current_context()) + { + logger::error!( + webhook_payload =? request_details.body, + "Failed while identifying the event type", + ); - api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - source_verified, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for mandates failed"), + let response = connector + .get_webhook_api_response(&request_details, None) + .switch() + .attach_printable( + "Failed while early return in case of not supported event type in relay webhooks", + )?; - api::WebhookFlow::ExternalAuthentication => { - Box::pin(external_authentication_incoming_webhook_flow( + return Ok(( + response, + WebhookResponseTracker::NoEffect, + serde_json::Value::Null, + )); + }; + + relay_webhook_response + } else { + match flow_type { + api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow( + state.clone(), + req_state, + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + &connector, + &request_details, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for payments failed"), + + api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + connector_name.as_str(), + source_verified, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for refunds failed"), + + api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + &connector, + &request_details, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for disputes failed"), + + api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow( + state.clone(), + req_state, + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + )) + .await + .attach_printable("Incoming bank-transfer webhook flow failed"), + + api::WebhookFlow::ReturnResponse => Ok(WebhookResponseTracker::NoEffect), + + api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for mandates failed"), + + api::WebhookFlow::ExternalAuthentication => { + Box::pin(external_authentication_incoming_webhook_flow( + state.clone(), + req_state, + merchant_account, + key_store, + source_verified, + event_type, + &request_details, + &connector, + object_ref_id, + business_profile, + merchant_connector_account, + )) + .await + .attach_printable("Incoming webhook flow for external authentication failed") + } + api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow( state.clone(), req_state, merchant_account, key_store, source_verified, event_type, - &request_details, - &connector, object_ref_id, business_profile, - merchant_connector_account, )) .await - .attach_printable("Incoming webhook flow for external authentication failed") + .attach_printable("Incoming webhook flow for fraud check failed"), + + #[cfg(feature = "payouts")] + api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + event_type, + source_verified, + )) + .await + .attach_printable("Incoming webhook flow for payouts failed"), + + _ => Err(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Unsupported Flow Type received in incoming webhooks"), } - api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow( - state.clone(), - req_state, - merchant_account, - key_store, - source_verified, - event_type, - object_ref_id, - business_profile, - )) - .await - .attach_printable("Incoming webhook flow for fraud check failed"), - - #[cfg(feature = "payouts")] - api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - event_type, - source_verified, - )) - .await - .attach_printable("Incoming webhook flow for payouts failed"), - - _ => Err(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Unsupported Flow Type received in incoming webhooks"), }; match result_response { @@ -836,6 +882,97 @@ async fn payouts_incoming_webhook_flow( } } +async fn relay_refunds_incoming_webhook_flow( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: domain::Profile, + merchant_key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + event_type: webhooks::IncomingWebhookEvent, + source_verified: bool, +) -> CustomResult { + let db = &*state.store; + let key_manager_state = &(&state).into(); + + let relay_record = match webhook_details.object_reference_id { + webhooks::ObjectReferenceId::RefundId(refund_id_type) => match refund_id_type { + webhooks::RefundIdType::RefundId(refund_id) => { + let relay_id = common_utils::id_type::RelayId::from_str(&refund_id) + .change_context(errors::ValidationError::IncorrectValueProvided { + field_name: "relay_id", + }) + .change_context(errors::ApiErrorResponse::InternalServerError)?; + + db.find_relay_by_id(key_manager_state, &merchant_key_store, &relay_id) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the relay record")? + } + webhooks::RefundIdType::ConnectorRefundId(connector_refund_id) => db + .find_relay_by_profile_id_connector_reference_id( + key_manager_state, + &merchant_key_store, + business_profile.get_id(), + &connector_refund_id, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the relay record")?, + }, + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received a non-refund id when processing relay refund webhooks")?, + }; + + // if source_verified then update relay status else trigger relay force sync + let relay_response = if source_verified { + let relay_update = hyperswitch_domain_models::relay::RelayUpdate::StatusUpdate { + connector_reference_id: None, + status: common_enums::RelayStatus::foreign_try_from(event_type) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("failed relay refund status mapping from event type")?, + }; + db.update_relay( + key_manager_state, + &merchant_key_store, + relay_record, + relay_update, + ) + .await + .map(api_models::relay::RelayResponse::from) + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to update relay")? + } else { + let relay_retrieve_request = api_models::relay::RelayRetrieveRequest { + force_sync: true, + id: relay_record.id, + }; + let relay_force_sync_response = Box::pin(relay::relay_retrieve( + state, + merchant_account, + Some(business_profile.get_id().clone()), + merchant_key_store, + relay_retrieve_request, + )) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to force sync relay")?; + + if let hyperswitch_domain_models::api::ApplicationResponse::Json(response) = + relay_force_sync_response + { + response + } else { + Err(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Unexpected response from force sync relay")? + } + }; + + Ok(WebhookResponseTracker::Relay { + relay_id: relay_response.id, + status: relay_response.status, + }) +} + #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] async fn refunds_incoming_webhook_flow( @@ -938,6 +1075,44 @@ async fn refunds_incoming_webhook_flow( }) } +async fn relay_incoming_webhook_flow( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: domain::Profile, + merchant_key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + event_type: webhooks::IncomingWebhookEvent, + source_verified: bool, +) -> CustomResult { + let flow_type: api::WebhookFlow = event_type.into(); + + let result_response = match flow_type { + webhooks::WebhookFlow::Refund => Box::pin(relay_refunds_incoming_webhook_flow( + state, + merchant_account, + business_profile, + merchant_key_store, + webhook_details, + event_type, + source_verified, + )) + .await + .attach_printable("Incoming webhook flow for relay refund failed")?, + webhooks::WebhookFlow::Payment + | webhooks::WebhookFlow::Payout + | webhooks::WebhookFlow::Dispute + | webhooks::WebhookFlow::Subscription + | webhooks::WebhookFlow::ReturnResponse + | webhooks::WebhookFlow::BankTransfer + | webhooks::WebhookFlow::Mandate + | webhooks::WebhookFlow::ExternalAuthentication + | webhooks::WebhookFlow::FraudCheck => Err(errors::ApiErrorResponse::NotSupported { + message: "Relay webhook flow types not supported".to_string(), + })?, + }; + Ok(result_response) +} + async fn get_payment_attempt_from_object_reference_id( state: &SessionState, object_reference_id: webhooks::ObjectReferenceId, diff --git a/crates/router/src/core/webhooks/incoming_v2.rs b/crates/router/src/core/webhooks/incoming_v2.rs index 39b7091e73..5e89f3343b 100644 --- a/crates/router/src/core/webhooks/incoming_v2.rs +++ b/crates/router/src/core/webhooks/incoming_v2.rs @@ -56,6 +56,7 @@ pub async fn incoming_webhooks_wrapper( key_store: domain::MerchantKeyStore, connector_id: &common_utils::id_type::MerchantConnectorAccountId, body: actix_web::web::Bytes, + is_relay_webhook: bool, ) -> RouterResponse { let start_instant = Instant::now(); let (application_response, webhooks_response_tracker, serialized_req) = @@ -68,6 +69,7 @@ pub async fn incoming_webhooks_wrapper( key_store, connector_id, body.clone(), + is_relay_webhook, )) .await?; @@ -124,6 +126,7 @@ async fn incoming_webhooks_core( key_store: domain::MerchantKeyStore, connector_id: &common_utils::id_type::MerchantConnectorAccountId, body: actix_web::web::Bytes, + _is_relay_webhook: bool, ) -> errors::RouterResult<( services::ApplicationResponse, WebhookResponseTracker, diff --git a/crates/router/src/db/relay.rs b/crates/router/src/db/relay.rs index 46259679c5..2ead84019d 100644 --- a/crates/router/src/db/relay.rs +++ b/crates/router/src/db/relay.rs @@ -35,6 +35,14 @@ pub trait RelayInterface { merchant_key_store: &domain::MerchantKeyStore, relay_id: &common_utils::id_type::RelayId, ) -> CustomResult; + + async fn find_relay_by_profile_id_connector_reference_id( + &self, + key_manager_state: &KeyManagerState, + merchant_key_store: &domain::MerchantKeyStore, + profile_id: &common_utils::id_type::ProfileId, + connector_reference_id: &str, + ) -> CustomResult; } #[async_trait::async_trait] @@ -105,6 +113,30 @@ impl RelayInterface for Store { .await .change_context(errors::StorageError::DecryptionError) } + + async fn find_relay_by_profile_id_connector_reference_id( + &self, + key_manager_state: &KeyManagerState, + merchant_key_store: &domain::MerchantKeyStore, + profile_id: &common_utils::id_type::ProfileId, + connector_reference_id: &str, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + diesel_models::relay::Relay::find_by_profile_id_connector_reference_id( + &conn, + profile_id, + connector_reference_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error)))? + .convert( + key_manager_state, + merchant_key_store.key.get_inner(), + merchant_key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + } } #[async_trait::async_trait] @@ -136,6 +168,16 @@ impl RelayInterface for MockDb { ) -> CustomResult { Err(errors::StorageError::MockDbError)? } + + async fn find_relay_by_profile_id_connector_reference_id( + &self, + _key_manager_state: &KeyManagerState, + _merchant_key_store: &domain::MerchantKeyStore, + _profile_id: &common_utils::id_type::ProfileId, + _connector_reference_id: &str, + ) -> CustomResult { + Err(errors::StorageError::MockDbError)? + } } #[async_trait::async_trait] @@ -178,4 +220,21 @@ impl RelayInterface for KafkaStore { .find_relay_by_id(key_manager_state, merchant_key_store, relay_id) .await } + + async fn find_relay_by_profile_id_connector_reference_id( + &self, + key_manager_state: &KeyManagerState, + merchant_key_store: &domain::MerchantKeyStore, + profile_id: &common_utils::id_type::ProfileId, + connector_reference_id: &str, + ) -> CustomResult { + self.diesel_store + .find_relay_by_profile_id_connector_reference_id( + key_manager_state, + merchant_key_store, + profile_id, + connector_reference_id, + ) + .await + } } diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index 4fe9318f64..8c2bca5a82 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -142,6 +142,7 @@ pub fn mk_app( .service(routes::Customers::server(state.clone())) .service(routes::Configs::server(state.clone())) .service(routes::MerchantConnectorAccount::server(state.clone())) + .service(routes::RelayWebhooks::server(state.clone())) .service(routes::Webhooks::server(state.clone())) .service(routes::Relay::server(state.clone())); diff --git a/crates/router/src/routes.rs b/crates/router/src/routes.rs index 462861d331..22f5983e4c 100644 --- a/crates/router/src/routes.rs +++ b/crates/router/src/routes.rs @@ -69,7 +69,7 @@ pub use self::app::{ ApiKeys, AppState, ApplePayCertificatesMigration, Cache, Cards, Configs, ConnectorOnboarding, Customers, Disputes, EphemeralKey, FeatureMatrix, Files, Forex, Gsm, Health, Mandates, MerchantAccount, MerchantConnectorAccount, PaymentLink, PaymentMethods, Payments, Poll, - Profile, ProfileNew, Refunds, Relay, SessionState, User, Webhooks, + Profile, ProfileNew, Refunds, Relay, RelayWebhooks, SessionState, User, Webhooks, }; #[cfg(feature = "olap")] pub use self::app::{Blocklist, Organization, Routing, Verify, WebhookEvents}; diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index f8b5ef4456..2a9561c06c 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -1515,6 +1515,20 @@ impl Webhooks { } } +pub struct RelayWebhooks; + +#[cfg(feature = "oltp")] +impl RelayWebhooks { + pub fn server(state: AppState) -> Scope { + use api_models::webhooks as webhook_type; + web::scope("/webhooks/relay") + .app_data(web::Data::new(state)) + .service(web::resource("/{merchant_id}/{connector_id}").route( + web::post().to(receive_incoming_relay_webhook::), + )) + } +} + #[cfg(all(feature = "oltp", feature = "v2"))] impl Webhooks { pub fn server(config: AppState) -> Scope { diff --git a/crates/router/src/routes/lock_utils.rs b/crates/router/src/routes/lock_utils.rs index 9d7ae1874c..6550778619 100644 --- a/crates/router/src/routes/lock_utils.rs +++ b/crates/router/src/routes/lock_utils.rs @@ -171,6 +171,7 @@ impl From for ApiIdentifier { Flow::FrmFulfillment | Flow::IncomingWebhookReceive + | Flow::IncomingRelayWebhookReceive | Flow::WebhookEventInitialDeliveryAttemptList | Flow::WebhookEventDeliveryAttemptList | Flow::WebhookEventDeliveryRetry => Self::Webhooks, diff --git a/crates/router/src/routes/webhooks.rs b/crates/router/src/routes/webhooks.rs index 5427ac34b4..1b3f28fd25 100644 --- a/crates/router/src/routes/webhooks.rs +++ b/crates/router/src/routes/webhooks.rs @@ -36,6 +36,7 @@ pub async fn receive_incoming_webhook( auth.key_store, &connector_id_or_name, body.clone(), + false, ) }, &auth::MerchantIdAuth(merchant_id), @@ -44,6 +45,89 @@ pub async fn receive_incoming_webhook( .await } +#[cfg(feature = "v1")] +#[instrument(skip_all, fields(flow = ?Flow::IncomingRelayWebhookReceive))] +pub async fn receive_incoming_relay_webhook( + state: web::Data, + req: HttpRequest, + body: web::Bytes, + path: web::Path<( + common_utils::id_type::MerchantId, + common_utils::id_type::MerchantConnectorAccountId, + )>, +) -> impl Responder { + let flow = Flow::IncomingWebhookReceive; + let (merchant_id, connector_id) = path.into_inner(); + let is_relay_webhook = true; + + Box::pin(api::server_wrap( + flow.clone(), + state, + &req, + (), + |state, auth, _, req_state| { + webhooks::incoming_webhooks_wrapper::( + &flow, + state.to_owned(), + req_state, + &req, + auth.merchant_account, + auth.key_store, + connector_id.get_string_repr(), + body.clone(), + is_relay_webhook, + ) + }, + &auth::MerchantIdAuth(merchant_id), + api_locking::LockAction::NotApplicable, + )) + .await +} + +#[cfg(feature = "v2")] +#[instrument(skip_all, fields(flow = ?Flow::IncomingRelayWebhookReceive))] +pub async fn receive_incoming_relay_webhook( + state: web::Data, + req: HttpRequest, + body: web::Bytes, + path: web::Path<( + common_utils::id_type::MerchantId, + common_utils::id_type::ProfileId, + common_utils::id_type::MerchantConnectorAccountId, + )>, +) -> impl Responder { + let flow = Flow::IncomingWebhookReceive; + let (merchant_id, profile_id, connector_id) = path.into_inner(); + let is_relay_webhook = true; + + Box::pin(api::server_wrap( + flow.clone(), + state, + &req, + (), + |state, auth, _, req_state| { + webhooks::incoming_webhooks_wrapper::( + &flow, + state.to_owned(), + req_state, + &req, + auth.merchant_account, + auth.profile, + auth.key_store, + &connector_id, + body.clone(), + is_relay_webhook, + ) + }, + &auth::MerchantIdAndProfileIdAuth { + merchant_id, + profile_id, + }, + api_locking::LockAction::NotApplicable, + )) + .await +} + #[instrument(skip_all, fields(flow = ?Flow::IncomingWebhookReceive))] #[cfg(feature = "v2")] pub async fn receive_incoming_webhook( @@ -75,6 +159,7 @@ pub async fn receive_incoming_webhook( auth.key_store, &connector_id, body.clone(), + false, ) }, &auth::MerchantIdAndProfileIdAuth { diff --git a/crates/router/src/types/transformers.rs b/crates/router/src/types/transformers.rs index bbcdfc535d..cb7d3f78f7 100644 --- a/crates/router/src/types/transformers.rs +++ b/crates/router/src/types/transformers.rs @@ -662,6 +662,22 @@ impl ForeignTryFrom for storage_enum } } +impl ForeignTryFrom for api_enums::RelayStatus { + type Error = errors::ValidationError; + + fn foreign_try_from( + value: api_models::webhooks::IncomingWebhookEvent, + ) -> Result { + match value { + api_models::webhooks::IncomingWebhookEvent::RefundSuccess => Ok(Self::Success), + api_models::webhooks::IncomingWebhookEvent::RefundFailure => Ok(Self::Failure), + _ => Err(errors::ValidationError::IncorrectValueProvided { + field_name: "incoming_webhook_event_type", + }), + } + } +} + #[cfg(feature = "payouts")] impl ForeignTryFrom for storage_enums::PayoutStatus { type Error = errors::ValidationError; diff --git a/crates/router_env/src/logger/types.rs b/crates/router_env/src/logger/types.rs index 10183f75d5..935efa3c78 100644 --- a/crates/router_env/src/logger/types.rs +++ b/crates/router_env/src/logger/types.rs @@ -537,6 +537,8 @@ pub enum Flow { Relay, /// Relay retrieve flow RelayRetrieve, + /// Incoming Relay Webhook Receive + IncomingRelayWebhookReceive, } /// Trait for providing generic behaviour to flow metric diff --git a/migrations/2025-01-07-105739_create_index_for_relay/down.sql b/migrations/2025-01-07-105739_create_index_for_relay/down.sql new file mode 100644 index 0000000000..8a75d44546 --- /dev/null +++ b/migrations/2025-01-07-105739_create_index_for_relay/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP INDEX relay_profile_id_connector_reference_id_index; \ No newline at end of file diff --git a/migrations/2025-01-07-105739_create_index_for_relay/up.sql b/migrations/2025-01-07-105739_create_index_for_relay/up.sql new file mode 100644 index 0000000000..5ebe5983c7 --- /dev/null +++ b/migrations/2025-01-07-105739_create_index_for_relay/up.sql @@ -0,0 +1,2 @@ +-- Your SQL goes here +CREATE UNIQUE INDEX relay_profile_id_connector_reference_id_index ON relay (profile_id, connector_reference_id); \ No newline at end of file