feat(router): add support for relay refund incoming webhooks (#6974)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Shankar Singh C
2025-01-10 13:08:36 +05:30
committed by GitHub
parent 15fd4de0e8
commit d850f17b87
16 changed files with 488 additions and 97 deletions

View File

@ -120,6 +120,10 @@ pub enum WebhookResponseTracker {
status: common_enums::MandateStatus,
},
NoEffect,
Relay {
relay_id: common_utils::id_type::RelayId,
status: common_enums::RelayStatus,
},
}
impl WebhookResponseTracker {
@ -132,6 +136,7 @@ impl WebhookResponseTracker {
Self::NoEffect | Self::Mandate { .. } => None,
#[cfg(feature = "payouts")]
Self::Payout { .. } => None,
Self::Relay { .. } => None,
}
}
@ -144,6 +149,7 @@ impl WebhookResponseTracker {
Self::NoEffect | Self::Mandate { .. } => None,
#[cfg(feature = "payouts")]
Self::Payout { .. } => None,
Self::Relay { .. } => None,
}
}
}

View File

@ -1,3 +1,5 @@
use std::str::FromStr;
crate::id_type!(
RelayId,
"A type for relay_id that can be used for relay ids"
@ -11,3 +13,12 @@ crate::impl_queryable_id_type!(RelayId);
crate::impl_to_sql_from_sql_id_type!(RelayId);
crate::impl_debug_id_type!(RelayId);
impl FromStr for RelayId {
type Err = error_stack::Report<crate::errors::ValidationError>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let cow_string = std::borrow::Cow::Owned(s.to_string());
Self::try_from(cow_string)
}
}

View File

@ -1,4 +1,4 @@
use diesel::{associations::HasTable, ExpressionMethods};
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
use super::generics;
use crate::{
@ -46,4 +46,18 @@ impl Relay {
)
.await
}
pub async fn find_by_profile_id_connector_reference_id(
conn: &PgPooledConn,
profile_id: &common_utils::id_type::ProfileId,
connector_reference_id: &str,
) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::profile_id
.eq(profile_id.to_owned())
.and(dsl::connector_reference_id.eq(connector_reference_id.to_owned())),
)
.await
}
}

View File

