feat(router): add mandates incoming webhooks flow (#2464)

This commit is contained in:
Sai Harsha Vardhan
2023-10-06 00:38:47 +05:30
committed by GitHub
parent 3f0d927cb8
commit 1cf8b6cf53
19 changed files with 321 additions and 8 deletions

View File

@ -19,7 +19,7 @@ pub struct MandateRevokedResponse {
pub status: api_enums::MandateStatus,
}
#[derive(Default, Debug, Deserialize, Serialize, ToSchema)]
#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)]
pub struct MandateResponse {
/// The identifier for mandate
pub mandate_id: String,
@ -37,7 +37,7 @@ pub struct MandateResponse {
pub customer_acceptance: Option<payments::CustomerAcceptance>,
}
#[derive(Default, Debug, Deserialize, Serialize, ToSchema)]
#[derive(Default, Debug, Deserialize, Serialize, ToSchema, Clone)]
pub struct MandateCardDetails {
/// The last 4 digits of card
pub last4_digits: Option<String>,

View File

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime;
use utoipa::ToSchema;
use crate::{disputes, enums as api_enums, payments, refunds};
use crate::{disputes, enums as api_enums, mandates, payments, refunds};
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
@ -27,6 +27,8 @@ pub enum IncomingWebhookEvent {
DisputeWon,
// dispute has been unsuccessfully challenged
DisputeLost,
MandateActive,
MandateRevoked,
EndpointVerification,
}
@ -37,6 +39,7 @@ pub enum WebhookFlow {
Subscription,
ReturnResponse,
BankTransfer,
Mandate,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
@ -56,6 +59,10 @@ pub enum WebhookResponseTracker {
payment_id: String,
status: common_enums::DisputeStatus,
},
Mandate {
mandate_id: String,
status: common_enums::MandateStatus,
},
NoEffect,
}
@ -65,7 +72,7 @@ impl WebhookResponseTracker {
Self::Payment { payment_id, .. }
| Self::Refund { payment_id, .. }
| Self::Dispute { payment_id, .. } => Some(payment_id.to_string()),
Self::NoEffect => None,
Self::NoEffect | Self::Mandate { .. } => None,
}
}
}
@ -82,6 +89,9 @@ impl From<IncomingWebhookEvent> for WebhookFlow {
IncomingWebhookEvent::RefundSuccess | IncomingWebhookEvent::RefundFailure => {
Self::Refund
}
IncomingWebhookEvent::MandateActive | IncomingWebhookEvent::MandateRevoked => {
Self::Mandate
}
IncomingWebhookEvent::DisputeOpened
| IncomingWebhookEvent::DisputeAccepted
| IncomingWebhookEvent::DisputeExpired
@ -104,10 +114,17 @@ pub enum RefundIdType {
ConnectorRefundId(String),
}
#[derive(Clone)]
pub enum MandateIdType {
MandateId(String),
ConnectorMandateId(String),
}
#[derive(Clone)]
pub enum ObjectReferenceId {
PaymentId(payments::PaymentIdType),
RefundId(RefundIdType),
MandateId(MandateIdType),
}
pub struct IncomingWebhookDetails {
@ -144,6 +161,8 @@ pub enum OutgoingWebhookContent {
RefundDetails(refunds::RefundResponse),
#[schema(value_type = DisputeResponse)]
DisputeDetails(Box<disputes::DisputeResponse>),
#[schema(value_type = MandateResponse)]
MandateDetails(Box<mandates::MandateResponse>),
}
#[derive(Debug, Clone, Serialize)]

View File

@ -804,6 +804,8 @@ pub enum EventType {
DisputeChallenged,
DisputeWon,
DisputeLost,
MandateActive,
MandateRevoked,
}
#[derive(

View File

@ -39,6 +39,7 @@ pub enum EventClass {
Payments,
Refunds,
Disputes,
Mandates,
}
#[derive(
@ -59,6 +60,7 @@ pub enum EventObjectType {
PaymentDetails,
RefundDetails,
DisputeDetails,
MandateDetails,
}
#[derive(

View File

@ -30,6 +30,7 @@ pub struct Mandate {
pub end_date: Option<PrimitiveDateTime>,
pub metadata: Option<pii::SecretSerdeValue>,
pub connector_mandate_ids: Option<pii::SecretSerdeValue>,
pub original_payment_id: Option<String>,
}
#[derive(
@ -58,6 +59,7 @@ pub struct MandateNew {
pub end_date: Option<PrimitiveDateTime>,
pub metadata: Option<pii::SecretSerdeValue>,
pub connector_mandate_ids: Option<pii::SecretSerdeValue>,
pub original_payment_id: Option<String>,
}
#[derive(Debug)]

View File

@ -27,6 +27,20 @@ impl Mandate {
.await
}
pub async fn find_by_merchant_id_connector_mandate_id(
conn: &PgPooledConn,
merchant_id: &str,
connector_mandate_id: &str,
) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::connector_mandate_id.eq(connector_mandate_id.to_owned())),
)
.await
}
pub async fn find_by_merchant_id_customer_id(
conn: &PgPooledConn,
merchant_id: &str,

View File

@ -398,6 +398,8 @@ diesel::table! {
end_date -> Nullable<Timestamp>,
metadata -> Nullable<Jsonb>,
connector_mandate_ids -> Nullable<Jsonb>,
#[max_length = 64]
original_payment_id -> Nullable<Varchar>,
}
}

