feat(router): added incoming dispute webhooks flow (#769)

Co-authored-by: Sangamesh <sangamesh.kulkarni@juspay.in>
Co-authored-by: sai harsha <sai.harsha@sai.harsha-MacBookPro>
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
This commit is contained in:
saiharsha-juspay
2023-03-30 04:25:54 +05:30
committed by GitHub
parent fb66a0e0f2
commit a733eafbbe
28 changed files with 1139 additions and 32 deletions

View File

@ -7,6 +7,7 @@ use error_stack::{IntoReport, ResultExt};
use masking::ExposeInterface;
use router_env::{instrument, tracing};
use super::metrics;
use crate::{
consts,
core::{
@ -197,6 +198,173 @@ async fn refunds_incoming_webhook_flow<W: api::OutgoingWebhookType>(
Ok(())
}
async fn get_payment_attempt_from_object_reference_id(
state: AppState,
object_reference_id: api_models::webhooks::ObjectReferenceId,
merchant_account: &storage::MerchantAccount,
) -> CustomResult<storage_models::payment_attempt::PaymentAttempt, errors::WebhooksFlowError> {
let db = &*state.store;
match object_reference_id {
api::ObjectReferenceId::PaymentId(api::PaymentIdType::ConnectorTransactionId(ref id)) => db
.find_payment_attempt_by_merchant_id_connector_txn_id(
&merchant_account.merchant_id,
id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::WebhooksFlowError::ResourceNotFound),
api::ObjectReferenceId::PaymentId(api::PaymentIdType::PaymentAttemptId(ref id)) => db
.find_payment_attempt_by_merchant_id_attempt_id(
&merchant_account.merchant_id,
id,
merchant_account.storage_scheme,
)
.await
.change_context(errors::WebhooksFlowError::ResourceNotFound),
_ => Err(errors::WebhooksFlowError::ResourceNotFound).into_report(),
}
}
async fn get_or_update_dispute_object(
state: AppState,
option_dispute: Option<storage_models::dispute::Dispute>,
dispute_details: api::disputes::DisputePayload,
merchant_id: &str,
payment_id: &str,
attempt_id: &str,
event_type: api_models::webhooks::IncomingWebhookEvent,
) -> CustomResult<storage_models::dispute::Dispute, errors::WebhooksFlowError> {
let db = &*state.store;
match option_dispute {
None => {
metrics::INCOMING_DISPUTE_WEBHOOK_NEW_RECORD_METRIC.add(&metrics::CONTEXT, 1, &[]);
let dispute_id = generate_id(consts::ID_LENGTH, "dp");
let new_dispute = storage_models::dispute::DisputeNew {
dispute_id,
amount: dispute_details.amount,
currency: dispute_details.currency,
dispute_stage: dispute_details.dispute_stage.foreign_into(),
dispute_status: event_type
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::DisputeCoreFailed)?,
payment_id: payment_id.to_owned(),
attempt_id: attempt_id.to_owned(),
merchant_id: merchant_id.to_owned(),
connector_status: dispute_details.connector_status,
connector_dispute_id: dispute_details.connector_dispute_id,
connector_reason: dispute_details.connector_reason,
connector_reason_code: dispute_details.connector_reason_code,
challenge_required_by: dispute_details.challenge_required_by,
dispute_created_at: dispute_details.created_at,
updated_at: dispute_details.updated_at,
};
state
.store
.insert_dispute(new_dispute.clone())
.await
.change_context(errors::WebhooksFlowError::WebhookEventCreationFailed)
}
Some(dispute) => {
logger::info!("Dispute Already exists, Updating the dispute details");
metrics::INCOMING_DISPUTE_WEBHOOK_UPDATE_RECORD_METRIC.add(&metrics::CONTEXT, 1, &[]);
let dispute_status: storage_models::enums::DisputeStatus = event_type
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::DisputeCoreFailed)?;
crate::core::utils::validate_dispute_stage_and_dispute_status(
dispute.dispute_stage.foreign_into(),
dispute.dispute_status.foreign_into(),
dispute_details.dispute_stage.clone(),
dispute_status.foreign_into(),
)?;
let update_dispute = storage_models::dispute::DisputeUpdate::Update {
dispute_stage: dispute_details.dispute_stage.foreign_into(),
dispute_status,
connector_status: dispute_details.connector_status,
connector_reason: dispute_details.connector_reason,
connector_reason_code: dispute_details.connector_reason_code,
challenge_required_by: dispute_details.challenge_required_by,
updated_at: dispute_details.updated_at,
};
db.update_dispute(dispute, update_dispute)
.await
.change_context(errors::WebhooksFlowError::ResourceNotFound)
}
}
}
#[instrument(skip_all)]
async fn disputes_incoming_webhook_flow<W: api::OutgoingWebhookType>(
state: AppState,
merchant_account: storage::MerchantAccount,
webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
connector: &(dyn api::Connector + Sync),
request_details: &api_models::webhooks::IncomingWebhookRequestDetails<'_>,
event_type: api_models::webhooks::IncomingWebhookEvent,
) -> CustomResult<(), errors::WebhooksFlowError> {
metrics::INCOMING_DISPUTE_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]);
if source_verified {
let db = &*state.store;
let dispute_details = connector
.get_dispute_details(request_details)
.change_context(errors::WebhooksFlowError::WebhookEventObjectCreationFailed)?;
let payment_attempt = get_payment_attempt_from_object_reference_id(
state.clone(),
webhook_details.object_reference_id,
&merchant_account,
)
.await?;
let option_dispute = db
.find_by_merchant_id_payment_id_connector_dispute_id(
&merchant_account.merchant_id,
&payment_attempt.payment_id,
&dispute_details.connector_dispute_id,
)
.await
.change_context(errors::WebhooksFlowError::ResourceNotFound)?;
let dispute_object = get_or_update_dispute_object(
state.clone(),
option_dispute,
dispute_details,
&merchant_account.merchant_id,
&payment_attempt.payment_id,
&payment_attempt.attempt_id,
event_type.clone(),
)
.await?;
let disputes_response = Box::new(
dispute_object
.clone()
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::DisputeCoreFailed)?,
);
let event_type: enums::EventType = dispute_object
.dispute_status
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::DisputeCoreFailed)?;
create_event_and_trigger_outgoing_webhook::<W>(
state,
merchant_account,
event_type,
enums::EventClass::Disputes,
None,
dispute_object.dispute_id,
enums::EventObjectType::DisputeDetails,
api::OutgoingWebhookContent::DisputeDetails(disputes_response),
)
.await?;
metrics::INCOMING_DISPUTE_WEBHOOK_MERCHANT_NOTIFIED_METRIC.add(&metrics::CONTEXT, 1, &[]);
Ok(())
} else {
metrics::INCOMING_DISPUTE_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]);
Err(errors::WebhooksFlowError::WebhookSourceVerificationFailed).into_report()
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn create_event_and_trigger_outgoing_webhook<W: api::OutgoingWebhookType>(
@ -329,7 +497,7 @@ pub async fn webhooks_core<W: api::OutgoingWebhookType>(
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("There was an error in parsing the query params")?;
let mut request_details = api::IncomingWebhookRequestDetails {
let mut request_details = api_models::webhooks::IncomingWebhookRequestDetails {
method: req.method().clone(),
headers: req.headers(),
query_params: req.query_string().to_string(),
@ -420,6 +588,19 @@ pub async fn webhooks_core<W: api::OutgoingWebhookType>(
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Incoming webhook flow for refunds failed")?,
api::WebhookFlow::Dispute => disputes_incoming_webhook_flow::<W>(
state.clone(),
merchant_account,
webhook_details,
source_verified,
*connector,
&request_details,
event_type,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Incoming webhook flow for disputes failed")?,
api::WebhookFlow::ReturnResponse => {}
_ => Err(errors::ApiErrorResponse::InternalServerError)