@ -81,7 +81,7 @@ impl RelayUpdate {
match response {
Err(error) => Self::ErrorUpdate {
error_code: error.code,
error_message: error.message,
error_message: error.reason.unwrap_or(error.message),
status: common_enums::RelayStatus::Failure,
},
Ok(response) => Self::StatusUpdate {

View File

@ -22,9 +22,9 @@ use crate::{
core::{
api_locking,
errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt},
metrics, payments,
payments::tokenization,
refunds, utils as core_utils,
metrics,
payments::{self, tokenization},
refunds, relay, utils as core_utils,
webhooks::utils::construct_webhook_router_data,
},
db::StorageInterface,
@ -62,6 +62,7 @@ pub async fn incoming_webhooks_wrapper<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
connector_name_or_mca_id: &str,
body: actix_web::web::Bytes,
is_relay_webhook: bool,
) -> RouterResponse<serde_json::Value> {
let start_instant = Instant::now();
let (application_response, webhooks_response_tracker, serialized_req) =
@ -73,6 +74,7 @@ pub async fn incoming_webhooks_wrapper<W: types::OutgoingWebhookType>(
key_store,
connector_name_or_mca_id,
body.clone(),
is_relay_webhook,
))
.await?;
@ -118,6 +120,7 @@ pub async fn incoming_webhooks_wrapper<W: types::OutgoingWebhookType>(
Ok(application_response)
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
state: SessionState,
@ -127,6 +130,7 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
connector_name_or_mca_id: &str,
body: actix_web::web::Bytes,
is_relay_webhook: bool,
) -> errors::RouterResult<(
services::ApplicationResponse<serde_json::Value>,
WebhookResponseTracker,
@ -361,7 +365,48 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
id: profile_id.get_string_repr().to_owned(),
})?;
let result_response = match flow_type {
// If the incoming webhook is a relay webhook, then we need to trigger the relay webhook flow
let result_response = if is_relay_webhook {
let relay_webhook_response = Box::pin(relay_incoming_webhook_flow(
state.clone(),
merchant_account,
business_profile,
key_store,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for relay failed");
// Using early return ensures unsupported webhooks are acknowledged to the connector
if let Some(errors::ApiErrorResponse::NotSupported { .. }) = relay_webhook_response
.as_ref()
.err()
.map(|a| a.current_context())
{
logger::error!(
webhook_payload =? request_details.body,
"Failed while identifying the event type",
);
let response = connector
.get_webhook_api_response(&request_details, None)
.switch()
.attach_printable(
"Failed while early return in case of not supported event type in relay webhooks",
)?;
return Ok((
response,
WebhookResponseTracker::NoEffect,
serde_json::Value::Null,
));
};
relay_webhook_response
} else {
match flow_type {
api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow(
state.clone(),
req_state,
@ -475,6 +520,7 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
_ => Err(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unsupported Flow Type received in incoming webhooks"),
}
};
match result_response {
@ -836,6 +882,97 @@ async fn payouts_incoming_webhook_flow(
}
}
async fn relay_refunds_incoming_webhook_flow(
state: SessionState,
merchant_account: domain::MerchantAccount,
business_profile: domain::Profile,
merchant_key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
event_type: webhooks::IncomingWebhookEvent,
source_verified: bool,
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
let db = &*state.store;
let key_manager_state = &(&state).into();
let relay_record = match webhook_details.object_reference_id {
webhooks::ObjectReferenceId::RefundId(refund_id_type) => match refund_id_type {
webhooks::RefundIdType::RefundId(refund_id) => {
let relay_id = common_utils::id_type::RelayId::from_str(&refund_id)
.change_context(errors::ValidationError::IncorrectValueProvided {
field_name: "relay_id",
})
.change_context(errors::ApiErrorResponse::InternalServerError)?;
db.find_relay_by_id(key_manager_state, &merchant_key_store, &relay_id)
.await
.to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to fetch the relay record")?
}
webhooks::RefundIdType::ConnectorRefundId(connector_refund_id) => db
.find_relay_by_profile_id_connector_reference_id(
key_manager_state,
&merchant_key_store,
business_profile.get_id(),
&connector_refund_id,
)
.await
.to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to fetch the relay record")?,
},
_ => Err(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("received a non-refund id when processing relay refund webhooks")?,
};
// if source_verified then update relay status else trigger relay force sync
let relay_response = if source_verified {
let relay_update = hyperswitch_domain_models::relay::RelayUpdate::StatusUpdate {
connector_reference_id: None,
status: common_enums::RelayStatus::foreign_try_from(event_type)
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("failed relay refund status mapping from event type")?,
};
db.update_relay(
key_manager_state,
&merchant_key_store,
relay_record,
relay_update,
)
.await
.map(api_models::relay::RelayResponse::from)
.to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to update relay")?
} else {
let relay_retrieve_request = api_models::relay::RelayRetrieveRequest {
force_sync: true,
id: relay_record.id,
};
let relay_force_sync_response = Box::pin(relay::relay_retrieve(
state,
merchant_account,
Some(business_profile.get_id().clone()),
merchant_key_store,
relay_retrieve_request,
))
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to force sync relay")?;
if let hyperswitch_domain_models::api::ApplicationResponse::Json(response) =
relay_force_sync_response
{
response
} else {
Err(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unexpected response from force sync relay")?
}
};
Ok(WebhookResponseTracker::Relay {
relay_id: relay_response.id,
status: relay_response.status,
})
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn refunds_incoming_webhook_flow(
@ -938,6 +1075,44 @@ async fn refunds_incoming_webhook_flow(
})
}
async fn relay_incoming_webhook_flow(
state: SessionState,
merchant_account: domain::MerchantAccount,
business_profile: domain::Profile,
merchant_key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
event_type: webhooks::IncomingWebhookEvent,
source_verified: bool,
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
let flow_type: api::WebhookFlow = event_type.into();
let result_response = match flow_type {
webhooks::WebhookFlow::Refund => Box::pin(relay_refunds_incoming_webhook_flow(
state,
merchant_account,
business_profile,
merchant_key_store,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for relay refund failed")?,
webhooks::WebhookFlow::Payment
| webhooks::WebhookFlow::Payout
| webhooks::WebhookFlow::Dispute
| webhooks::WebhookFlow::Subscription
| webhooks::WebhookFlow::ReturnResponse
| webhooks::WebhookFlow::BankTransfer
| webhooks::WebhookFlow::Mandate
| webhooks::WebhookFlow::ExternalAuthentication
| webhooks::WebhookFlow::FraudCheck => Err(errors::ApiErrorResponse::NotSupported {
message: "Relay webhook flow types not supported".to_string(),
})?,
};
Ok(result_response)
}
async fn get_payment_attempt_from_object_reference_id(
state: &SessionState,
object_reference_id: webhooks::ObjectReferenceId,

View File

@ -56,6 +56,7 @@ pub async fn incoming_webhooks_wrapper<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
connector_id: &common_utils::id_type::MerchantConnectorAccountId,
body: actix_web::web::Bytes,
is_relay_webhook: bool,
) -> RouterResponse<serde_json::Value> {
let start_instant = Instant::now();
let (application_response, webhooks_response_tracker, serialized_req) =
@ -68,6 +69,7 @@ pub async fn incoming_webhooks_wrapper<W: types::OutgoingWebhookType>(
key_store,
connector_id,
body.clone(),
is_relay_webhook,
))
.await?;
@ -124,6 +126,7 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
key_store: domain::MerchantKeyStore,
connector_id: &common_utils::id_type::MerchantConnectorAccountId,
body: actix_web::web::Bytes,
_is_relay_webhook: bool,
) -> errors::RouterResult<(
services::ApplicationResponse<serde_json::Value>,
WebhookResponseTracker,

View File

@ -35,6 +35,14 @@ pub trait RelayInterface {
merchant_key_store: &domain::MerchantKeyStore,
relay_id: &common_utils::id_type::RelayId,
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError>;
async fn find_relay_by_profile_id_connector_reference_id(
&self,
key_manager_state: &KeyManagerState,
merchant_key_store: &domain::MerchantKeyStore,
profile_id: &common_utils::id_type::ProfileId,
connector_reference_id: &str,
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError>;
}
#[async_trait::async_trait]
@ -105,6 +113,30 @@ impl RelayInterface for Store {
.await
.change_context(errors::StorageError::DecryptionError)
}
async fn find_relay_by_profile_id_connector_reference_id(
&self,
key_manager_state: &KeyManagerState,
merchant_key_store: &domain::MerchantKeyStore,
profile_id: &common_utils::id_type::ProfileId,
connector_reference_id: &str,
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
diesel_models::relay::Relay::find_by_profile_id_connector_reference_id(
&conn,
profile_id,
connector_reference_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))?
.convert(
key_manager_state,
merchant_key_store.key.get_inner(),
merchant_key_store.merchant_id.clone().into(),
)
.await
.change_context(errors::StorageError::DecryptionError)
}
}
#[async_trait::async_trait]
@ -136,6 +168,16 @@ impl RelayInterface for MockDb {
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn find_relay_by_profile_id_connector_reference_id(
&self,
_key_manager_state: &KeyManagerState,
_merchant_key_store: &domain::MerchantKeyStore,
_profile_id: &common_utils::id_type::ProfileId,
_connector_reference_id: &str,
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
}
#[async_trait::async_trait]
@ -178,4 +220,21 @@ impl RelayInterface for KafkaStore {
.find_relay_by_id(key_manager_state, merchant_key_store, relay_id)
.await
}
async fn find_relay_by_profile_id_connector_reference_id(
&self,
key_manager_state: &KeyManagerState,
merchant_key_store: &domain::MerchantKeyStore,
profile_id: &common_utils::id_type::ProfileId,
connector_reference_id: &str,
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
self.diesel_store
.find_relay_by_profile_id_connector_reference_id(
key_manager_state,
merchant_key_store,
profile_id,
connector_reference_id,
)
.await
}
}

View File

@ -142,6 +142,7 @@ pub fn mk_app(
.service(routes::Customers::server(state.clone()))
.service(routes::Configs::server(state.clone()))
.service(routes::MerchantConnectorAccount::server(state.clone()))
.service(routes::RelayWebhooks::server(state.clone()))
.service(routes::Webhooks::server(state.clone()))
.service(routes::Relay::server(state.clone()));

View File

@ -69,7 +69,7 @@ pub use self::app::{
ApiKeys, AppState, ApplePayCertificatesMigration, Cache, Cards, Configs, ConnectorOnboarding,
Customers, Disputes, EphemeralKey, FeatureMatrix, Files, Forex, Gsm, Health, Mandates,
MerchantAccount, MerchantConnectorAccount, PaymentLink, PaymentMethods, Payments, Poll,
Profile, ProfileNew, Refunds, Relay, SessionState, User, Webhooks,
Profile, ProfileNew, Refunds, Relay, RelayWebhooks, SessionState, User, Webhooks,
};
#[cfg(feature = "olap")]
pub use self::app::{Blocklist, Organization, Routing, Verify, WebhookEvents};

View File

@ -1515,6 +1515,20 @@ impl Webhooks {
}
}
pub struct RelayWebhooks;
#[cfg(feature = "oltp")]
impl RelayWebhooks {
pub fn server(state: AppState) -> Scope {
use api_models::webhooks as webhook_type;
web::scope("/webhooks/relay")
.app_data(web::Data::new(state))
.service(web::resource("/{merchant_id}/{connector_id}").route(
web::post().to(receive_incoming_relay_webhook::<webhook_type::OutgoingWebhook>),
))
}
}
#[cfg(all(feature = "oltp", feature = "v2"))]
impl Webhooks {
pub fn server(config: AppState) -> Scope {

View File

@ -171,6 +171,7 @@ impl From<Flow> for ApiIdentifier {
Flow::FrmFulfillment
| Flow::IncomingWebhookReceive
| Flow::IncomingRelayWebhookReceive
| Flow::WebhookEventInitialDeliveryAttemptList
| Flow::WebhookEventDeliveryAttemptList
| Flow::WebhookEventDeliveryRetry => Self::Webhooks,

View File

@ -36,6 +36,7 @@ pub async fn receive_incoming_webhook<W: types::OutgoingWebhookType>(
auth.key_store,
&connector_id_or_name,
body.clone(),
false,
)
},
&auth::MerchantIdAuth(merchant_id),
@ -44,6 +45,89 @@ pub async fn receive_incoming_webhook<W: types::OutgoingWebhookType>(
.await
}
#[cfg(feature = "v1")]
#[instrument(skip_all, fields(flow = ?Flow::IncomingRelayWebhookReceive))]
pub async fn receive_incoming_relay_webhook<W: types::OutgoingWebhookType>(
state: web::Data<AppState>,
req: HttpRequest,
body: web::Bytes,
path: web::Path<(
common_utils::id_type::MerchantId,
common_utils::id_type::MerchantConnectorAccountId,
)>,
) -> impl Responder {
let flow = Flow::IncomingWebhookReceive;
let (merchant_id, connector_id) = path.into_inner();
let is_relay_webhook = true;
Box::pin(api::server_wrap(
flow.clone(),
state,
&req,
(),
|state, auth, _, req_state| {
webhooks::incoming_webhooks_wrapper::<W>(
&flow,
state.to_owned(),
req_state,
&req,
auth.merchant_account,
auth.key_store,
connector_id.get_string_repr(),
body.clone(),
is_relay_webhook,
)
},
&auth::MerchantIdAuth(merchant_id),
api_locking::LockAction::NotApplicable,
))
.await
}
#[cfg(feature = "v2")]
#[instrument(skip_all, fields(flow = ?Flow::IncomingRelayWebhookReceive))]
pub async fn receive_incoming_relay_webhook<W: types::OutgoingWebhookType>(
state: web::Data<AppState>,
req: HttpRequest,
body: web::Bytes,
path: web::Path<(
common_utils::id_type::MerchantId,
common_utils::id_type::ProfileId,
common_utils::id_type::MerchantConnectorAccountId,
)>,
) -> impl Responder {
let flow = Flow::IncomingWebhookReceive;
let (merchant_id, profile_id, connector_id) = path.into_inner();
let is_relay_webhook = true;
Box::pin(api::server_wrap(
flow.clone(),
state,
&req,
(),
|state, auth, _, req_state| {
webhooks::incoming_webhooks_wrapper::<W>(
&flow,
state.to_owned(),
req_state,
&req,
auth.merchant_account,
auth.profile,
auth.key_store,
&connector_id,
body.clone(),
is_relay_webhook,
)
},
&auth::MerchantIdAndProfileIdAuth {
merchant_id,
profile_id,
},
api_locking::LockAction::NotApplicable,
))
.await
}
#[instrument(skip_all, fields(flow = ?Flow::IncomingWebhookReceive))]
#[cfg(feature = "v2")]
pub async fn receive_incoming_webhook<W: types::OutgoingWebhookType>(
@ -75,6 +159,7 @@ pub async fn receive_incoming_webhook<W: types::OutgoingWebhookType>(
auth.key_store,
&connector_id,
body.clone(),
false,
)
},
&auth::MerchantIdAndProfileIdAuth {

View File

@ -662,6 +662,22 @@ impl ForeignTryFrom<api_models::webhooks::IncomingWebhookEvent> for storage_enum
}
}
impl ForeignTryFrom<api_models::webhooks::IncomingWebhookEvent> for api_enums::RelayStatus {
type Error = errors::ValidationError;
fn foreign_try_from(
value: api_models::webhooks::IncomingWebhookEvent,
) -> Result<Self, Self::Error> {
match value {
api_models::webhooks::IncomingWebhookEvent::RefundSuccess => Ok(Self::Success),
api_models::webhooks::IncomingWebhookEvent::RefundFailure => Ok(Self::Failure),
_ => Err(errors::ValidationError::IncorrectValueProvided {
field_name: "incoming_webhook_event_type",
}),
}
}
}
#[cfg(feature = "payouts")]
impl ForeignTryFrom<api_models::webhooks::IncomingWebhookEvent> for storage_enums::PayoutStatus {
type Error = errors::ValidationError;

View File

@ -537,6 +537,8 @@ pub enum Flow {
Relay,
/// Relay retrieve flow
RelayRetrieve,
/// Incoming Relay Webhook Receive
IncomingRelayWebhookReceive,
}
/// Trait for providing generic behaviour to flow metric

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP INDEX relay_profile_id_connector_reference_id_index;

View File

@ -0,0 +1,2 @@
-- Your SQL goes here
CREATE UNIQUE INDEX relay_profile_id_connector_reference_id_index ON relay (profile_id, connector_reference_id);