View File

@ -1,5 +1,5 @@
use api_models::{
enums::DisputeStatus,
enums::{DisputeStatus, MandateStatus},
webhooks::{self as api},
};
use common_utils::{crypto::SignMessage, date_time, ext_traits};
@ -73,6 +73,7 @@ pub enum StripeWebhookObject {
PaymentIntent(StripePaymentIntentResponse),
Refund(StripeRefundResponse),
Dispute(StripeDisputeResponse),
Mandate(StripeMandateResponse),
}
#[derive(Serialize, Debug)]
@ -85,6 +86,22 @@ pub struct StripeDisputeResponse {
pub status: StripeDisputeStatus,
}
#[derive(Serialize, Debug)]
pub struct StripeMandateResponse {
pub mandate_id: String,
pub status: StripeMandateStatus,
pub payment_method_id: String,
pub payment_method: String,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum StripeMandateStatus {
Active,
Inactive,
Pending,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum StripeDisputeStatus {
@ -111,6 +128,27 @@ impl From<api_models::disputes::DisputeResponse> for StripeDisputeResponse {
}
}
impl From<api_models::mandates::MandateResponse> for StripeMandateResponse {
fn from(res: api_models::mandates::MandateResponse) -> Self {
Self {
mandate_id: res.mandate_id,
payment_method: res.payment_method,
payment_method_id: res.payment_method_id,
status: StripeMandateStatus::from(res.status),
}
}
}
impl From<MandateStatus> for StripeMandateStatus {
fn from(status: MandateStatus) -> Self {
match status {
MandateStatus::Active => Self::Active,
MandateStatus::Inactive | MandateStatus::Revoked => Self::Inactive,
MandateStatus::Pending => Self::Pending,
}
}
}
impl From<DisputeStatus> for StripeDisputeStatus {
fn from(status: DisputeStatus) -> Self {
match status {
@ -142,6 +180,8 @@ fn get_stripe_event_type(event_type: api_models::enums::EventType) -> &'static s
api_models::enums::EventType::DisputeChallenged => "dispute.challenged",
api_models::enums::EventType::DisputeWon => "dispute.won",
api_models::enums::EventType::DisputeLost => "dispute.lost",
api_models::enums::EventType::MandateActive => "mandate.active",
api_models::enums::EventType::MandateRevoked => "mandate.revoked",
}
}
@ -179,6 +219,9 @@ impl From<api::OutgoingWebhookContent> for StripeWebhookObject {
api::OutgoingWebhookContent::DisputeDetails(dispute) => {
Self::Dispute((*dispute).into())
}
api::OutgoingWebhookContent::MandateDetails(mandate) => {
Self::Mandate((*mandate).into())
}
}
}
}

View File

@ -218,6 +218,7 @@ where
if let Some(new_mandate_data) = helpers::generate_mandate(
resp.merchant_id.clone(),
resp.payment_id.clone(),
resp.connector.clone(),
resp.request.get_setup_mandate_details().map(Clone::clone),
maybe_customer,

View File

@ -2115,6 +2115,7 @@ pub fn check_if_operation_confirm<Op: std::fmt::Debug>(operations: Op) -> bool {
#[allow(clippy::too_many_arguments)]
pub fn generate_mandate(
merchant_id: String,
payment_id: String,
connector: String,
setup_mandate_details: Option<MandateData>,
customer: &Option<domain::Customer>,
@ -2137,6 +2138,7 @@ pub fn generate_mandate(
.set_mandate_id(mandate_id)
.set_customer_id(cus.customer_id.clone())
.set_merchant_id(merchant_id)
.set_original_payment_id(Some(payment_id))
.set_payment_method_id(payment_method_id)
.set_connector(connector)
.set_mandate_status(storage_enums::MandateStatus::Active)

View File

@ -3,7 +3,10 @@ pub mod utils;
use std::str::FromStr;
use api_models::{payments::HeaderPayload, webhooks::WebhookResponseTracker};
use api_models::{
payments::HeaderPayload,
webhooks::{self, WebhookResponseTracker},
};
use common_utils::errors::ReportSwitchExt;
use error_stack::{report, IntoReport, ResultExt};
use masking::ExposeInterface;
@ -24,7 +27,9 @@ use crate::{
routes::{lock_utils, metrics::request::add_attributes, AppState},
services,
types::{
self as router_types, api, domain,
self as router_types,
api::{self, mandates::MandateResponseExt},
domain,
storage::{self, enums},
transformers::{ForeignInto, ForeignTryInto},
},
@ -389,6 +394,79 @@ pub async fn get_or_update_dispute_object(
}
}
pub async fn mandates_incoming_webhook_flow<W: types::OutgoingWebhookType>(
state: AppState,
merchant_account: domain::MerchantAccount,
webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
event_type: api_models::webhooks::IncomingWebhookEvent,
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
if source_verified {
let db = &*state.store;
let mandate = match webhook_details.object_reference_id {
webhooks::ObjectReferenceId::MandateId(webhooks::MandateIdType::MandateId(
mandate_id,
)) => db
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
mandate_id.as_str(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
webhooks::ObjectReferenceId::MandateId(
webhooks::MandateIdType::ConnectorMandateId(connector_mandate_id),
) => db
.find_mandate_by_merchant_id_connector_mandate_id(
&merchant_account.merchant_id,
connector_mandate_id.as_str(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
_ => Err(errors::ApiErrorResponse::WebhookProcessingFailure)
.into_report()
.attach_printable("received a non-mandate id for retrieving mandate")?,
};
let mandate_status = event_type
.foreign_try_into()
.into_report()
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("event type to mandate status mapping failed")?;
let updated_mandate = db
.update_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
&mandate.mandate_id,
storage::MandateUpdate::StatusUpdate { mandate_status },
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?;
let mandates_response = Box::new(
api::mandates::MandateResponse::from_db_mandate(&state, updated_mandate.clone())
.await?,
);
let event_type: Option<enums::EventType> = updated_mandate.mandate_status.foreign_into();
if let Some(outgoing_event_type) = event_type {
create_event_and_trigger_outgoing_webhook::<W>(
state,
merchant_account,
outgoing_event_type,
enums::EventClass::Mandates,
None,
updated_mandate.mandate_id.clone(),
enums::EventObjectType::MandateDetails,
api::OutgoingWebhookContent::MandateDetails(mandates_response),
)
.await?;
}
Ok(WebhookResponseTracker::Mandate {
mandate_id: updated_mandate.mandate_id,
status: updated_mandate.mandate_status,
})
} else {
logger::error!("Webhook source verification failed for mandates webhook flow");
Err(errors::ApiErrorResponse::WebhookAuthenticationFailed).into_report()
}
}
#[instrument(skip_all)]
pub async fn disputes_incoming_webhook_flow<W: types::OutgoingWebhookType>(
state: AppState,
@ -1012,6 +1090,16 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType>(
api::WebhookFlow::ReturnResponse => WebhookResponseTracker::NoEffect,
api::WebhookFlow::Mandate => mandates_incoming_webhook_flow::<W>(
state.clone(),
merchant_account,
webhook_details,
source_verified,
event_type,
)
.await
.attach_printable("Incoming webhook flow for mandates failed")?,
_ => Err(errors::ApiErrorResponse::InternalServerError)
.into_report()
.attach_printable("Unsupported Flow Type received in incoming webhooks")?,

View File

@ -15,6 +15,12 @@ pub trait MandateInterface {
mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError>;
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError>;
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
@ -54,6 +60,22 @@ impl MandateInterface for Store {
.into_report()
}
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::find_by_merchant_id_connector_mandate_id(
&conn,
merchant_id,
connector_mandate_id,
)
.await
.map_err(Into::into)
.into_report()
}
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
@ -121,6 +143,24 @@ impl MandateInterface for MockDb {
.map_err(|err| err.into())
}
async fn find_mandate_by_merchant_id_connector_mandate_id(
&self,
merchant_id: &str,
connector_mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
self.mandates
.lock()
.await
.iter()
.find(|mandate| {
mandate.merchant_id == merchant_id
&& mandate.connector_mandate_id == Some(connector_mandate_id.to_string())
})
.cloned()
.ok_or_else(|| errors::StorageError::ValueNotFound("mandate not found".to_string()))
.map_err(|err| err.into())
}
async fn find_mandate_by_merchant_id_customer_id(
&self,
merchant_id: &str,
@ -229,6 +269,7 @@ impl MandateInterface for MockDb {
mandate_id: mandate_new.mandate_id.clone(),
customer_id: mandate_new.customer_id,
merchant_id: mandate_new.merchant_id,
original_payment_id: mandate_new.original_payment_id,
payment_method_id: mandate_new.payment_method_id,
mandate_status: mandate_new.mandate_status,
mandate_type: mandate_new.mandate_type,

View File

@ -409,6 +409,16 @@ impl ForeignFrom<storage_enums::DisputeStatus> for storage_enums::EventType {
}
}
impl ForeignFrom<storage_enums::MandateStatus> for Option<storage_enums::EventType> {
fn foreign_from(value: storage_enums::MandateStatus) -> Self {
match value {
storage_enums::MandateStatus::Active => Some(storage_enums::EventType::MandateActive),
storage_enums::MandateStatus::Revoked => Some(storage_enums::EventType::MandateRevoked),
storage_enums::MandateStatus::Inactive | storage_enums::MandateStatus::Pending => None,
}
}
}
impl ForeignTryFrom<api_models::webhooks::IncomingWebhookEvent> for storage_enums::RefundStatus {
type Error = errors::ValidationError;
@ -425,6 +435,22 @@ impl ForeignTryFrom<api_models::webhooks::IncomingWebhookEvent> for storage_enum
}
}
impl ForeignTryFrom<api_models::webhooks::IncomingWebhookEvent> for storage_enums::MandateStatus {
type Error = errors::ValidationError;
fn foreign_try_from(
value: api_models::webhooks::IncomingWebhookEvent,
) -> Result<Self, Self::Error> {
match value {
api_models::webhooks::IncomingWebhookEvent::MandateActive => Ok(Self::Active),
api_models::webhooks::IncomingWebhookEvent::MandateRevoked => Ok(Self::Revoked),
_ => Err(errors::ValidationError::IncorrectValueProvided {
field_name: "incoming_webhook_event_type",
}),
}
}
}
impl ForeignFrom<storage::Config> for api_types::Config {
fn foreign_from(config: storage::Config) -> Self {
let config = config;

View File

@ -289,6 +289,40 @@ pub async fn find_payment_intent_from_refund_id_type(
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
}
pub async fn find_payment_intent_from_mandate_id_type(
db: &dyn StorageInterface,
mandate_id_type: webhooks::MandateIdType,
merchant_account: &domain::MerchantAccount,
) -> CustomResult<PaymentIntent, errors::ApiErrorResponse> {
let mandate = match mandate_id_type {
webhooks::MandateIdType::MandateId(mandate_id) => db
.find_mandate_by_merchant_id_mandate_id(
&merchant_account.merchant_id,
mandate_id.as_str(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
webhooks::MandateIdType::ConnectorMandateId(connector_mandate_id) => db
.find_mandate_by_merchant_id_connector_mandate_id(
&merchant_account.merchant_id,
connector_mandate_id.as_str(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?,
};
db.find_payment_intent_by_payment_id_merchant_id(
&mandate
.original_payment_id
.ok_or(errors::ApiErrorResponse::InternalServerError)
.into_report()
.attach_printable("original_payment_id not present in mandate record")?,
&merchant_account.merchant_id,
merchant_account.storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound)
}
pub async fn get_profile_id_using_object_reference_id(
db: &dyn StorageInterface,
object_reference_id: webhooks::ObjectReferenceId,
@ -312,6 +346,10 @@ pub async fn get_profile_id_using_object_reference_id(
)
.await?
}
webhooks::ObjectReferenceId::MandateId(mandate_id_type) => {
find_payment_intent_from_mandate_id_type(db, mandate_id_type, merchant_account)
.await?
}
};
let profile_id = utils::get_profile_id_from_business_details(

View File

@ -0,0 +1 @@
ALTER TABLE mandate DROP COLUMN original_payment_id;

View File

@ -0,0 +1,2 @@
-- Your SQL goes here
ALTER TABLE mandate ADD COLUMN original_payment_id VARCHAR(64);

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
Select 1;

View File

@ -0,0 +1,8 @@
-- Your SQL goes here
ALTER TYPE "EventClass" ADD VALUE 'mandates';
ALTER TYPE "EventObjectType" ADD VALUE 'mandate_details';
ALTER TYPE "EventType" ADD VALUE 'mandate_active';
ALTER TYPE "EventType" ADD VALUE 'mandate_revoked';

View File

@ -4998,7 +4998,9 @@
"dispute_cancelled",
"dispute_challenged",
"dispute_won",
"dispute_lost"
"dispute_lost",
"mandate_active",
"mandate_revoked"
]
},
"FeatureMetadata": {
@ -7358,6 +7360,24 @@
"$ref": "#/components/schemas/DisputeResponse"
}
}
},
{
"type": "object",
"required": [
"type",
"object"
],
"properties": {
"type": {
"type": "string",
"enum": [
"mandate_details"
]
},
"object": {
"$ref": "#/components/schemas/MandateResponse"
}
}
}
],
"discriminator": {