feat(core): Adding integration for webhooks through UCS (#8814)

This commit is contained in:
Amitsingh Tanwar
2025-08-08 19:51:09 +05:30
committed by GitHub
parent ef27ac5c3d
commit 06dc66c62e
5 changed files with 921 additions and 344 deletions

View File

@ -4188,6 +4188,15 @@ impl MerchantConnectorAccountType {
Self::CacheVal(_) => None,
}
}
pub fn get_webhook_details(
&self,
) -> CustomResult<Option<&masking::Secret<serde_json::Value>>, errors::ApiErrorResponse> {
match self {
Self::DbVal(db_val) => Ok(db_val.connector_webhook_details.as_ref()),
Self::CacheVal(_) => Ok(None),
}
}
}
/// Query for merchant connector account either by business label or profile id

View File

@ -1,3 +1,4 @@
use api_models::admin;
use common_enums::{AttemptStatus, PaymentMethodType};
use common_utils::{errors::CustomResult, ext_traits::ValueExt};
use error_stack::ResultExt;
@ -13,6 +14,7 @@ use hyperswitch_domain_models::{
router_response_types::PaymentsResponseData,
};
use masking::{ExposeInterface, PeekInterface, Secret};
use router_env::logger;
use unified_connector_service_client::payments::{
self as payments_grpc, payment_method::PaymentMethod, CardDetails, CardPaymentMethodType,
PaymentServiceAuthorizeResponse,
@ -21,7 +23,7 @@ use unified_connector_service_client::payments::{
use crate::{
consts,
core::{
errors::RouterResult,
errors::{ApiErrorResponse, RouterResult},
payments::helpers::{
is_ucs_enabled, should_execute_based_on_rollout, MerchantConnectorAccountType,
},
@ -29,9 +31,13 @@ use crate::{
},
routes::SessionState,
types::transformers::ForeignTryFrom,
utils,
};
mod transformers;
pub mod transformers;
// Re-export webhook transformer types for easier access
pub use transformers::WebhookTransformData;
pub async fn should_call_unified_connector_service<F: Clone, T>(
state: &SessionState,
@ -80,6 +86,42 @@ pub async fn should_call_unified_connector_service<F: Clone, T>(
Ok(should_execute)
}
pub async fn should_call_unified_connector_service_for_webhooks(
state: &SessionState,
merchant_context: &MerchantContext,
connector_name: &str,
) -> RouterResult<bool> {
if state.grpc_client.unified_connector_service_client.is_none() {
logger::debug!(
connector = connector_name.to_string(),
"Unified Connector Service client is not available for webhooks"
);
return Ok(false);
}
let ucs_config_key = consts::UCS_ENABLED;
if !is_ucs_enabled(state, ucs_config_key).await {
return Ok(false);
}
let merchant_id = merchant_context
.get_merchant_account()
.get_id()
.get_string_repr();
let config_key = format!(
"{}_{}_{}_Webhooks",
consts::UCS_ROLLOUT_PERCENT_CONFIG_PREFIX,
merchant_id,
connector_name
);
let should_execute = should_execute_based_on_rollout(state, &config_key).await?;
Ok(should_execute)
}
pub fn build_unified_connector_service_payment_method(
payment_method_data: hyperswitch_domain_models::payment_method_data::PaymentMethodData,
payment_method_type: PaymentMethodType,
@ -317,3 +359,161 @@ pub fn handle_unified_connector_service_response_for_payment_repeat(
Ok((status, router_data_response, status_code))
}
pub fn build_webhook_secrets_from_merchant_connector_account(
#[cfg(feature = "v1")] merchant_connector_account: &MerchantConnectorAccountType,
#[cfg(feature = "v2")] merchant_connector_account: &MerchantConnectorAccountTypeDetails,
) -> CustomResult<Option<payments_grpc::WebhookSecrets>, UnifiedConnectorServiceError> {
// Extract webhook credentials from merchant connector account
// This depends on how webhook secrets are stored in the merchant connector account
#[cfg(feature = "v1")]
let webhook_details = merchant_connector_account
.get_webhook_details()
.map_err(|_| UnifiedConnectorServiceError::FailedToObtainAuthType)?;
#[cfg(feature = "v2")]
let webhook_details = match merchant_connector_account {
MerchantConnectorAccountTypeDetails::MerchantConnectorAccount(mca) => {
mca.connector_webhook_details.as_ref()
}
MerchantConnectorAccountTypeDetails::MerchantConnectorDetails(_) => None,
};
match webhook_details {
Some(details) => {
// Parse the webhook details JSON to extract secrets
let webhook_details: admin::MerchantConnectorWebhookDetails = details
.clone()
.parse_value("MerchantConnectorWebhookDetails")
.change_context(UnifiedConnectorServiceError::FailedToObtainAuthType)
.attach_printable("Failed to parse MerchantConnectorWebhookDetails")?;
// Build gRPC WebhookSecrets from parsed details
Ok(Some(payments_grpc::WebhookSecrets {
secret: webhook_details.merchant_secret.expose().to_string(),
additional_secret: webhook_details
.additional_secret
.map(|secret| secret.expose().to_string()),
}))
}
None => Ok(None),
}
}
/// High-level abstraction for calling UCS webhook transformation
/// This provides a clean interface similar to payment flow UCS calls
pub async fn call_unified_connector_service_for_webhook(
state: &SessionState,
merchant_context: &MerchantContext,
connector_name: &str,
body: &actix_web::web::Bytes,
request_details: &hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>,
merchant_connector_account: Option<
&hyperswitch_domain_models::merchant_connector_account::MerchantConnectorAccount,
>,
) -> RouterResult<(
api_models::webhooks::IncomingWebhookEvent,
bool,
WebhookTransformData,
)> {
let ucs_client = state
.grpc_client
.unified_connector_service_client
.as_ref()
.ok_or_else(|| {
error_stack::report!(ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("UCS client is not available for webhook processing")
})?;
// Build webhook secrets from merchant connector account
let webhook_secrets = merchant_connector_account.and_then(|mca| {
#[cfg(feature = "v1")]
let mca_type = MerchantConnectorAccountType::DbVal(Box::new(mca.clone()));
#[cfg(feature = "v2")]
let mca_type =
MerchantConnectorAccountTypeDetails::MerchantConnectorAccount(Box::new(mca.clone()));
build_webhook_secrets_from_merchant_connector_account(&mca_type)
.map_err(|e| {
logger::warn!(
build_error=?e,
connector_name=connector_name,
"Failed to build webhook secrets from merchant connector account in call_unified_connector_service_for_webhook"
);
e
})
.ok()
.flatten()
});
// Build UCS transform request using new webhook transformers
let transform_request = transformers::build_webhook_transform_request(
body,
request_details,
webhook_secrets,
merchant_context
.get_merchant_account()
.get_id()
.get_string_repr(),
connector_name,
)?;
// Build connector auth metadata
let connector_auth_metadata = merchant_connector_account
.map(|mca| {
#[cfg(feature = "v1")]
let mca_type = MerchantConnectorAccountType::DbVal(Box::new(mca.clone()));
#[cfg(feature = "v2")]
let mca_type = MerchantConnectorAccountTypeDetails::MerchantConnectorAccount(Box::new(
mca.clone(),
));
build_unified_connector_service_auth_metadata(mca_type, merchant_context)
})
.transpose()
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to build UCS auth metadata")?
.ok_or_else(|| {
error_stack::report!(ApiErrorResponse::InternalServerError).attach_printable(
"Missing merchant connector account for UCS webhook transformation",
)
})?;
// Build gRPC headers
let grpc_headers = external_services::grpc_client::GrpcHeaders {
tenant_id: state.tenant.tenant_id.get_string_repr().to_string(),
request_id: Some(utils::generate_id(consts::ID_LENGTH, "webhook_req")),
};
// Make UCS call - client availability already verified
match ucs_client
.transform_incoming_webhook(transform_request, connector_auth_metadata, grpc_headers)
.await
{
Ok(response) => {
let transform_response = response.into_inner();
let transform_data = transformers::transform_ucs_webhook_response(transform_response)?;
// UCS handles everything internally - event type, source verification, decoding
Ok((
transform_data.event_type,
transform_data.source_verified,
transform_data,
))
}
Err(err) => {
// When UCS is configured, we don't fall back to direct connector processing
Err(ApiErrorResponse::WebhookProcessingFailure)
.attach_printable(format!("UCS webhook processing failed: {err}"))
}
}
}
/// Extract webhook content from UCS response for further processing
/// This provides a helper function to extract specific data from UCS responses
pub fn extract_webhook_content_from_ucs_response(
transform_data: &WebhookTransformData,
) -> Option<&unified_connector_service_client::payments::WebhookResponseContent> {
transform_data.webhook_content.as_ref()
}

View File

@ -17,11 +17,14 @@ use hyperswitch_domain_models::{
};
use masking::{ExposeInterface, PeekInterface};
use router_env::tracing;
use unified_connector_service_client::payments::{self as payments_grpc, Identifier};
use unified_connector_service_client::payments::{
self as payments_grpc, Identifier, PaymentServiceTransformRequest,
PaymentServiceTransformResponse,
};
use url::Url;
use crate::{
core::unified_connector_service::build_unified_connector_service_payment_method,
core::{errors, unified_connector_service::build_unified_connector_service_payment_method},
types::transformers::ForeignTryFrom,
};
impl ForeignTryFrom<&RouterData<PSync, PaymentsSyncData, PaymentsResponseData>>
@ -39,6 +42,13 @@ impl ForeignTryFrom<&RouterData<PSync, PaymentsSyncData, PaymentsResponseData>>
.map(|id| Identifier {
id_type: Some(payments_grpc::identifier::IdType::Id(id)),
})
.map_err(|e| {
tracing::debug!(
transaction_id_error=?e,
"Failed to extract connector transaction ID for UCS payment sync request"
);
e
})
.ok();
let connector_ref_id = router_data
@ -670,6 +680,7 @@ impl ForeignTryFrom<common_enums::CardNetwork> for payments_grpc::CardNetwork {
common_enums::CardNetwork::UnionPay => Ok(Self::Unionpay),
common_enums::CardNetwork::RuPay => Ok(Self::Rupay),
common_enums::CardNetwork::Maestro => Ok(Self::Maestro),
common_enums::CardNetwork::AmericanExpress => Ok(Self::Amex),
_ => Err(
UnifiedConnectorServiceError::RequestEncodingFailedWithReason(
"Card Network not supported".to_string(),
@ -1003,6 +1014,113 @@ impl ForeignTryFrom<common_types::payments::CustomerAcceptance>
}
}
impl ForeignTryFrom<&hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>>
for payments_grpc::RequestDetails
{
type Error = error_stack::Report<UnifiedConnectorServiceError>;
fn foreign_try_from(
request_details: &hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>,
) -> Result<Self, Self::Error> {
let headers_map = request_details
.headers
.iter()
.map(|(key, value)| {
let value_string = value.to_str().unwrap_or_default().to_string();
(key.as_str().to_string(), value_string)
})
.collect();
Ok(Self {
method: 1, // POST method for webhooks
uri: Some({
let uri_result = request_details
.headers
.get("x-forwarded-path")
.and_then(|h| h.to_str().map_err(|e| {
tracing::warn!(
header_conversion_error=?e,
header_value=?h,
"Failed to convert x-forwarded-path header to string for webhook processing"
);
e
}).ok());
uri_result.unwrap_or_else(|| {
tracing::debug!("x-forwarded-path header not found or invalid, using default '/Unknown'");
"/Unknown"
}).to_string()
}),
body: request_details.body.to_vec(),
headers: headers_map,
query_params: Some(request_details.query_params.clone()),
})
}
}
/// Webhook transform data structure containing UCS response information
pub struct WebhookTransformData {
pub event_type: api_models::webhooks::IncomingWebhookEvent,
pub source_verified: bool,
pub webhook_content: Option<payments_grpc::WebhookResponseContent>,
pub response_ref_id: Option<String>,
}
/// Transform UCS webhook response into webhook event data
pub fn transform_ucs_webhook_response(
response: PaymentServiceTransformResponse,
) -> Result<WebhookTransformData, error_stack::Report<errors::ApiErrorResponse>> {
let event_type = match response.event_type {
0 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentSuccess,
1 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentFailure,
2 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentProcessing,
3 => api_models::webhooks::IncomingWebhookEvent::PaymentIntentCancelled,
4 => api_models::webhooks::IncomingWebhookEvent::RefundSuccess,
5 => api_models::webhooks::IncomingWebhookEvent::RefundFailure,
6 => api_models::webhooks::IncomingWebhookEvent::MandateRevoked,
_ => api_models::webhooks::IncomingWebhookEvent::EventNotSupported,
};
Ok(WebhookTransformData {
event_type,
source_verified: response.source_verified,
webhook_content: response.content,
response_ref_id: response.response_ref_id.and_then(|identifier| {
identifier.id_type.and_then(|id_type| match id_type {
payments_grpc::identifier::IdType::Id(id) => Some(id),
payments_grpc::identifier::IdType::EncodedData(encoded_data) => Some(encoded_data),
payments_grpc::identifier::IdType::NoResponseIdMarker(_) => None,
})
}),
})
}
/// Build UCS webhook transform request from webhook components
pub fn build_webhook_transform_request(
_webhook_body: &[u8],
request_details: &hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails<'_>,
webhook_secrets: Option<payments_grpc::WebhookSecrets>,
merchant_id: &str,
connector_id: &str,
) -> Result<PaymentServiceTransformRequest, error_stack::Report<errors::ApiErrorResponse>> {
let request_details_grpc = payments_grpc::RequestDetails::foreign_try_from(request_details)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to transform webhook request details to gRPC format")?;
Ok(PaymentServiceTransformRequest {
request_ref_id: Some(Identifier {
id_type: Some(payments_grpc::identifier::IdType::Id(format!(
"{}_{}_{}",
merchant_id,
connector_id,
time::OffsetDateTime::now_utc().unix_timestamp()
))),
}),
request_details: Some(request_details_grpc),
webhook_secrets,
})
}
pub fn convert_connector_service_status_code(
status_code: u32,
) -> Result<u16, error_stack::Report<UnifiedConnectorServiceError>> {

View File

@ -30,7 +30,7 @@ use crate::{
errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt},
metrics, payment_methods,
payments::{self, tokenization},
refunds, relay, utils as core_utils,
refunds, relay, unified_connector_service, utils as core_utils,
webhooks::{network_tokenization_incoming, utils::construct_webhook_router_data},
},
db::StorageInterface,
@ -202,8 +202,7 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
WebhookResponseTracker,
serde_json::Value,
)> {
let key_manager_state = &(&state).into();
// Initial setup and metrics
metrics::WEBHOOK_INCOMING_COUNT.add(
1,
router_env::metric_attributes!((
@ -211,7 +210,8 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
merchant_context.get_merchant_account().get_id().clone()
)),
);
let mut request_details = IncomingWebhookRequestDetails {
let request_details = IncomingWebhookRequestDetails {
method: req.method().clone(),
uri: req.uri().clone(),
headers: req.headers(),
@ -220,31 +220,211 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
};
// Fetch the merchant connector account to get the webhooks source secret
// `webhooks source secret` is a secret shared between the merchant and connector
// This is used for source verification and webhooks integrity
let (merchant_connector_account, connector, connector_name) =
fetch_optional_mca_and_connector(&state, &merchant_context, connector_name_or_mca_id)
.await?;
let decoded_body = connector
.decode_webhook_body(
&request_details,
merchant_context.get_merchant_account().get_id(),
merchant_connector_account
.clone()
.and_then(|merchant_connector_account| {
merchant_connector_account.connector_webhook_details
}),
connector_name.as_str(),
// Determine webhook processing path (UCS vs non-UCS) and handle event type extraction
let webhook_processing_result =
if unified_connector_service::should_call_unified_connector_service_for_webhooks(
&state,
&merchant_context,
&connector_name,
)
.await
.await?
{
logger::info!(
connector = connector_name,
"Using Unified Connector Service for webhook processing",
);
process_ucs_webhook_transform(
&state,
&merchant_context,
&connector_name,
&body,
&request_details,
merchant_connector_account.as_ref(),
)
.await?
} else {
// NON-UCS PATH: Need to decode body first
let decoded_body = connector
.decode_webhook_body(
&request_details,
merchant_context.get_merchant_account().get_id(),
merchant_connector_account
.and_then(|mca| mca.connector_webhook_details.clone()),
&connector_name,
)
.await
.switch()
.attach_printable("There was an error in incoming webhook body decoding")?;
process_non_ucs_webhook(
&state,
&merchant_context,
&connector,
&connector_name,
decoded_body.into(),
&request_details,
)
.await?
};
// Update request_details with the appropriate body (decoded for non-UCS, raw for UCS)
let final_request_details = match &webhook_processing_result.decoded_body {
Some(decoded_body) => IncomingWebhookRequestDetails {
method: request_details.method.clone(),
uri: request_details.uri.clone(),
headers: request_details.headers,
query_params: request_details.query_params.clone(),
body: decoded_body,
},
None => request_details, // Use original request details for UCS
};
logger::info!(event_type=?webhook_processing_result.event_type);
// Check if webhook should be processed further
let is_webhook_event_supported = !matches!(
webhook_processing_result.event_type,
webhooks::IncomingWebhookEvent::EventNotSupported
);
let is_webhook_event_enabled = !utils::is_webhook_event_disabled(
&*state.clone().store,
connector_name.as_str(),
merchant_context.get_merchant_account().get_id(),
&webhook_processing_result.event_type,
)
.await;
let flow_type: api::WebhookFlow = webhook_processing_result.event_type.into();
let process_webhook_further = is_webhook_event_enabled
&& is_webhook_event_supported
&& !matches!(flow_type, api::WebhookFlow::ReturnResponse);
logger::info!(process_webhook=?process_webhook_further);
let mut event_object: Box<dyn masking::ErasedMaskSerialize> = Box::new(serde_json::Value::Null);
let webhook_effect = match process_webhook_further {
true => {
let business_logic_result = process_webhook_business_logic(
&state,
req_state,
&merchant_context,
&connector,
&connector_name,
webhook_processing_result.event_type,
webhook_processing_result.source_verified,
&webhook_processing_result.transform_data,
&final_request_details,
is_relay_webhook,
)
.await;
match business_logic_result {
Ok(response) => {
// Extract event object for serialization
event_object = extract_webhook_event_object(
&webhook_processing_result.transform_data,
&connector,
&final_request_details,
)?;
response
}
Err(error) => {
let error_result = handle_incoming_webhook_error(
error,
&connector,
connector_name.as_str(),
&final_request_details,
);
match error_result {
Ok((_, webhook_tracker, _)) => webhook_tracker,
Err(e) => return Err(e),
}
}
}
}
false => {
metrics::WEBHOOK_INCOMING_FILTERED_COUNT.add(
1,
router_env::metric_attributes!((
MERCHANT_ID,
merchant_context.get_merchant_account().get_id().clone()
)),
);
WebhookResponseTracker::NoEffect
}
};
// Generate response
let response = connector
.get_webhook_api_response(&final_request_details, None)
.switch()
.attach_printable("There was an error in incoming webhook body decoding")?;
.attach_printable("Could not get incoming webhook api response from connector")?;
request_details.body = &decoded_body;
let serialized_request = event_object
.masked_serialize()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Could not convert webhook effect to string")?;
let event_type = match connector
.get_webhook_event_type(&request_details)
Ok((response, webhook_effect, serialized_request))
}
/// Process UCS webhook transformation using the high-level UCS abstraction
async fn process_ucs_webhook_transform(
state: &SessionState,
merchant_context: &domain::MerchantContext,
connector_name: &str,
body: &actix_web::web::Bytes,
request_details: &IncomingWebhookRequestDetails<'_>,
merchant_connector_account: Option<&domain::MerchantConnectorAccount>,
) -> errors::RouterResult<WebhookProcessingResult> {
// Use the new UCS abstraction which provides clean separation
let (event_type, source_verified, transform_data) =
unified_connector_service::call_unified_connector_service_for_webhook(
state,
merchant_context,
connector_name,
body,
request_details,
merchant_connector_account,
)
.await?;
Ok(WebhookProcessingResult {
event_type,
source_verified,
transform_data: Some(Box::new(transform_data)),
decoded_body: None, // UCS path uses raw body
})
}
/// Result type for webhook processing path determination
pub struct WebhookProcessingResult {
event_type: webhooks::IncomingWebhookEvent,
source_verified: bool,
transform_data: Option<Box<unified_connector_service::WebhookTransformData>>,
decoded_body: Option<actix_web::web::Bytes>,
}
/// Process non-UCS webhook using traditional connector processing
async fn process_non_ucs_webhook(
state: &SessionState,
merchant_context: &domain::MerchantContext,
connector: &ConnectorEnum,
connector_name: &str,
decoded_body: actix_web::web::Bytes,
request_details: &IncomingWebhookRequestDetails<'_>,
) -> errors::RouterResult<WebhookProcessingResult> {
// Create request_details with decoded body for connector processing
let updated_request_details = IncomingWebhookRequestDetails {
method: request_details.method.clone(),
uri: request_details.uri.clone(),
headers: request_details.headers,
query_params: request_details.query_params.clone(),
body: &decoded_body,
};
match connector
.get_webhook_event_type(&updated_request_details)
.allow_webhook_event_type_not_found(
state
.clone()
@ -257,14 +437,13 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
.switch()
.attach_printable("Could not find event type in incoming webhook body")?
{
Some(event_type) => event_type,
// Early return allows us to acknowledge the webhooks that we do not support
Some(event_type) => Ok(WebhookProcessingResult {
event_type,
source_verified: false,
transform_data: None,
decoded_body: Some(decoded_body),
}),
None => {
logger::error!(
webhook_payload =? request_details.body,
"Failed while identifying the event type",
);
metrics::WEBHOOK_EVENT_TYPE_IDENTIFICATION_FAILURE_COUNT.add(
1,
router_env::metric_attributes!(
@ -272,94 +451,101 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
MERCHANT_ID,
merchant_context.get_merchant_account().get_id().clone()
),
("connector", connector_name)
("connector", connector_name.to_string())
),
);
Err(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("Failed to identify event type in incoming webhook body")
}
}
}
let response = connector
.get_webhook_api_response(&request_details, None)
/// Extract webhook event object based on transform data availability
fn extract_webhook_event_object(
transform_data: &Option<Box<unified_connector_service::WebhookTransformData>>,
connector: &ConnectorEnum,
request_details: &IncomingWebhookRequestDetails<'_>,
) -> errors::RouterResult<Box<dyn masking::ErasedMaskSerialize>> {
match transform_data {
Some(transform_data) => match &transform_data.webhook_content {
Some(webhook_content) => {
let serialized_value = serde_json::to_value(webhook_content)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to serialize UCS webhook content")?;
Ok(Box::new(serialized_value))
}
None => connector
.get_webhook_resource_object(request_details)
.switch()
.attach_printable("Failed while early return in case of event type parsing")?;
.attach_printable("Could not find resource object in incoming webhook body"),
},
None => connector
.get_webhook_resource_object(request_details)
.switch()
.attach_printable("Could not find resource object in incoming webhook body"),
}
}
return Ok((
response,
WebhookResponseTracker::NoEffect,
serde_json::Value::Null,
));
/// Process the main webhook business logic after event type determination
#[allow(clippy::too_many_arguments)]
async fn process_webhook_business_logic(
state: &SessionState,
req_state: ReqState,
merchant_context: &domain::MerchantContext,
connector: &ConnectorEnum,
connector_name: &str,
event_type: webhooks::IncomingWebhookEvent,
source_verified_via_ucs: bool,
webhook_transform_data: &Option<Box<unified_connector_service::WebhookTransformData>>,
request_details: &IncomingWebhookRequestDetails<'_>,
is_relay_webhook: bool,
) -> errors::RouterResult<WebhookResponseTracker> {
let object_ref_id = connector
.get_webhook_object_reference_id(request_details)
.switch()
.attach_printable("Could not find object reference id in incoming webhook body")?;
let connector_enum = api_models::enums::Connector::from_str(connector_name)
.change_context(errors::ApiErrorResponse::InvalidDataValue {
field_name: "connector",
})
.attach_printable_lazy(|| format!("unable to parse connector name {connector_name:?}"))?;
let connectors_with_source_verification_call = &state.conf.webhook_source_verification_call;
let merchant_connector_account = match Box::pin(helper_utils::get_mca_from_object_reference_id(
state,
object_ref_id.clone(),
merchant_context,
connector_name,
))
.await
{
Ok(mca) => mca,
Err(error) => {
let result =
handle_incoming_webhook_error(error, connector, connector_name, request_details);
match result {
Ok((_, webhook_tracker, _)) => return Ok(webhook_tracker),
Err(e) => return Err(e),
}
}
};
logger::info!(event_type=?event_type);
let is_webhook_event_supported = !matches!(
event_type,
webhooks::IncomingWebhookEvent::EventNotSupported
);
let is_webhook_event_enabled = !utils::is_webhook_event_disabled(
&*state.clone().store,
connector_name.as_str(),
merchant_context.get_merchant_account().get_id(),
&event_type,
)
.await;
//process webhook further only if webhook event is enabled and is not event_not_supported
let process_webhook_further = is_webhook_event_enabled && is_webhook_event_supported;
logger::info!(process_webhook=?process_webhook_further);
let flow_type: api::WebhookFlow = event_type.into();
let mut event_object: Box<dyn masking::ErasedMaskSerialize> = Box::new(serde_json::Value::Null);
let webhook_effect = if process_webhook_further
&& !matches!(flow_type, api::WebhookFlow::ReturnResponse)
{
let object_ref_id = connector
.get_webhook_object_reference_id(&request_details)
.switch()
.attach_printable("Could not find object reference id in incoming webhook body")?;
let connector_enum = api_models::enums::Connector::from_str(&connector_name)
.change_context(errors::ApiErrorResponse::InvalidDataValue {
field_name: "connector",
})
.attach_printable_lazy(|| {
format!("unable to parse connector name {connector_name:?}")
})?;
let connectors_with_source_verification_call = &state.conf.webhook_source_verification_call;
let merchant_connector_account = match merchant_connector_account {
Some(merchant_connector_account) => merchant_connector_account,
None => {
match Box::pin(helper_utils::get_mca_from_object_reference_id(
&state,
object_ref_id.clone(),
&merchant_context,
&connector_name,
))
.await
{
Ok(mca) => mca,
Err(error) => {
return handle_incoming_webhook_error(
error,
&connector,
connector_name.as_str(),
&request_details,
);
}
}
}
};
let source_verified = if connectors_with_source_verification_call
let source_verified = if source_verified_via_ucs {
// If UCS handled verification, use that result
source_verified_via_ucs
} else {
// Fall back to traditional source verification
if connectors_with_source_verification_call
.connectors_with_webhook_source_verification_call
.contains(&connector_enum)
{
verify_webhook_source_verification_call(
connector.clone(),
&state,
&merchant_context,
state,
merchant_context,
merchant_connector_account.clone(),
&connector_name,
&request_details,
connector_name,
request_details,
)
.await
.or_else(|error| match error.current_context() {
@ -375,11 +561,11 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
connector
.clone()
.verify_webhook_source(
&request_details,
request_details,
merchant_context.get_merchant_account().get_id(),
merchant_connector_account.connector_webhook_details.clone(),
merchant_connector_account.connector_account_details.clone(),
connector_name.as_str(),
connector_name,
)
.await
.or_else(|error| match error.current_context() {
@ -391,235 +577,233 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
})
.switch()
.attach_printable("There was an issue in incoming webhook source verification")?
};
if source_verified {
metrics::WEBHOOK_SOURCE_VERIFIED_COUNT.add(
1,
router_env::metric_attributes!((
MERCHANT_ID,
merchant_context.get_merchant_account().get_id().clone()
)),
);
} else if connector.is_webhook_source_verification_mandatory() {
// if webhook consumption is mandatory for connector, fail webhook
// so that merchant can retrigger it after updating merchant_secret
return Err(errors::ApiErrorResponse::WebhookAuthenticationFailed.into());
}
};
logger::info!(source_verified=?source_verified);
event_object = connector
.get_webhook_resource_object(&request_details)
.switch()
.attach_printable("Could not find resource object in incoming webhook body")?;
let webhook_details = api::IncomingWebhookDetails {
object_reference_id: object_ref_id.clone(),
resource_object: serde_json::to_vec(&event_object)
.change_context(errors::ParsingError::EncodeError("byte-vec"))
.attach_printable("Unable to convert webhook payload to a value")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"There was an issue when encoding the incoming webhook body to bytes",
)?,
};
let profile_id = &merchant_connector_account.profile_id;
let business_profile = state
.store
.find_business_profile_by_profile_id(
key_manager_state,
merchant_context.get_merchant_key_store(),
profile_id,
)
.await
.to_not_found_response(errors::ApiErrorResponse::ProfileNotFound {
id: profile_id.get_string_repr().to_owned(),
})?;
// If the incoming webhook is a relay webhook, then we need to trigger the relay webhook flow
let result_response = if is_relay_webhook {
let relay_webhook_response = Box::pin(relay_incoming_webhook_flow(
state.clone(),
merchant_context,
business_profile,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for relay failed");
// Using early return ensures unsupported webhooks are acknowledged to the connector
if let Some(errors::ApiErrorResponse::NotSupported { .. }) = relay_webhook_response
.as_ref()
.err()
.map(|a| a.current_context())
{
logger::error!(
webhook_payload =? request_details.body,
"Failed while identifying the event type",
);
let response = connector
.get_webhook_api_response(&request_details, None)
.switch()
.attach_printable(
"Failed while early return in case of not supported event type in relay webhooks",
)?;
return Ok((
response,
WebhookResponseTracker::NoEffect,
serde_json::Value::Null,
));
};
relay_webhook_response
} else {
match flow_type {
api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context,
business_profile,
webhook_details,
source_verified,
&connector,
&request_details,
event_type,
))
.await
.attach_printable("Incoming webhook flow for payments failed"),
api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow(
state.clone(),
merchant_context,
business_profile,
webhook_details,
connector_name.as_str(),
source_verified,
event_type,
))
.await
.attach_printable("Incoming webhook flow for refunds failed"),
api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow(
state.clone(),
merchant_context,
business_profile,
webhook_details,
source_verified,
&connector,
&request_details,
event_type,
))
.await
.attach_printable("Incoming webhook flow for disputes failed"),
api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow(
state.clone(),
req_state,
merchant_context,
business_profile,
webhook_details,
source_verified,
))
.await
.attach_printable("Incoming bank-transfer webhook flow failed"),
api::WebhookFlow::ReturnResponse => Ok(WebhookResponseTracker::NoEffect),
api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow(
state.clone(),
merchant_context,
business_profile,
webhook_details,
source_verified,
event_type,
))
.await
.attach_printable("Incoming webhook flow for mandates failed"),
api::WebhookFlow::ExternalAuthentication => {
Box::pin(external_authentication_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context,
source_verified,
event_type,
&request_details,
&connector,
object_ref_id,
business_profile,
merchant_connector_account,
))
.await
.attach_printable("Incoming webhook flow for external authentication failed")
}
api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context,
source_verified,
event_type,
object_ref_id,
business_profile,
))
.await
.attach_printable("Incoming webhook flow for fraud check failed"),
#[cfg(feature = "payouts")]
api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow(
state.clone(),
merchant_context,
business_profile,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for payouts failed"),
_ => Err(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unsupported Flow Type received in incoming webhooks"),
}
};
match result_response {
Ok(response) => response,
Err(error) => {
return handle_incoming_webhook_error(
error,
&connector,
connector_name.as_str(),
&request_details,
);
}
}
} else {
metrics::WEBHOOK_INCOMING_FILTERED_COUNT.add(
if source_verified {
metrics::WEBHOOK_SOURCE_VERIFIED_COUNT.add(
1,
router_env::metric_attributes!((
MERCHANT_ID,
merchant_context.get_merchant_account().get_id().clone()
)),
);
WebhookResponseTracker::NoEffect
} else if connector.is_webhook_source_verification_mandatory() {
// if webhook consumption is mandatory for connector, fail webhook
// so that merchant can retrigger it after updating merchant_secret
return Err(errors::ApiErrorResponse::WebhookAuthenticationFailed.into());
}
logger::info!(source_verified=?source_verified);
let event_object: Box<dyn masking::ErasedMaskSerialize> =
if let Some(transform_data) = webhook_transform_data {
// Use UCS transform data if available
if let Some(webhook_content) = &transform_data.webhook_content {
// Convert UCS webhook content to appropriate format
Box::new(
serde_json::to_value(webhook_content)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to serialize UCS webhook content")?,
)
} else {
// Fall back to connector extraction
connector
.get_webhook_resource_object(request_details)
.switch()
.attach_printable("Could not find resource object in incoming webhook body")?
}
} else {
// Use traditional connector extraction
connector
.get_webhook_resource_object(request_details)
.switch()
.attach_printable("Could not find resource object in incoming webhook body")?
};
let webhook_details = api::IncomingWebhookDetails {
object_reference_id: object_ref_id.clone(),
resource_object: serde_json::to_vec(&event_object)
.change_context(errors::ParsingError::EncodeError("byte-vec"))
.attach_printable("Unable to convert webhook payload to a value")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"There was an issue when encoding the incoming webhook body to bytes",
)?,
};
let response = connector
.get_webhook_api_response(&request_details, None)
.switch()
.attach_printable("Could not get incoming webhook api response from connector")?;
let profile_id = &merchant_connector_account.profile_id;
let key_manager_state = &(state).into();
let serialized_request = event_object
.masked_serialize()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Could not convert webhook effect to string")?;
Ok((response, webhook_effect, serialized_request))
let business_profile = state
.store
.find_business_profile_by_profile_id(
key_manager_state,
merchant_context.get_merchant_key_store(),
profile_id,
)
.await
.to_not_found_response(errors::ApiErrorResponse::ProfileNotFound {
id: profile_id.get_string_repr().to_owned(),
})?;
// If the incoming webhook is a relay webhook, then we need to trigger the relay webhook flow
let result_response = if is_relay_webhook {
let relay_webhook_response = Box::pin(relay_incoming_webhook_flow(
state.clone(),
merchant_context.clone(),
business_profile,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for relay failed");
// Using early return ensures unsupported webhooks are acknowledged to the connector
if let Some(errors::ApiErrorResponse::NotSupported { .. }) = relay_webhook_response
.as_ref()
.err()
.map(|a| a.current_context())
{
logger::error!(
webhook_payload =? request_details.body,
"Failed while identifying the event type",
);
let _response = connector
.get_webhook_api_response(request_details, None)
.switch()
.attach_printable(
"Failed while early return in case of not supported event type in relay webhooks",
)?;
return Ok(WebhookResponseTracker::NoEffect);
};
relay_webhook_response
} else {
let flow_type: api::WebhookFlow = event_type.into();
match flow_type {
api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context.clone(),
business_profile,
webhook_details,
source_verified,
connector,
request_details,
event_type,
))
.await
.attach_printable("Incoming webhook flow for payments failed"),
api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow(
state.clone(),
merchant_context.clone(),
business_profile,
webhook_details,
connector_name,
source_verified,
event_type,
))
.await
.attach_printable("Incoming webhook flow for refunds failed"),
api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow(
state.clone(),
merchant_context.clone(),
business_profile,
webhook_details,
source_verified,
connector,
request_details,
event_type,
))
.await
.attach_printable("Incoming webhook flow for disputes failed"),
api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow(
state.clone(),
req_state,
merchant_context.clone(),
business_profile,
webhook_details,
source_verified,
))
.await
.attach_printable("Incoming bank-transfer webhook flow failed"),
api::WebhookFlow::ReturnResponse => Ok(WebhookResponseTracker::NoEffect),
api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow(
state.clone(),
merchant_context.clone(),
business_profile,
webhook_details,
source_verified,
event_type,
))
.await
.attach_printable("Incoming webhook flow for mandates failed"),
api::WebhookFlow::ExternalAuthentication => {
Box::pin(external_authentication_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context.clone(),
source_verified,
event_type,
request_details,
connector,
object_ref_id,
business_profile,
merchant_connector_account,
))
.await
.attach_printable("Incoming webhook flow for external authentication failed")
}
api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context.clone(),
source_verified,
event_type,
object_ref_id,
business_profile,
))
.await
.attach_printable("Incoming webhook flow for fraud check failed"),
#[cfg(feature = "payouts")]
api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow(
state.clone(),
merchant_context.clone(),
business_profile,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for payouts failed"),
_ => Err(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unsupported Flow Type received in incoming webhooks"),
}
};
match result_response {
Ok(response) => Ok(response),
Err(error) => {
let result =
handle_incoming_webhook_error(error, connector, connector_name, request_details);
match result {
Ok((_, webhook_tracker, _)) => Ok(webhook_tracker),
Err(e) => Err(e),
}
}
}
}
fn handle_incoming_webhook_error(
@ -1346,7 +1530,7 @@ pub async fn get_or_update_dispute_object(
Some(dispute) => {
logger::info!("Dispute Already exists, Updating the dispute details");
metrics::INCOMING_DISPUTE_WEBHOOK_UPDATE_RECORD_METRIC.add(1, &[]);
crate::core::utils::validate_dispute_stage_and_dispute_status(
core_utils::validate_dispute_stage_and_dispute_status(
dispute.dispute_stage,
dispute.dispute_status,
dispute_details.dispute_stage,
@ -1354,7 +1538,6 @@ pub async fn get_or_update_dispute_object(
)
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("dispute stage and status validation failed")?;
let update_dispute = diesel_models::dispute::DisputeUpdate::Update {
dispute_stage: dispute_details.dispute_stage,
dispute_status,