feat(core): Create Payout Webhook Flow (#4696)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Sakil Mostak
2024-06-05 14:58:42 +05:30
committed by GitHub
parent 32f0fae27d
commit a3183a0c5b
43 changed files with 809 additions and 68 deletions

View File

@ -33,6 +33,12 @@ counter_metric!(
GLOBAL_METER
);
counter_metric!(INCOMING_PAYOUT_WEBHOOK_METRIC, GLOBAL_METER); // No. of incoming payout webhooks
counter_metric!(
INCOMING_PAYOUT_WEBHOOK_SIGNATURE_FAILURE_METRIC,
GLOBAL_METER
); // No. of incoming payout webhooks for which signature verification failed
counter_metric!(WEBHOOK_INCOMING_COUNT, GLOBAL_METER);
counter_metric!(WEBHOOK_INCOMING_FILTERED_COUNT, GLOBAL_METER);
counter_metric!(WEBHOOK_SOURCE_VERIFIED_COUNT, GLOBAL_METER);

View File

@ -985,7 +985,7 @@ impl ForeignFrom<(storage::Payouts, storage::PayoutAttempt, domain::Customer)>
error_message: payout_attempt.error_message.clone(),
payment_method: Some(payout.payout_type),
payout_method_type: None,
connector_transaction_id: Some(payout_attempt.connector_payout_id),
connector_transaction_id: payout_attempt.connector_payout_id,
cancellation_reason: None,
unified_code: None,
unified_message: None,

View File

@ -1048,7 +1048,7 @@ pub async fn create_recipient(
customers::update_connector_customer_in_customers(
&connector_label,
Some(&customer),
&Some(recipient_create_data.connector_payout_id.clone()),
&recipient_create_data.connector_payout_id.clone(),
)
.await
{
@ -1248,7 +1248,7 @@ pub async fn check_payout_eligibility(
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
connector_payout_id: payout_data.payout_attempt.connector_payout_id.clone(),
status,
error_code: Some(err.code),
error_message: Some(err.message),
@ -1298,7 +1298,7 @@ pub async fn complete_create_payout(
let db = &*state.store;
let payout_attempt = &payout_data.payout_attempt;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: "".to_string(),
connector_payout_id: payout_data.payout_attempt.connector_payout_id.clone(),
status: storage::enums::PayoutStatus::RequiresFulfillment,
error_code: None,
error_message: None,
@ -1440,7 +1440,7 @@ pub async fn create_payout(
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
connector_payout_id: payout_data.payout_attempt.connector_payout_id.clone(),
status,
error_code: Some(err.code),
error_message: Some(err.message),
@ -1563,7 +1563,7 @@ pub async fn create_recipient_disburse_account(
}
Err(err) => {
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
connector_payout_id: payout_data.payout_attempt.connector_payout_id.clone(),
status: storage_enums::PayoutStatus::Failed,
error_code: Some(err.code),
error_message: Some(err.message),
@ -1659,7 +1659,7 @@ pub async fn cancel_payout(
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
connector_payout_id: payout_data.payout_attempt.connector_payout_id.clone(),
status,
error_code: Some(err.code),
error_message: Some(err.message),
@ -1799,7 +1799,7 @@ pub async fn fulfill_payout(
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
connector_payout_id: payout_data.payout_attempt.connector_payout_id.clone(),
status,
error_code: Some(err.code),
error_message: Some(err.message),

View File

@ -346,6 +346,9 @@ impl GsmValidation for PayoutData {
common_enums::PayoutStatus::Success
| common_enums::PayoutStatus::Cancelled
| common_enums::PayoutStatus::Pending
| common_enums::PayoutStatus::Initiated
| common_enums::PayoutStatus::Reversed
| common_enums::PayoutStatus::Expired
| common_enums::PayoutStatus::Ineligible
| common_enums::PayoutStatus::RequiresCreation
| common_enums::PayoutStatus::RequiresPayoutMethodData

View File

@ -169,7 +169,7 @@ pub async fn construct_payout_router_data<'a, F>(
request: types::PayoutsData {
payout_id: payouts.payout_id.to_owned(),
amount: payouts.amount,
connector_payout_id: Some(payout_attempt.connector_payout_id.to_owned()),
connector_payout_id: payout_attempt.connector_payout_id.clone(),
destination_currency: payouts.destination_currency,
source_currency: payouts.source_currency,
entity_type: payouts.entity_type.to_owned(),
@ -192,8 +192,7 @@ pub async fn construct_payout_router_data<'a, F>(
payment_method_token: None,
recurring_mandate_payment_data: None,
preprocessing_id: None,
connector_request_reference_id: IRRELEVANT_CONNECTOR_REQUEST_REFERENCE_ID_IN_PAYOUTS_FLOW
.to_string(),
connector_request_reference_id: payout_attempt.payout_attempt_id.clone(),
payout_method_data: payout_data.payout_method_data.to_owned(),
quote_id: None,
test_mode,

View File

@ -6,6 +6,8 @@ pub mod webhook_events;
use std::{str::FromStr, time::Instant};
use actix_web::FromRequest;
#[cfg(feature = "payouts")]
use api_models::payouts as payout_models;
use api_models::{
payments::HeaderPayload,
webhook_events::{OutgoingWebhookRequestContent, OutgoingWebhookResponseContent},
@ -22,9 +24,13 @@ use router_env::{
tracing_actix_web::RequestId,
};
#[cfg(feature = "payouts")]
use super::payouts;
use super::{errors::StorageErrorExt, metrics};
#[cfg(feature = "stripe")]
use crate::compatibility::stripe::webhooks as stripe_webhooks;
#[cfg(not(feature = "payouts"))]
use crate::routes::SessionState;
use crate::{
consts,
core::{
@ -42,7 +48,6 @@ use crate::{
app::{ReqState, SessionStateInfo},
lock_utils,
metrics::request::add_attributes,
SessionState,
},
services::{self, authentication as auth},
types::{
@ -54,6 +59,8 @@ use crate::{
utils::{self as helper_utils, generate_id, OptionExt, ValueExt},
workflows::outgoing_webhook_retry,
};
#[cfg(feature = "payouts")]
use crate::{routes::SessionState, types::storage::PayoutAttemptUpdate};
const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5;
const MERCHANT_ID: &str = "merchant_id";
@ -201,6 +208,130 @@ pub async fn payments_incoming_webhook_flow(
}
}
#[cfg(feature = "payouts")]
pub async fn payouts_incoming_webhook_flow(
state: SessionState,
merchant_account: domain::MerchantAccount,
business_profile: diesel_models::business_profile::BusinessProfile,
key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
event_type: webhooks::IncomingWebhookEvent,
source_verified: bool,
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
metrics::INCOMING_PAYOUT_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]);
if source_verified {
let db = &*state.store;
//find payout_attempt by object_reference_id
let payout_attempt = match webhook_details.object_reference_id {
webhooks::ObjectReferenceId::PayoutId(payout_id_type) => match payout_id_type {
webhooks::PayoutIdType::PayoutAttemptId(id) => db
.find_payout_attempt_by_merchant_id_payout_attempt_id(
&merchant_account.merchant_id,
&id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to fetch the payout attempt")?,
webhooks::PayoutIdType::ConnectorPayoutId(id) => db
.find_payout_attempt_by_merchant_id_connector_payout_id(
&merchant_account.merchant_id,
&id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to fetch the payout attempt")?,
},
_ => Err(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("received a non-payout id when processing payout webhooks")?,
};
let payouts = db
.find_payout_by_merchant_id_payout_id(
&merchant_account.merchant_id,
&payout_attempt.payout_id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to fetch the payout")?;
let payout_attempt_update = PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: payout_attempt.connector_payout_id.clone(),
status: common_enums::PayoutStatus::foreign_try_from(event_type)
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("failed payout status mapping from event type")?,
error_message: None,
error_code: None,
is_eligible: payout_attempt.is_eligible,
};
let action_req =
payout_models::PayoutRequest::PayoutActionRequest(payout_models::PayoutActionRequest {
payout_id: payouts.payout_id.clone(),
});
let payout_data =
payouts::make_payout_data(&state, &merchant_account, &key_store, &action_req).await?;
let updated_payout_attempt = db
.update_payout_attempt(
&payout_attempt,
payout_attempt_update,
&payout_data.payouts,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable_lazy(|| {
format!(
"Failed while updating payout attempt: payout_attempt_id: {}",
payout_attempt.payout_attempt_id
)
})?;
let event_type: Option<enums::EventType> = updated_payout_attempt.status.foreign_into();
// If event is NOT an UnsupportedEvent, trigger Outgoing Webhook
if let Some(outgoing_event_type) = event_type {
let router_response =
payouts::response_handler(&merchant_account, &payout_data).await?;
let payout_create_response: payout_models::PayoutCreateResponse = match router_response
{
services::ApplicationResponse::Json(response) => response,
_ => Err(errors::ApiErrorResponse::WebhookResourceNotFound)
.attach_printable("Failed to fetch the payout create response")?,
};
create_event_and_trigger_outgoing_webhook(
state,
merchant_account,
business_profile,
&key_store,
outgoing_event_type,
enums::EventClass::Payouts,
updated_payout_attempt.payout_id.clone(),
enums::EventObjectType::PayoutDetails,
api::OutgoingWebhookContent::PayoutDetails(payout_create_response),
Some(updated_payout_attempt.created_at),
)
.await?;
}
Ok(WebhookResponseTracker::Payout {
payout_id: updated_payout_attempt.payout_id,
status: updated_payout_attempt.status,
})
} else {
metrics::INCOMING_PAYOUT_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]);
Err(report!(
errors::ApiErrorResponse::WebhookAuthenticationFailed
))
}
}
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn refunds_incoming_webhook_flow(
@ -1953,6 +2084,19 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
.await
.attach_printable("Incoming webhook flow for fraud check failed")?,
#[cfg(feature = "payouts")]
api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow(
state.clone(),
merchant_account,
business_profile,
key_store,
webhook_details,
event_type,
source_verified,
))
.await
.attach_printable("Incoming webhook flow for payouts failed")?,
_ => Err(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unsupported Flow Type received in incoming webhooks")?,
}