feat: Implement subscriptions workflow and incoming webhook support (#9400)

Co-authored-by: Prajjwal kumar <write2prajjwal@gmail.com>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Prajjwal Kumar <prajjwal.kumar@juspay.in>
This commit is contained in:
Gaurav Rawat
2025-10-03 21:31:21 +05:30
committed by GitHub
parent e186a0f4f4
commit 32dd9e10e3
12 changed files with 381 additions and 52 deletions

View File

@ -42,7 +42,7 @@ pub async fn create_subscription(
let billing_handler =
BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?;
let subscription_handler = SubscriptionHandler::new(&state, &merchant_context, profile);
let subscription_handler = SubscriptionHandler::new(&state, &merchant_context);
let mut subscription = subscription_handler
.create_subscription_entry(
subscription_id,
@ -50,10 +50,11 @@ pub async fn create_subscription(
billing_handler.connector_data.connector_name,
billing_handler.merchant_connector_id.clone(),
request.merchant_reference_id.clone(),
&profile.clone(),
)
.await
.attach_printable("subscriptions: failed to create subscription entry")?;
let invoice_handler = subscription.get_invoice_handler();
let invoice_handler = subscription.get_invoice_handler(profile.clone());
let payment = invoice_handler
.create_payment_with_confirm_false(subscription.handler.state, &request)
.await
@ -107,7 +108,7 @@ pub async fn create_and_confirm_subscription(
let billing_handler =
BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?;
let subscription_handler = SubscriptionHandler::new(&state, &merchant_context, profile.clone());
let subscription_handler = SubscriptionHandler::new(&state, &merchant_context);
let mut subs_handler = subscription_handler
.create_subscription_entry(
subscription_id.clone(),
@ -115,10 +116,11 @@ pub async fn create_and_confirm_subscription(
billing_handler.connector_data.connector_name,
billing_handler.merchant_connector_id.clone(),
request.merchant_reference_id.clone(),
&profile.clone(),
)
.await
.attach_printable("subscriptions: failed to create subscription entry")?;
let invoice_handler = subs_handler.get_invoice_handler();
let invoice_handler = subs_handler.get_invoice_handler(profile.clone());
let _customer_create_response = billing_handler
.create_customer_on_connector(
@ -210,9 +212,9 @@ pub async fn confirm_subscription(
.await
.attach_printable("subscriptions: failed to find customer")?;
let handler = SubscriptionHandler::new(&state, &merchant_context, profile.clone());
let handler = SubscriptionHandler::new(&state, &merchant_context);
let mut subscription_entry = handler.find_subscription(subscription_id).await?;
let invoice_handler = subscription_entry.get_invoice_handler();
let invoice_handler = subscription_entry.get_invoice_handler(profile.clone());
let invoice = invoice_handler
.get_latest_invoice(&state)
.await
@ -230,8 +232,8 @@ pub async fn confirm_subscription(
.await?;
let billing_handler =
BillingHandler::create(&state, &merchant_context, customer, profile).await?;
let invoice_handler = subscription_entry.get_invoice_handler();
BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?;
let invoice_handler = subscription_entry.get_invoice_handler(profile);
let subscription = subscription_entry.subscription.clone();
let _customer_create_response = billing_handler
@ -296,13 +298,13 @@ pub async fn get_subscription(
profile_id: common_utils::id_type::ProfileId,
subscription_id: common_utils::id_type::SubscriptionId,
) -> RouterResponse<SubscriptionResponse> {
let profile =
let _profile =
SubscriptionHandler::find_business_profile(&state, &merchant_context, &profile_id)
.await
.attach_printable(
"subscriptions: failed to find business profile in get_subscription",
)?;
let handler = SubscriptionHandler::new(&state, &merchant_context, profile);
let handler = SubscriptionHandler::new(&state, &merchant_context);
let subscription = handler
.find_subscription(subscription_id)
.await

View File

@ -14,24 +14,23 @@ use hyperswitch_domain_models::{
use masking::Secret;
use super::errors;
use crate::{core::subscription::invoice_handler::InvoiceHandler, routes::SessionState};
use crate::{
core::{errors::StorageErrorExt, subscription::invoice_handler::InvoiceHandler},
db::CustomResult,
routes::SessionState,
types::domain,
};
pub struct SubscriptionHandler<'a> {
pub state: &'a SessionState,
pub merchant_context: &'a MerchantContext,
pub profile: hyperswitch_domain_models::business_profile::Profile,
}
impl<'a> SubscriptionHandler<'a> {
pub fn new(
state: &'a SessionState,
merchant_context: &'a MerchantContext,
profile: hyperswitch_domain_models::business_profile::Profile,
) -> Self {
pub fn new(state: &'a SessionState, merchant_context: &'a MerchantContext) -> Self {
Self {
state,
merchant_context,
profile,
}
}
@ -43,6 +42,7 @@ impl<'a> SubscriptionHandler<'a> {
billing_processor: connector_enums::Connector,
merchant_connector_id: common_utils::id_type::MerchantConnectorAccountId,
merchant_reference_id: Option<String>,
profile: &hyperswitch_domain_models::business_profile::Profile,
) -> errors::RouterResult<SubscriptionWithHandler<'_>> {
let store = self.state.store.clone();
let db = store.as_ref();
@ -61,7 +61,7 @@ impl<'a> SubscriptionHandler<'a> {
.clone(),
customer_id.clone(),
None,
self.profile.get_id().clone(),
profile.get_id().clone(),
merchant_reference_id,
);
@ -76,7 +76,6 @@ impl<'a> SubscriptionHandler<'a> {
Ok(SubscriptionWithHandler {
handler: self,
subscription: new_subscription,
profile: self.profile.clone(),
merchant_account: self.merchant_context.get_merchant_account().clone(),
})
}
@ -145,7 +144,6 @@ impl<'a> SubscriptionHandler<'a> {
Ok(SubscriptionWithHandler {
handler: self,
subscription,
profile: self.profile.clone(),
merchant_account: self.merchant_context.get_merchant_account().clone(),
})
}
@ -153,7 +151,6 @@ impl<'a> SubscriptionHandler<'a> {
pub struct SubscriptionWithHandler<'a> {
pub handler: &'a SubscriptionHandler<'a>,
pub subscription: diesel_models::subscription::Subscription,
pub profile: hyperswitch_domain_models::business_profile::Profile,
pub merchant_account: hyperswitch_domain_models::merchant_account::MerchantAccount,
}
@ -237,11 +234,83 @@ impl SubscriptionWithHandler<'_> {
Ok(())
}
pub fn get_invoice_handler(&self) -> InvoiceHandler {
pub fn get_invoice_handler(
&self,
profile: hyperswitch_domain_models::business_profile::Profile,
) -> InvoiceHandler {
InvoiceHandler {
subscription: self.subscription.clone(),
merchant_account: self.merchant_account.clone(),
profile: self.profile.clone(),
profile,
}
}
pub async fn get_mca(
&mut self,
connector_name: &str,
) -> CustomResult<domain::MerchantConnectorAccount, errors::ApiErrorResponse> {
let db = self.handler.state.store.as_ref();
let key_manager_state = &(self.handler.state).into();
match &self.subscription.merchant_connector_id {
Some(merchant_connector_id) => {
#[cfg(feature = "v1")]
{
db.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
key_manager_state,
self.handler
.merchant_context
.get_merchant_account()
.get_id(),
merchant_connector_id,
self.handler.merchant_context.get_merchant_key_store(),
)
.await
.to_not_found_response(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: merchant_connector_id.get_string_repr().to_string(),
},
)
}
#[cfg(feature = "v2")]
{
//get mca using id
let _ = key_manager_state;
let _ = connector_name;
let _ = merchant_context.get_merchant_key_store();
let _ = subscription.profile_id;
todo!()
}
}
None => {
// Fallback to profile-based lookup when merchant_connector_id is not set
#[cfg(feature = "v1")]
{
db.find_merchant_connector_account_by_profile_id_connector_name(
key_manager_state,
&self.subscription.profile_id,
connector_name,
self.handler.merchant_context.get_merchant_key_store(),
)
.await
.to_not_found_response(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: format!(
"profile_id {} and connector_name {connector_name}",
self.subscription.profile_id.get_string_repr()
),
},
)
}
#[cfg(feature = "v2")]
{
//get mca using id
let _ = key_manager_state;
let _ = connector_name;
let _ = self.handler.merchant_context.get_merchant_key_store();
let _ = self.subscription.profile_id;
todo!()
}
}
}
}
}

View File

@ -3,7 +3,11 @@ use std::{str::FromStr, time::Instant};
use actix_web::FromRequest;
#[cfg(feature = "payouts")]
use api_models::payouts as payout_models;
use api_models::webhooks::{self, WebhookResponseTracker};
use api_models::{
enums::Connector,
webhooks::{self, WebhookResponseTracker},
};
pub use common_enums::{connector_enums::InvoiceStatus, enums::ProcessTrackerRunner};
use common_utils::{
errors::ReportSwitchExt,
events::ApiEventsType,
@ -30,7 +34,9 @@ use crate::{
errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt},
metrics, payment_methods,
payments::{self, tokenization},
refunds, relay, unified_connector_service, utils as core_utils,
refunds, relay,
subscription::subscription_handler::SubscriptionHandler,
unified_connector_service, utils as core_utils,
webhooks::{network_tokenization_incoming, utils::construct_webhook_router_data},
},
db::StorageInterface,
@ -253,6 +259,7 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
&request_details,
merchant_context.get_merchant_account().get_id(),
merchant_connector_account
.clone()
.and_then(|mca| mca.connector_webhook_details.clone()),
&connector_name,
)
@ -342,6 +349,11 @@ async fn incoming_webhooks_core<W: types::OutgoingWebhookType>(
&webhook_processing_result.transform_data,
&final_request_details,
is_relay_webhook,
merchant_connector_account
.ok_or(errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: connector_name_or_mca_id.to_string(),
})?
.merchant_connector_id,
)
.await;
@ -524,12 +536,13 @@ async fn process_webhook_business_logic(
webhook_transform_data: &Option<Box<unified_connector_service::WebhookTransformData>>,
request_details: &IncomingWebhookRequestDetails<'_>,
is_relay_webhook: bool,
billing_connector_mca_id: common_utils::id_type::MerchantConnectorAccountId,
) -> errors::RouterResult<WebhookResponseTracker> {
let object_ref_id = connector
.get_webhook_object_reference_id(request_details)
.switch()
.attach_printable("Could not find object reference id in incoming webhook body")?;
let connector_enum = api_models::enums::Connector::from_str(connector_name)
let connector_enum = Connector::from_str(connector_name)
.change_context(errors::ApiErrorResponse::InvalidDataValue {
field_name: "connector",
})
@ -814,6 +827,21 @@ async fn process_webhook_business_logic(
.await
.attach_printable("Incoming webhook flow for payouts failed"),
api::WebhookFlow::Subscription => Box::pin(subscription_incoming_webhook_flow(
state.clone(),
req_state,
merchant_context.clone(),
business_profile,
webhook_details,
source_verified,
connector,
request_details,
event_type,
billing_connector_mca_id,
))
.await
.attach_printable("Incoming webhook flow for subscription failed"),
_ => Err(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unsupported Flow Type received in incoming webhooks"),
}
@ -845,7 +873,7 @@ fn handle_incoming_webhook_error(
logger::error!(?error, "Incoming webhook flow failed");
// fetch the connector enum from the connector name
let connector_enum = api_models::connector_enums::Connector::from_str(connector_name)
let connector_enum = Connector::from_str(connector_name)
.change_context(errors::ApiErrorResponse::InvalidDataValue {
field_name: "connector",
})
@ -2533,3 +2561,92 @@ fn insert_mandate_details(
)?;
Ok(connector_mandate_details)
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn subscription_incoming_webhook_flow(
state: SessionState,
_req_state: ReqState,
merchant_context: domain::MerchantContext,
business_profile: domain::Profile,
_webhook_details: api::IncomingWebhookDetails,
source_verified: bool,
connector_enum: &ConnectorEnum,
request_details: &IncomingWebhookRequestDetails<'_>,
event_type: webhooks::IncomingWebhookEvent,
billing_connector_mca_id: common_utils::id_type::MerchantConnectorAccountId,
) -> CustomResult<WebhookResponseTracker, errors::ApiErrorResponse> {
// Only process invoice_generated events for MIT payments
if event_type != webhooks::IncomingWebhookEvent::InvoiceGenerated {
return Ok(WebhookResponseTracker::NoEffect);
}
if !source_verified {
logger::error!("Webhook source verification failed for subscription webhook flow");
return Err(report!(
errors::ApiErrorResponse::WebhookAuthenticationFailed
));
}
let connector_name = connector_enum.id().to_string();
let connector = Connector::from_str(&connector_name)
.change_context(errors::ConnectorError::InvalidConnectorName)
.change_context(errors::ApiErrorResponse::IncorrectConnectorNameGiven)
.attach_printable_lazy(|| format!("unable to parse connector name {connector_name}"))?;
let mit_payment_data = connector_enum
.get_subscription_mit_payment_data(request_details)
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("Failed to extract MIT payment data from subscription webhook")?;
if mit_payment_data.first_invoice {
return Ok(WebhookResponseTracker::NoEffect);
}
let profile_id = business_profile.get_id().clone();
let profile =
SubscriptionHandler::find_business_profile(&state, &merchant_context, &profile_id)
.await
.attach_printable(
"subscriptions: failed to find business profile in get_subscription",
)?;
let handler = SubscriptionHandler::new(&state, &merchant_context);
let subscription_id = mit_payment_data.subscription_id.clone();
let subscription_with_handler = handler
.find_subscription(subscription_id)
.await
.attach_printable("subscriptions: failed to get subscription entry in get_subscription")?;
let invoice_handler = subscription_with_handler.get_invoice_handler(profile);
let payment_method_id = subscription_with_handler
.subscription
.payment_method_id
.clone()
.ok_or(errors::ApiErrorResponse::GenericNotFoundError {
message: "No payment method found for subscription".to_string(),
})
.attach_printable("No payment method found for subscription")?;
logger::info!("Payment method ID found: {}", payment_method_id);
let _invoice_new = invoice_handler
.create_invoice_entry(
&state,
billing_connector_mca_id.clone(),
None,
mit_payment_data.amount_due,
mit_payment_data.currency_code,
InvoiceStatus::PaymentPending,
connector,
None,
)
.await?;
Ok(WebhookResponseTracker::NoEffect)
}

View File

@ -1456,6 +1456,7 @@ impl RecoveryAction {
| webhooks::IncomingWebhookEvent::PayoutCreated
| webhooks::IncomingWebhookEvent::PayoutExpired
| webhooks::IncomingWebhookEvent::PayoutReversed
| webhooks::IncomingWebhookEvent::InvoiceGenerated
| webhooks::IncomingWebhookEvent::SetupWebhook => {
common_types::payments::RecoveryAction::InvalidAction
}

View File

@ -44,6 +44,8 @@ use serde_json::Value;
use tracing_futures::Instrument;
pub use self::ext_traits::{OptionExt, ValidateCall};
#[cfg(feature = "v1")]
use crate::core::subscription::subscription_handler::SubscriptionHandler;
use crate::{
consts,
core::{
@ -459,7 +461,6 @@ pub async fn get_mca_from_payment_intent(
}
}
}
#[cfg(feature = "payouts")]
pub async fn get_mca_from_payout_attempt(
state: &SessionState,
@ -639,6 +640,22 @@ pub async fn get_mca_from_object_reference_id(
)
.await
}
webhooks::ObjectReferenceId::SubscriptionId(subscription_id_type) => {
#[cfg(feature = "v1")]
{
let subscription_handler = SubscriptionHandler::new(state, merchant_context);
let mut subscription_with_handler = subscription_handler
.find_subscription(subscription_id_type)
.await?;
subscription_with_handler.get_mca(connector_name).await
}
#[cfg(feature = "v2")]
{
let _db = db;
todo!()
}
}
#[cfg(feature = "payouts")]
webhooks::ObjectReferenceId::PayoutId(payout_id_type) => {
get_mca_from_payout_attempt(state, merchant_context, payout_id_type, connector_name)