fix(webhooks): add support for updating mandate details in webhooks flow (#6523)

Co-authored-by: Chikke Srujan <chikke.srujan@Chikke-Srujan-N7WRTY72X7.local>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
chikke srujan
2024-11-13 15:12:58 +05:30
committed by GitHub
parent 574170a357
commit 6eb72e923e
9 changed files with 400 additions and 36 deletions

View File

@ -4,15 +4,16 @@ use actix_web::FromRequest;
#[cfg(feature = "payouts")]
use api_models::payouts as payout_models;
use api_models::webhooks::{self, WebhookResponseTracker};
use common_utils::{errors::ReportSwitchExt, events::ApiEventsType};
use common_utils::{errors::ReportSwitchExt, events::ApiEventsType, ext_traits::ValueExt};
use diesel_models::ConnectorMandateReferenceId;
use error_stack::{report, ResultExt};
use hyperswitch_domain_models::{
payments::HeaderPayload,
payments::{payment_attempt::PaymentAttempt, HeaderPayload},
router_request_types::VerifyWebhookSourceRequestData,
router_response_types::{VerifyWebhookSourceResponseData, VerifyWebhookStatus},
};
use hyperswitch_interfaces::webhooks::IncomingWebhookRequestDetails;
use masking::ExposeInterface;
use masking::{ExposeInterface, PeekInterface};
use router_env::{instrument, metrics::add_attributes, tracing, tracing_actix_web::RequestId};
use super::{types, utils, MERCHANT_ID};
@ -21,7 +22,9 @@ use crate::{
core::{
api_locking,
errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt},
metrics, payments, refunds, utils as core_utils,
metrics, payments,
payments::tokenization,
refunds, utils as core_utils,
webhooks::utils::construct_webhook_router_data,
},
db::StorageInterface,
@ -44,7 +47,7 @@ use crate::{
storage::{self, enums},
transformers::{ForeignFrom, ForeignInto, ForeignTryFrom},
},
utils::{self as helper_utils, generate_id},
utils::{self as helper_utils, ext_traits::OptionExt, generate_id},
};
#[cfg(feature = "payouts")]
use crate::{core::payouts, types::storage::PayoutAttemptUpdate};
@ -364,6 +367,9 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
key_store,
webhook_details,
source_verified,
&connector,
&request_details,
event_type,
))
.await
.attach_printable("Incoming webhook flow for payments failed")?,
@ -491,6 +497,7 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
Ok((response, webhook_effect, serialized_request))
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn payments_incoming_webhook_flow(
state: SessionState,
@ -500,6 +507,9 @@ async fn payments_incoming_webhook_flow(
key_store: domain::MerchantKeyStore,
webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
connector: &ConnectorEnum,
request_details: &IncomingWebhookRequestDetails<'_>,
event_type: webhooks::IncomingWebhookEvent,
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
let consume_or_trigger_flow = if source_verified {
payments::CallConnectorAction::HandleResponse(webhook_details.resource_object)
@ -507,10 +517,10 @@ async fn payments_incoming_webhook_flow(
payments::CallConnectorAction::Trigger
};
let payments_response = match webhook_details.object_reference_id {
webhooks::ObjectReferenceId::PaymentId(id) => {
webhooks::ObjectReferenceId::PaymentId(ref id) => {
let payment_id = get_payment_id(
state.store.as_ref(),
&id,
id,
merchant_account.get_id(),
merchant_account.storage_scheme,
)
@ -544,7 +554,7 @@ async fn payments_incoming_webhook_flow(
key_store.clone(),
payments::operations::PaymentStatus,
api::PaymentsRetrieveRequest {
resource_id: id,
resource_id: id.clone(),
merchant_id: Some(merchant_account.get_id().clone()),
force_sync: true,
connector: None,
@ -555,12 +565,23 @@ async fn payments_incoming_webhook_flow(
expand_captures: None,
},
services::AuthFlow::Merchant,
consume_or_trigger_flow,
consume_or_trigger_flow.clone(),
None,
HeaderPayload::default(),
))
.await;
// When mandate details are present in successful webhooks, and consuming webhooks are skipped during payment sync if the payment status is already updated to charged, this function is used to update the connector mandate details.
if should_update_connector_mandate_details(source_verified, event_type) {
update_connector_mandate_details(
&state,
&merchant_account,
&key_store,
webhook_details.object_reference_id.clone(),
connector,
request_details,
)
.await?
};
lock_action
.free_lock_action(&state, merchant_account.get_id().to_owned())
.await?;
@ -869,10 +890,7 @@ async fn get_payment_attempt_from_object_reference_id(
state: &SessionState,
object_reference_id: webhooks::ObjectReferenceId,
merchant_account: &domain::MerchantAccount,
) -> CustomResult<
hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt,
errors::ApiErrorResponse,
> {
) -> CustomResult<PaymentAttempt, errors::ApiErrorResponse> {
let db = &*state.store;
match object_reference_id {
api::ObjectReferenceId::PaymentId(api::PaymentIdType::ConnectorTransactionId(ref id)) => db
@ -911,7 +929,7 @@ async fn get_or_update_dispute_object(
dispute_details: api::disputes::DisputePayload,
merchant_id: &common_utils::id_type::MerchantId,
organization_id: &common_utils::id_type::OrganizationId,
payment_attempt: &hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt,
payment_attempt: &PaymentAttempt,
event_type: webhooks::IncomingWebhookEvent,
business_profile: &domain::Profile,
connector_name: &str,
@ -1716,3 +1734,173 @@ async fn fetch_optional_mca_and_connector(
Ok((None, connector, connector_name))
}
}
fn should_update_connector_mandate_details(
source_verified: bool,
event_type: webhooks::IncomingWebhookEvent,
) -> bool {
source_verified && event_type == webhooks::IncomingWebhookEvent::PaymentIntentSuccess
}
async fn update_connector_mandate_details(
state: &SessionState,
merchant_account: &domain::MerchantAccount,
key_store: &domain::MerchantKeyStore,
object_ref_id: api::ObjectReferenceId,
connector: &ConnectorEnum,
request_details: &IncomingWebhookRequestDetails<'_>,
) -> CustomResult<(), errors::ApiErrorResponse> {
let webhook_connector_mandate_details = connector
.get_mandate_details(request_details)
.switch()
.attach_printable("Could not find connector mandate details in incoming webhook body")?;
if let Some(webhook_mandate_details) = webhook_connector_mandate_details {
let payment_attempt =
get_payment_attempt_from_object_reference_id(state, object_ref_id, merchant_account)
.await?;
if let Some(ref payment_method_id) = payment_attempt.payment_method_id {
let key_manager_state = &state.into();
let payment_method_info = state
.store
.find_payment_method(
key_manager_state,
key_store,
payment_method_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?;
let mandate_details = payment_method_info
.connector_mandate_details
.clone()
.map(|val| {
val.parse_value::<diesel_models::PaymentsMandateReference>(
"PaymentsMandateReference",
)
})
.transpose()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to deserialize to Payment Mandate Reference")?;
let merchant_connector_account_id = payment_attempt
.merchant_connector_id
.clone()
.get_required_value("merchant_connector_id")?;
if mandate_details
.as_ref()
.map(|details: &diesel_models::PaymentsMandateReference| {
!details.0.contains_key(&merchant_connector_account_id)
})
.unwrap_or(true)
{
let updated_connector_mandate_details = insert_mandate_details(
&payment_attempt,
&webhook_mandate_details,
mandate_details,
)?;
let pm_update = diesel_models::PaymentMethodUpdate::ConnectorMandateDetailsUpdate {
connector_mandate_details: updated_connector_mandate_details,
};
state
.store
.update_payment_method(
key_manager_state,
key_store,
payment_method_info,
pm_update,
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update payment method in db")?;
// Update the payment attempt to maintain consistency across tables.
let (mandate_metadata, connector_mandate_request_reference_id) = payment_attempt
.connector_mandate_detail
.as_ref()
.map(|details| {
(
details.mandate_metadata.clone(),
details.connector_mandate_request_reference_id.clone(),
)
})
.unwrap_or((None, None));
let connector_mandate_reference_id = ConnectorMandateReferenceId {
connector_mandate_id: Some(
webhook_mandate_details
.connector_mandate_id
.peek()
.to_string(),
),
payment_method_id: Some(payment_method_id.to_string()),
mandate_metadata,
connector_mandate_request_reference_id,
};
let attempt_update = storage::PaymentAttemptUpdate::ConnectorMandateDetailUpdate {
connector_mandate_detail: Some(connector_mandate_reference_id),
updated_by: merchant_account.storage_scheme.to_string(),
};
state
.store
.update_payment_attempt_with_attempt_id(
payment_attempt.clone(),
attempt_update,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)?;
} else {
logger::info!(
"Skipping connector mandate details update since they are already present."
);
}
}
}
Ok(())
}
fn insert_mandate_details(
payment_attempt: &PaymentAttempt,
webhook_mandate_details: &hyperswitch_domain_models::router_flow_types::ConnectorMandateDetails,
payment_method_mandate_details: Option<diesel_models::PaymentsMandateReference>,
) -> CustomResult<Option<serde_json::Value>, errors::ApiErrorResponse> {
let (mandate_metadata, connector_mandate_request_reference_id) = payment_attempt
.connector_mandate_detail
.clone()
.map(|mandate_reference| {
(
mandate_reference.mandate_metadata,
mandate_reference.connector_mandate_request_reference_id,
)
})
.unwrap_or((None, None));
let connector_mandate_details = tokenization::update_connector_mandate_details(
payment_method_mandate_details,
payment_attempt.payment_method_type,
Some(
payment_attempt
.net_amount
.get_total_amount()
.get_amount_as_i64(),
),
payment_attempt.currency,
payment_attempt.merchant_connector_id.clone(),
Some(
webhook_mandate_details
.connector_mandate_id
.peek()
.to_string(),
),
mandate_metadata,
connector_mandate_request_reference_id,
)?;
Ok(connector_mandate_details)
}