feat(router): added incoming refund webhooks flow (#683)

Co-authored-by: sai harsha <sai.harsha@sai.harsha-MacBookPro>
This commit is contained in:
saiharsha-juspay
2023-02-28 19:15:25 +05:30
committed by GitHub
parent 2e99152d35
commit f12abbcef4
12 changed files with 302 additions and 11 deletions

View File

@ -264,6 +264,8 @@ pub enum Currency {
#[strum(serialize_all = "snake_case")]
pub enum EventType {
PaymentSucceeded,
RefundSucceeded,
RefundFailed,
}
#[derive(

View File

@ -2,13 +2,15 @@ use common_utils::custom_serde;
use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime;
use crate::{enums as api_enums, payments};
use crate::{enums as api_enums, payments, refunds};
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IncomingWebhookEvent {
PaymentIntentFailure,
PaymentIntentSuccess,
RefundFailure,
RefundSuccess,
EndpointVerification,
}
@ -24,6 +26,8 @@ impl From<IncomingWebhookEvent> for WebhookFlow {
match evt {
IncomingWebhookEvent::PaymentIntentFailure => Self::Payment,
IncomingWebhookEvent::PaymentIntentSuccess => Self::Payment,
IncomingWebhookEvent::RefundSuccess => Self::Refund,
IncomingWebhookEvent::RefundFailure => Self::Refund,
IncomingWebhookEvent::EndpointVerification => Self::ReturnResponse,
}
}
@ -56,4 +60,5 @@ pub struct OutgoingWebhook {
#[serde(tag = "type", content = "object", rename_all = "snake_case")]
pub enum OutgoingWebhookContent {
PaymentDetails(payments::PaymentsResponse),
RefundDetails(refunds::RefundResponse),
}

View File

@ -733,11 +733,7 @@ impl api::IncomingWebhook for Adyen {
) -> CustomResult<api::IncomingWebhookEvent, errors::ConnectorError> {
let notif = get_webhook_object_from_body(request.body)
.change_context(errors::ConnectorError::WebhookEventTypeNotFound)?;
Ok(match notif.event_code.as_str() {
"AUTHORISATION" => api::IncomingWebhookEvent::PaymentIntentSuccess,
_ => Err(errors::ConnectorError::WebhookEventTypeNotFound).into_report()?,
})
Ok(notif.event_code.into())
}
fn get_webhook_resource_object(

View File

@ -1,3 +1,4 @@
use api_models::webhooks::IncomingWebhookEvent;
use base64::Engine;
use error_stack::ResultExt;
use masking::PeekInterface;
@ -1168,6 +1169,27 @@ pub struct AdyenAmountWH {
pub currency: String,
}
#[derive(Debug, Deserialize, strum::Display)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum WebhookEventCode {
Authorisation,
Refund,
CancelOrRefund,
RefundFailed,
}
impl From<WebhookEventCode> for IncomingWebhookEvent {
fn from(code: WebhookEventCode) -> Self {
match code {
WebhookEventCode::Authorisation => Self::PaymentIntentSuccess,
WebhookEventCode::Refund => Self::RefundSuccess,
WebhookEventCode::CancelOrRefund => Self::RefundSuccess,
WebhookEventCode::RefundFailed => Self::RefundFailure,
}
}
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AdyenNotificationRequestItemWH {
@ -1175,7 +1197,7 @@ pub struct AdyenNotificationRequestItemWH {
pub amount: AdyenAmountWH,
pub original_reference: Option<String>,
pub psp_reference: String,
pub event_code: String,
pub event_code: WebhookEventCode,
pub merchant_account_code: String,
pub merchant_reference: String,
pub success: String,

View File

@ -400,6 +400,8 @@ pub enum WebhooksFlowError {
MerchantWebhookURLNotConfigured,
#[error("Payments core flow failed")]
PaymentsCoreFailed,
#[error("Refunds core flow failed")]
RefundsCoreFailed,
#[error("Webhook event creation failed")]
WebhookEventCreationFailed,
#[error("Unable to fork webhooks flow for outgoing webhooks")]
@ -408,6 +410,8 @@ pub enum WebhooksFlowError {
CallToMerchantFailed,
#[error("Webhook not received by merchant")]
NotReceivedByMerchant,
#[error("Resource not found")]
ResourceNotFound,
}
#[derive(Debug, thiserror::Error)]

View File

@ -9,7 +9,7 @@ use crate::{
consts,
core::{
errors::{self, CustomResult, RouterResponse},
payments,
payments, refunds,
},
db::StorageInterface,
logger,
@ -90,6 +90,87 @@ async fn payments_incoming_webhook_flow(
Ok(())
}
#[instrument(skip_all)]
async fn refunds_incoming_webhook_flow(
state: AppState,
merchant_account: storage::MerchantAccount,
webhook_details: api::IncomingWebhookDetails,
connector_name: &str,
source_verified: bool,
event_type: api_models::webhooks::IncomingWebhookEvent,
) -> CustomResult<(), errors::WebhooksFlowError> {
let db = &*state.store;
//find refund by connector refund id
let refund = db
.find_refund_by_merchant_id_connector_refund_id_connector(
&merchant_account.merchant_id,
&webhook_details.object_reference_id,
connector_name,
merchant_account.storage_scheme,
)
.await
.change_context(errors::WebhooksFlowError::ResourceNotFound)
.attach_printable_lazy(|| "Failed fetching the refund")?;
let refund_id = refund.refund_id.to_owned();
//if source verified then update refund status else trigger refund sync
let updated_refund = if source_verified {
let refund_update = storage::RefundUpdate::StatusUpdate {
connector_refund_id: None,
sent_to_gateway: true,
refund_status: event_type
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::RefundsCoreFailed)?,
};
state
.store
.update_refund(
refund.to_owned(),
refund_update,
merchant_account.storage_scheme,
)
.await
.change_context(errors::WebhooksFlowError::RefundsCoreFailed)
.attach_printable_lazy(|| {
format!(
"Failed while updating refund: refund_id: {}",
refund_id.to_owned()
)
})?
} else {
refunds::refund_retrieve_core(&state, merchant_account.clone(), refund_id.to_owned())
.await
.change_context(errors::WebhooksFlowError::RefundsCoreFailed)
.attach_printable_lazy(|| {
format!(
"Failed while updating refund: refund_id: {}",
refund_id.to_owned()
)
})?
};
let event_type: enums::EventType = updated_refund
.refund_status
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::RefundsCoreFailed)?;
let refund_response: api_models::refunds::RefundResponse = updated_refund
.foreign_try_into()
.into_report()
.change_context(errors::WebhooksFlowError::RefundsCoreFailed)?;
create_event_and_trigger_outgoing_webhook(
state,
merchant_account,
event_type,
enums::EventClass::Refunds,
None,
refund_id,
enums::EventObjectType::RefundDetails,
api::OutgoingWebhookContent::RefundDetails(refund_response),
)
.await?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn create_event_and_trigger_outgoing_webhook(
@ -272,7 +353,7 @@ pub async fn webhooks_core(
)?,
};
let flow_type: api::WebhookFlow = event_type.into();
let flow_type: api::WebhookFlow = event_type.to_owned().into();
match flow_type {
api::WebhookFlow::Payment => payments_incoming_webhook_flow(
state.clone(),
@ -284,6 +365,18 @@ pub async fn webhooks_core(
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Incoming webhook flow for payments failed")?,
api::WebhookFlow::Refund => refunds_incoming_webhook_flow(
state.clone(),
merchant_account,
webhook_details,
connector_name,
source_verified,
event_type,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Incoming webhook flow for refunds failed")?,
api::WebhookFlow::ReturnResponse => {}
_ => Err(errors::ApiErrorResponse::InternalServerError)

View File

@ -36,6 +36,14 @@ pub trait RefundInterface {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError>;
async fn find_refund_by_merchant_id_connector_refund_id_connector(
&self,
merchant_id: &str,
connector_refund_id: &str,
connector: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError>;
async fn update_refund(
&self,
this: storage_types::Refund,
@ -149,6 +157,25 @@ mod storage {
.into_report()
}
async fn find_refund_by_merchant_id_connector_refund_id_connector(
&self,
merchant_id: &str,
connector_refund_id: &str,
connector: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_merchant_id_connector_refund_id_connector(
&conn,
merchant_id,
connector_refund_id,
connector,
)
.await
.map_err(Into::into)
.into_report()
}
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
@ -311,7 +338,7 @@ mod storage {
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await?;
let reverse_lookups = vec![
let mut reverse_lookups = vec![
storage_types::ReverseLookupNew {
sk_id: field.clone(),
lookup_id: format!(
@ -329,10 +356,25 @@ mod storage {
created_refund.merchant_id,
created_refund.internal_reference_id
),
pk_id: key,
pk_id: key.clone(),
source: "refund".to_string(),
},
];
if let Some(connector_refund_id) =
created_refund.to_owned().connector_refund_id
{
reverse_lookups.push(storage_types::ReverseLookupNew {
sk_id: field.clone(),
lookup_id: format!(
"{}_{}_{}",
created_refund.merchant_id,
connector_refund_id,
created_refund.connector
),
pk_id: key,
source: "refund".to_string(),
})
};
storage_types::ReverseLookupNew::batch_insert(reverse_lookups, &conn)
.await
.change_context(errors::StorageError::KVError)?;
@ -495,6 +537,47 @@ mod storage {
}
}
async fn find_refund_by_merchant_id_connector_refund_id_connector(
&self,
merchant_id: &str,
connector_refund_id: &str,
connector: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_merchant_id_connector_refund_id_connector(
&conn,
merchant_id,
connector_refund_id,
connector,
)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => database_call().await,
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{connector_refund_id}_{connector}");
let lookup = self
.get_lookup_by_lookup_id(&lookup_id)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let key = &lookup.pk_id;
db_utils::try_redis_get_else_try_database_get(
self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.get_hash_field_and_deserialize(key, &lookup.sk_id, "Refund"),
database_call,
)
.await
}
}
}
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
@ -661,6 +744,28 @@ impl RefundInterface for MockDb {
})
}
async fn find_refund_by_merchant_id_connector_refund_id_connector(
&self,
merchant_id: &str,
connector_refund_id: &str,
connector: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let refunds = self.refunds.lock().await;
refunds
.iter()
.find(|refund| {
refund.merchant_id == merchant_id
&& refund.connector_refund_id == Some(connector_refund_id.to_string())
&& refund.connector == connector
})
.cloned()
.ok_or_else(|| {
errors::StorageError::DatabaseError(DatabaseError::NotFound.into()).into()
})
}
async fn find_refund_by_payment_id_merchant_id(
&self,
_payment_id: &str,

View File

@ -214,6 +214,40 @@ impl TryFrom<F<api_enums::IntentStatus>> for F<storage_enums::EventType> {
}
}
impl TryFrom<F<storage_enums::RefundStatus>> for F<storage_enums::EventType> {
type Error = errors::ValidationError;
fn try_from(value: F<storage_enums::RefundStatus>) -> Result<Self, Self::Error> {
match value.0 {
storage_enums::RefundStatus::Success => Ok(storage_enums::EventType::RefundSucceeded),
storage_enums::RefundStatus::Failure => Ok(storage_enums::EventType::RefundFailed),
_ => Err(errors::ValidationError::IncorrectValueProvided {
field_name: "refund_status",
}),
}
.map(Into::into)
}
}
impl TryFrom<F<api_models::webhooks::IncomingWebhookEvent>> for F<storage_enums::RefundStatus> {
type Error = errors::ValidationError;
fn try_from(value: F<api_models::webhooks::IncomingWebhookEvent>) -> Result<Self, Self::Error> {
match value.0 {
api_models::webhooks::IncomingWebhookEvent::RefundSuccess => {
Ok(storage_enums::RefundStatus::Success)
}
api_models::webhooks::IncomingWebhookEvent::RefundFailure => {
Ok(storage_enums::RefundStatus::Failure)
}
_ => Err(errors::ValidationError::IncorrectValueProvided {
field_name: "incoming_webhook_event_type",
}),
}
.map(Into::into)
}
}
impl From<F<storage_enums::EventType>> for F<api_enums::EventType> {
fn from(event_type: F<storage_enums::EventType>) -> Self {
Self(frunk::labelled_convert_from(event_type.0))

View File

@ -272,6 +272,7 @@ pub enum Currency {
#[strum(serialize_all = "snake_case")]
pub enum EventClass {
Payments,
Refunds,
}
#[derive(
@ -290,6 +291,7 @@ pub enum EventClass {
#[strum(serialize_all = "snake_case")]
pub enum EventObjectType {
PaymentDetails,
RefundDetails,
}
#[derive(
@ -309,6 +311,8 @@ pub enum EventObjectType {
#[strum(serialize_all = "snake_case")]
pub enum EventType {
PaymentSucceeded,
RefundSucceeded,
RefundFailed,
}
#[derive(

View File

@ -57,6 +57,23 @@ impl Refund {
.await
}
#[instrument(skip(conn))]
pub async fn find_by_merchant_id_connector_refund_id_connector(
conn: &PgPooledConn,
merchant_id: &str,
connector_refund_id: &str,
connector: &str,
) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::connector_refund_id.eq(connector_refund_id.to_owned()))
.and(dsl::connector.eq(connector.to_owned())),
)
.await
}
#[instrument(skip(conn))]
pub async fn find_by_internal_reference_id_merchant_id(
conn: &PgPooledConn,

View File

@ -0,0 +1 @@
-- This file should undo anything in `up.sql`

View File

@ -0,0 +1,8 @@
-- Your SQL goes here
ALTER TYPE "EventClass" ADD VALUE 'refunds';
ALTER TYPE "EventObjectType" ADD VALUE 'refund_details';
ALTER TYPE "EventType" ADD VALUE 'refund_succeeded';
ALTER TYPE "EventType" ADD VALUE 'refund_failed';