feat: Add outgoing webhooks for subscriptions (#9859)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Jagan Elavarasan <jaganelavarasan@gmail.com>
This commit is contained in:
Sarthak Soni
2025-10-27 15:24:12 +05:30
committed by GitHub
parent 6ff2116461
commit 62035c4aeb
27 changed files with 600 additions and 137 deletions

View File

@ -12970,6 +12970,74 @@
}
}
},
"ConfirmSubscriptionResponse": {
"type": "object",
"required": [
"id",
"status",
"profile_id"
],
"properties": {
"id": {
"$ref": "#/components/schemas/SubscriptionId"
},
"merchant_reference_id": {
"type": "string",
"description": "Merchant specific Unique identifier.",
"nullable": true
},
"status": {
"$ref": "#/components/schemas/SubscriptionStatus"
},
"plan_id": {
"type": "string",
"description": "Identifier for the associated subscription plan.",
"nullable": true
},
"item_price_id": {
"type": "string",
"description": "Identifier for the associated item_price_id for the subscription.",
"nullable": true
},
"coupon": {
"type": "string",
"description": "Optional coupon code applied to this subscription.",
"nullable": true
},
"profile_id": {
"$ref": "#/components/schemas/ProfileId"
},
"payment": {
"allOf": [
{
"$ref": "#/components/schemas/PaymentResponseData"
}
],
"nullable": true
},
"customer_id": {
"allOf": [
{
"$ref": "#/components/schemas/CustomerId"
}
],
"nullable": true
},
"invoice": {
"allOf": [
{
"$ref": "#/components/schemas/Invoice"
}
],
"nullable": true
},
"billing_processor_subscription_id": {
"type": "string",
"description": "Billing Processor subscription ID.",
"nullable": true
}
}
},
"Connector": {
"type": "string",
"enum": [
@ -16076,7 +16144,8 @@
"refunds",
"disputes",
"mandates",
"payouts"
"payouts",
"subscriptions"
]
},
"EventListConstraints": {
@ -16273,7 +16342,8 @@
"payout_processing",
"payout_cancelled",
"payout_expired",
"payout_reversed"
"payout_reversed",
"invoice_paid"
]
},
"ExtendedCardInfo": {
@ -18400,6 +18470,11 @@
},
"status": {
"$ref": "#/components/schemas/InvoiceStatus"
},
"billing_processor_invoice_id": {
"type": "string",
"description": "billing processor invoice id",
"nullable": true
}
}
},
@ -21648,6 +21723,25 @@
"$ref": "#/components/schemas/PayoutCreateResponse"
}
}
},
{
"type": "object",
"title": "ConfirmSubscriptionResponse",
"required": [
"type",
"object"
],
"properties": {
"type": {
"type": "string",
"enum": [
"subscription_details"
]
},
"object": {
"$ref": "#/components/schemas/ConfirmSubscriptionResponse"
}
}
}
],
"discriminator": {

View File

@ -11308,7 +11308,8 @@
"refunds",
"disputes",
"mandates",
"payouts"
"payouts",
"subscriptions"
]
},
"EventListItemResponse": {
@ -11435,7 +11436,8 @@
"payout_processing",
"payout_cancelled",
"payout_expired",
"payout_reversed"
"payout_reversed",
"invoice_paid"
]
},
"ExtendedCardInfo": {

View File

@ -1,4 +1,4 @@
use common_enums::connector_enums::InvoiceStatus;
use common_enums::{connector_enums::InvoiceStatus, SubscriptionStatus};
use common_types::payments::CustomerAcceptance;
use common_utils::{
events::ApiEventMetric,
@ -94,43 +94,6 @@ pub struct SubscriptionResponse {
pub invoice: Option<Invoice>,
}
/// Possible states of a subscription lifecycle.
///
/// - `Created`: Subscription was created but not yet activated.
/// - `Active`: Subscription is currently active.
/// - `InActive`: Subscription is inactive.
/// - `Pending`: Subscription is pending activation.
/// - `Trial`: Subscription is in a trial period.
/// - `Paused`: Subscription is paused.
/// - `Unpaid`: Subscription is unpaid.
/// - `Onetime`: Subscription is a one-time payment.
/// - `Cancelled`: Subscription has been cancelled.
/// - `Failed`: Subscription has failed.
#[derive(Debug, Clone, serde::Serialize, strum::EnumString, strum::Display, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionStatus {
/// Subscription is active.
Active,
/// Subscription is created but not yet active.
Created,
/// Subscription is inactive.
InActive,
/// Subscription is in pending state.
Pending,
/// Subscription is in trial state.
Trial,
/// Subscription is paused.
Paused,
/// Subscription is unpaid.
Unpaid,
/// Subscription is a one-time payment.
Onetime,
/// Subscription is cancelled.
Cancelled,
/// Subscription has failed.
Failed,
}
impl SubscriptionResponse {
/// Creates a new [`CreateSubscriptionResponse`] with the given identifiers.
///
@ -342,6 +305,12 @@ pub struct PaymentResponseData {
pub payment_type: Option<PaymentType>,
}
impl PaymentResponseData {
pub fn get_billing_address(&self) -> Option<Address> {
self.billing.clone()
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)]
pub struct CreateMitPaymentRequestData {
pub amount: MinorUnit,
@ -451,6 +420,18 @@ pub struct ConfirmSubscriptionResponse {
pub billing_processor_subscription_id: Option<String>,
}
impl ConfirmSubscriptionResponse {
pub fn get_optional_invoice_id(&self) -> Option<InvoiceId> {
self.invoice.as_ref().map(|invoice| invoice.id.to_owned())
}
pub fn get_optional_payment_id(&self) -> Option<PaymentId> {
self.payment
.as_ref()
.map(|payment| payment.payment_id.to_owned())
}
}
#[derive(Debug, Clone, serde::Serialize, ToSchema)]
pub struct Invoice {
/// Unique identifier for the invoice.
@ -486,6 +467,9 @@ pub struct Invoice {
/// Status of the invoice.
pub status: InvoiceStatus,
/// billing processor invoice id
pub billing_processor_invoice_id: Option<String>,
}
impl ApiEventMetric for ConfirmSubscriptionResponse {}

View File

@ -5,7 +5,7 @@ use utoipa::ToSchema;
#[cfg(feature = "payouts")]
use crate::payouts;
use crate::{disputes, enums as api_enums, mandates, payments, refunds};
use crate::{disputes, enums as api_enums, mandates, payments, refunds, subscription};
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
@ -446,6 +446,8 @@ pub enum OutgoingWebhookContent {
#[cfg(feature = "payouts")]
#[schema(value_type = PayoutCreateResponse, title = "PayoutCreateResponse")]
PayoutDetails(Box<payouts::PayoutCreateResponse>),
#[schema(value_type = ConfirmSubscriptionResponse, title = "ConfirmSubscriptionResponse")]
SubscriptionDetails(Box<subscription::ConfirmSubscriptionResponse>),
}
#[derive(Debug, Clone, Serialize, ToSchema)]

View File

@ -1504,6 +1504,29 @@ impl Currency {
}
}
#[derive(
Clone,
Copy,
Debug,
Eq,
PartialEq,
serde::Deserialize,
serde::Serialize,
strum::Display,
strum::EnumString,
)]
#[router_derive::diesel_enum(storage_type = "db_enum")]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum EventObjectType {
PaymentDetails,
RefundDetails,
DisputeDetails,
MandateDetails,
PayoutDetails,
SubscriptionDetails,
}
#[derive(
Clone,
Copy,
@ -1527,6 +1550,7 @@ pub enum EventClass {
Mandates,
#[cfg(feature = "payouts")]
Payouts,
Subscriptions,
}
impl EventClass {
@ -1565,6 +1589,7 @@ impl EventClass {
EventType::PayoutExpired,
EventType::PayoutReversed,
]),
Self::Subscriptions => HashSet::from([EventType::InvoicePaid]),
}
}
}
@ -1624,6 +1649,7 @@ pub enum EventType {
PayoutExpired,
#[cfg(feature = "payouts")]
PayoutReversed,
InvoicePaid,
}
#[derive(
@ -9781,3 +9807,49 @@ impl From<IntentStatus> for InvoiceStatus {
}
}
}
/// Possible states of a subscription lifecycle.
///
/// - `Created`: Subscription was created but not yet activated.
/// - `Active`: Subscription is currently active.
/// - `InActive`: Subscription is inactive.
/// - `Pending`: Subscription is pending activation.
/// - `Trial`: Subscription is in a trial period.
/// - `Paused`: Subscription is paused.
/// - `Unpaid`: Subscription is unpaid.
/// - `Onetime`: Subscription is a one-time payment.
/// - `Cancelled`: Subscription has been cancelled.
/// - `Failed`: Subscription has failed.
#[derive(
Debug,
Clone,
Copy,
serde::Serialize,
strum::EnumString,
strum::Display,
strum::EnumIter,
ToSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionStatus {
/// Subscription is active.
Active,
/// Subscription is created but not yet active.
Created,
/// Subscription is inactive.
InActive,
/// Subscription is in pending state.
Pending,
/// Subscription is in trial state.
Trial,
/// Subscription is paused.
Paused,
/// Subscription is unpaid.
Unpaid,
/// Subscription is a one-time payment.
Onetime,
/// Subscription is cancelled.
Cancelled,
/// Subscription has failed.
Failed,
}

View File

@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use crate::enums::PayoutStatus;
use crate::enums::{
AttemptStatus, Country, CountryAlpha2, CountryAlpha3, DisputeStatus, EventType, IntentStatus,
MandateStatus, PaymentMethod, PaymentMethodType, RefundStatus,
MandateStatus, PaymentMethod, PaymentMethodType, RefundStatus, SubscriptionStatus,
};
impl Display for NumericCountryCodeParseError {
@ -2214,6 +2214,15 @@ impl From<MandateStatus> for Option<EventType> {
}
}
impl From<SubscriptionStatus> for Option<EventType> {
fn from(value: SubscriptionStatus) -> Self {
match value {
SubscriptionStatus::Active => Some(EventType::InvoicePaid),
_ => None,
}
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used)]

View File

@ -61,28 +61,6 @@ pub enum RoutingAlgorithmKind {
ThreeDsDecisionRule,
}
#[derive(
Clone,
Copy,
Debug,
Eq,
PartialEq,
serde::Deserialize,
serde::Serialize,
strum::Display,
strum::EnumString,
)]
#[diesel_enum(storage_type = "db_enum")]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum EventObjectType {
PaymentDetails,
RefundDetails,
DisputeDetails,
MandateDetails,
PayoutDetails,
}
// Refund
#[derive(
Clone,

View File

@ -102,6 +102,11 @@ pub enum EventMetadata {
payment_method_id: String,
mandate_id: String,
},
Subscription {
subscription_id: common_utils::id_type::SubscriptionId,
invoice_id: Option<common_utils::id_type::InvoiceId>,
payment_id: Option<common_utils::id_type::PaymentId>,
},
}
common_utils::impl_to_sql_from_sql_json!(EventMetadata);

View File

@ -38,7 +38,7 @@ use api_models::payment_methods::CountryCodeWithName;
use common_enums::PayoutStatus;
use common_enums::{
CountryAlpha2, DisputeStatus, EventClass, EventType, IntentStatus, MandateStatus,
MerchantCategoryCode, MerchantCategoryCodeWithName, RefundStatus,
MerchantCategoryCode, MerchantCategoryCodeWithName, RefundStatus, SubscriptionStatus,
};
use strum::IntoEnumIterator;
@ -515,5 +515,11 @@ pub fn get_valid_webhook_status(key: &str) -> JsResult {
.collect();
Ok(serde_wasm_bindgen::to_value(&statuses)?)
}
EventClass::Subscriptions => {
let statuses: Vec<SubscriptionStatus> = SubscriptionStatus::iter()
.filter(|status| Into::<Option<EventType>>::into(*status).is_some())
.collect();
Ok(serde_wasm_bindgen::to_value(&statuses)?)
}
}
}

View File

@ -36,7 +36,7 @@ pub enum SubscriptionStatus {
Created,
}
impl From<SubscriptionStatus> for api_models::subscription::SubscriptionStatus {
impl From<SubscriptionStatus> for common_enums::SubscriptionStatus {
fn from(status: SubscriptionStatus) -> Self {
match status {
SubscriptionStatus::Pending => Self::Pending,

View File

@ -904,6 +904,7 @@ Never share your secret api keys. Keep them guarded and secure.
api_models::subscription::EstimateSubscriptionResponse,
api_models::subscription::GetPlansQuery,
api_models::subscription::EstimateSubscriptionQuery,
api_models::subscription::ConfirmSubscriptionResponse,
api_models::subscription::ConfirmSubscriptionPaymentDetails,
api_models::subscription::PaymentDetails,
api_models::subscription::CreateSubscriptionPaymentDetails,
@ -911,7 +912,7 @@ Never share your secret api keys. Keep them guarded and secure.
api_models::subscription::SubscriptionPlanPrices,
api_models::subscription::PaymentResponseData,
api_models::subscription::Invoice,
api_models::subscription::SubscriptionStatus,
api_models::enums::SubscriptionStatus,
api_models::subscription::PeriodUnit,
)),
modifiers(&SecurityAddon)

View File

@ -90,6 +90,7 @@ pub enum StripeWebhookObject {
Mandate(StripeMandateResponse),
#[cfg(feature = "payouts")]
Payout(StripePayoutResponse),
Subscriptions,
}
#[derive(Serialize, Debug)]
@ -302,6 +303,7 @@ fn get_stripe_event_type(event_type: api_models::enums::EventType) -> &'static s
api_models::enums::EventType::PayoutProcessing => "payout.created",
api_models::enums::EventType::PayoutExpired => "payout.failed",
api_models::enums::EventType::PayoutReversed => "payout.reconciliation_completed",
api_models::enums::EventType::InvoicePaid => "invoice.paid",
}
}
@ -344,6 +346,9 @@ impl From<api::OutgoingWebhookContent> for StripeWebhookObject {
}
#[cfg(feature = "payouts")]
api::OutgoingWebhookContent::PayoutDetails(payout) => Self::Payout((*payout).into()),
api_models::webhooks::OutgoingWebhookContent::SubscriptionDetails(_) => {
Self::Subscriptions
}
}
}
}

View File

@ -1458,7 +1458,7 @@ impl RevenueRecoveryOutgoingWebhook {
event_status,
event_class,
payment_attempt_id,
enums::EventObjectType::PaymentDetails,
common_enums::EventObjectType::PaymentDetails,
outgoing_webhook_content,
payment_intent.created_at,
)

View File

@ -1026,6 +1026,13 @@ impl ForeignFrom<&api::OutgoingWebhookContent> for storage::EventMetadata {
webhooks::OutgoingWebhookContent::PayoutDetails(payout_response) => Self::Payout {
payout_id: payout_response.payout_id.clone(),
},
webhooks::OutgoingWebhookContent::SubscriptionDetails(subscription) => {
Self::Subscription {
subscription_id: subscription.id.clone(),
invoice_id: subscription.get_optional_invoice_id(),
payment_id: subscription.get_optional_payment_id(),
}
}
}
}
}
@ -1070,5 +1077,15 @@ fn get_outgoing_webhook_event_content_from_event_metadata(
mandate_id,
content: serde_json::Value::Null,
},
diesel_models::EventMetadata::Subscription {
subscription_id,
invoice_id,
payment_id,
} => OutgoingWebhookEventContent::Subscription {
subscription_id,
invoice_id,
payment_id,
content: serde_json::Value::Null,
},
})
}

View File

@ -659,6 +659,16 @@ impl ForeignFrom<storage::EventMetadata> for outgoing_webhook_logs::OutgoingWebh
mandate_id,
content: serde_json::Value::Null,
},
diesel_models::EventMetadata::Subscription {
subscription_id,
invoice_id,
payment_id,
} => Self::Subscription {
subscription_id,
invoice_id,
payment_id,
content: serde_json::Value::Null,
},
}
}
}

View File

@ -72,6 +72,12 @@ pub enum OutgoingWebhookEventContent {
mandate_id: String,
content: Value,
},
Subscription {
subscription_id: common_utils::id_type::SubscriptionId,
invoice_id: Option<common_utils::id_type::InvoiceId>,
payment_id: Option<common_utils::id_type::PaymentId>,
content: Value,
},
}
pub trait OutgoingWebhookEventMetric {
fn get_outgoing_webhook_event_content(&self) -> Option<OutgoingWebhookEventContent>;
@ -111,6 +117,15 @@ impl OutgoingWebhookEventMetric for OutgoingWebhookContent {
content: masking::masked_serialize(&payout_payload)
.unwrap_or(serde_json::json!({"error":"failed to serialize"})),
}),
Self::SubscriptionDetails(subscription) => {
Some(OutgoingWebhookEventContent::Subscription {
subscription_id: subscription.id.clone(),
invoice_id: subscription.get_optional_invoice_id(),
payment_id: subscription.get_optional_payment_id(),
content: masking::masked_serialize(&subscription)
.unwrap_or(serde_json::json!({"error":"failed to serialize"})),
})
}
}
}
}

View File

@ -17,7 +17,7 @@ use std::fmt::Debug;
use api_models::{
enums,
payments::{self},
webhooks,
subscription as subscription_types, webhooks,
};
use common_utils::types::keymanager::KeyManagerState;
pub use common_utils::{
@ -42,7 +42,7 @@ use nanoid::nanoid;
use serde::de::DeserializeOwned;
use serde_json::Value;
#[cfg(feature = "v1")]
use subscriptions::subscription_handler::SubscriptionHandler;
use subscriptions::{subscription_handler::SubscriptionHandler, workflows::InvoiceSyncHandler};
use tracing_futures::Instrument;
pub use self::ext_traits::{OptionExt, ValidateCall};
@ -1225,7 +1225,7 @@ where
event_type,
diesel_models::enums::EventClass::Payments,
payment_id.get_string_repr().to_owned(),
diesel_models::enums::EventObjectType::PaymentDetails,
common_enums::EventObjectType::PaymentDetails,
webhooks::OutgoingWebhookContent::PaymentDetails(Box::new(
payments_response_json,
)),
@ -1301,7 +1301,7 @@ pub async fn trigger_refund_outgoing_webhook(
outgoing_event_type,
diesel_models::enums::EventClass::Refunds,
refund_id.to_string(),
diesel_models::enums::EventObjectType::RefundDetails,
common_enums::EventObjectType::RefundDetails,
webhooks::OutgoingWebhookContent::RefundDetails(Box::new(refund_response)),
primary_object_created_at,
))
@ -1380,7 +1380,7 @@ pub async fn trigger_payouts_webhook(
event_type,
diesel_models::enums::EventClass::Payouts,
cloned_response.payout_id.get_string_repr().to_owned(),
diesel_models::enums::EventObjectType::PayoutDetails,
common_enums::EventObjectType::PayoutDetails,
webhooks::OutgoingWebhookContent::PayoutDetails(Box::new(cloned_response)),
primary_object_created_at,
))
@ -1403,3 +1403,47 @@ pub async fn trigger_payouts_webhook(
) -> RouterResult<()> {
todo!()
}
#[cfg(feature = "v1")]
pub async fn trigger_subscriptions_outgoing_webhook(
state: &SessionState,
payment_response: subscription_types::PaymentResponseData,
invoice: &hyperswitch_domain_models::invoice::Invoice,
subscription: &hyperswitch_domain_models::subscription::Subscription,
merchant_account: &domain::MerchantAccount,
key_store: &domain::MerchantKeyStore,
profile: &domain::Profile,
) -> RouterResult<()> {
if invoice.status != common_enums::enums::InvoiceStatus::InvoicePaid {
logger::info!("Invoice not paid, skipping outgoing webhook trigger");
return Ok(());
}
let response = InvoiceSyncHandler::generate_response(subscription, invoice, &payment_response)
.attach_printable("Subscriptions: Failed to generate response for outgoing webhook")?;
let merchant_context = domain::merchant_context::MerchantContext::NormalMerchant(Box::new(
domain::merchant_context::Context(merchant_account.clone(), key_store.clone()),
));
let cloned_state = state.clone();
let cloned_profile = profile.clone();
let invoice_id = invoice.id.get_string_repr().to_owned();
let created_at = subscription.created_at;
tokio::spawn(async move {
Box::pin(webhooks_core::create_event_and_trigger_outgoing_webhook(
cloned_state,
merchant_context,
cloned_profile,
common_enums::enums::EventType::InvoicePaid,
common_enums::enums::EventClass::Subscriptions,
invoice_id,
common_enums::EventObjectType::SubscriptionDetails,
webhooks::OutgoingWebhookContent::SubscriptionDetails(Box::new(response)),
Some(created_at),
))
.await
});
Ok(())
}

View File

@ -1,4 +1,5 @@
use async_trait::async_trait;
use common_enums::connector_enums::InvoiceStatus;
use common_utils::{errors::CustomResult, ext_traits::ValueExt};
use router_env::logger;
use scheduler::{
@ -6,7 +7,7 @@ use scheduler::{
errors,
};
use crate::{routes::SessionState, types::storage};
use crate::{routes::SessionState, types::storage, utils};
const INVOICE_SYNC_WORKFLOW: &str = "INVOICE_SYNC";
@ -29,12 +30,37 @@ impl ProcessTrackerWorkflow<SessionState> for InvoiceSyncWorkflow {
let subscription_state = state.clone().into();
match process.name.as_deref() {
Some(INVOICE_SYNC_WORKFLOW) => {
Box::pin(subscriptions::workflows::perform_subscription_invoice_sync(
&subscription_state,
process,
tracking_data,
))
.await
let (handler, payments_response) =
Box::pin(subscriptions::workflows::perform_subscription_invoice_sync(
&subscription_state,
process,
tracking_data,
))
.await?;
if handler.invoice.status == InvoiceStatus::InvoicePaid
|| handler.invoice.status == InvoiceStatus::PaymentSucceeded
|| handler.invoice.status == InvoiceStatus::PaymentFailed
{
let _ = utils::trigger_subscriptions_outgoing_webhook(
state,
payments_response,
&handler.invoice,
&handler.subscription,
&handler.merchant_account,
&handler.key_store,
&handler.profile,
)
.await
.map_err(|e| {
logger::error!("Failed to trigger subscriptions outgoing webhook: {e:?}");
errors::ProcessTrackerError::FlowExecutionError {
flow: "Trigger Subscriptions Outgoing Webhook",
}
})?;
}
Ok(())
}
_ => Err(errors::ProcessTrackerError::JobNotFound),
}

View File

@ -19,6 +19,8 @@ use scheduler::{
types::process_data,
utils as scheduler_utils,
};
#[cfg(feature = "v1")]
use subscriptions::workflows::invoice_sync;
#[cfg(feature = "payouts")]
use crate::core::payouts;
@ -383,6 +385,7 @@ async fn get_outgoing_webhook_content_and_event_type(
merchant_account.clone(),
key_store.clone(),
)));
match tracking_data.event_class {
diesel_models::enums::EventClass::Payments => {
let payment_id = tracking_data.primary_object_id.clone();
@ -573,5 +576,30 @@ async fn get_outgoing_webhook_content_and_event_type(
event_type,
))
}
diesel_models::enums::EventClass::Subscriptions => {
let invoice_id = tracking_data.primary_object_id.clone();
let profile_id = &tracking_data.business_profile_id;
let response = Box::pin(
invoice_sync::InvoiceSyncHandler::form_response_for_retry_outgoing_webhook_task(
state.clone().into(),
&key_store,
invoice_id,
profile_id,
&merchant_account,
),
)
.await
.inspect_err(|e| {
logger::error!(
"Failed to generate response for subscription outgoing webhook: {e:?}"
);
})?;
Ok((
OutgoingWebhookContent::SubscriptionDetails(Box::new(response)),
Some(EventType::InvoicePaid),
))
}
}
}

View File

@ -1,6 +1,4 @@
use api_models::subscription::{
self as subscription_types, SubscriptionResponse, SubscriptionStatus,
};
use api_models::subscription::{self as subscription_types, SubscriptionResponse};
use common_enums::connector_enums;
use common_utils::id_type::GenerateId;
use error_stack::ResultExt;
@ -287,7 +285,10 @@ pub async fn create_and_confirm_subscription(
.to_string(),
),
payment_response.payment_method_id.clone(),
Some(SubscriptionStatus::from(subscription_create_response.status).to_string()),
Some(
common_enums::SubscriptionStatus::from(subscription_create_response.status)
.to_string(),
),
request.plan_id,
Some(request.item_price_id),
),
@ -363,7 +364,7 @@ pub async fn confirm_subscription(
&state,
customer.clone(),
subscription.customer_id.clone(),
request.get_billing_address(),
payment_response.get_billing_address(),
request
.payment_details
.payment_method_data
@ -386,7 +387,7 @@ pub async fn confirm_subscription(
&state,
subscription.clone(),
subscription.item_price_id.clone(),
request.get_billing_address(),
payment_response.get_billing_address(),
)
.await?;
@ -423,7 +424,10 @@ pub async fn confirm_subscription(
.to_string(),
),
payment_response.payment_method_id.clone(),
Some(SubscriptionStatus::from(subscription_create_response.status).to_string()),
Some(
common_enums::SubscriptionStatus::from(subscription_create_response.status)
.to_string(),
),
subscription.plan_id.clone(),
subscription.item_price_id.clone(),
),

View File

@ -295,7 +295,7 @@ impl SubscriptionWithHandler<'_> {
Ok(subscription_types::ConfirmSubscriptionResponse {
id: self.subscription.id.clone(),
merchant_reference_id: self.subscription.merchant_reference_id.clone(),
status: subscription_types::SubscriptionStatus::from(status),
status: common_enums::SubscriptionStatus::from(status),
plan_id: self.subscription.plan_id.clone(),
profile_id: self.subscription.profile_id.to_owned(),
payment: Some(payment_response.clone()),
@ -315,8 +315,8 @@ impl SubscriptionWithHandler<'_> {
Ok(SubscriptionResponse::new(
self.subscription.id.clone(),
self.subscription.merchant_reference_id.clone(),
subscription_types::SubscriptionStatus::from_str(&self.subscription.status)
.unwrap_or(subscription_types::SubscriptionStatus::Created),
common_enums::SubscriptionStatus::from_str(&self.subscription.status)
.unwrap_or(common_enums::SubscriptionStatus::Created),
self.subscription.plan_id.clone(),
self.subscription.item_price_id.clone(),
self.subscription.profile_id.to_owned(),
@ -455,6 +455,10 @@ impl ForeignTryFrom<&hyperswitch_domain_models::invoice::Invoice> for subscripti
currency = invoice.currency
))?,
status: invoice.status.clone(),
billing_processor_invoice_id: invoice
.connector_invoice_id
.as_ref()
.map(|id| id.get_string_repr().to_string()),
})
}
}

View File

@ -12,6 +12,7 @@ use storage_impl::{errors, kv_router_store::KVRouterStore, DatabaseStore, MockDb
pub trait SubscriptionStorageInterface:
Send
+ Sync
+ std::any::Any
+ dyn_clone::DynClone
+ master_key::MasterKeyInterface
+ scheduler::SchedulerInterface

View File

@ -1,8 +1,10 @@
use std::str::FromStr;
#[cfg(feature = "v1")]
use api_models::subscription as subscription_types;
use common_utils::{errors::CustomResult, ext_traits::StringExt};
use error_stack::ResultExt;
use hyperswitch_domain_models::invoice::InvoiceUpdateRequest;
use hyperswitch_domain_models::{self as domain, invoice::InvoiceUpdateRequest};
use router_env::logger;
use scheduler::{
errors,
@ -17,6 +19,7 @@ use crate::{
billing_processor_handler as billing, errors as router_errors, invoice_handler,
payments_api_client,
},
helpers::ForeignTryFrom,
state::{SubscriptionState as SessionState, SubscriptionStorageInterface as StorageInterface},
types::storage,
};
@ -132,24 +135,21 @@ impl<'a> InvoiceSyncHandler<'a> {
}
pub async fn perform_payments_sync(
&self,
state: &SessionState,
payment_intent_id: Option<&common_utils::id_type::PaymentId>,
profile_id: &common_utils::id_type::ProfileId,
merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<subscription_types::PaymentResponseData, router_errors::ApiErrorResponse>
{
logger::info!(
"perform_payments_sync called for invoice_id: {:?} and payment_id: {:?}",
self.invoice.id,
self.invoice.payment_intent_id
);
let payment_id = self.invoice.payment_intent_id.clone().ok_or(
router_errors::ApiErrorResponse::SubscriptionError {
let payment_id =
payment_intent_id.ok_or(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync: Missing Payment Intent ID in Invoice".to_string(),
},
)?;
})?;
let payments_response = payments_api_client::PaymentsApiClient::sync_payment(
self.state,
state,
payment_id.get_string_repr().to_string(),
self.merchant_account.get_id().get_string_repr(),
self.profile.get_id().get_string_repr(),
merchant_id.get_string_repr(),
profile_id.get_string_repr(),
)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
@ -157,17 +157,101 @@ impl<'a> InvoiceSyncHandler<'a> {
.to_string(),
})
.attach_printable("Failed to sync payment status from payments microservice")?;
Ok(payments_response)
}
pub fn generate_response(
subscription: &hyperswitch_domain_models::subscription::Subscription,
invoice: &hyperswitch_domain_models::invoice::Invoice,
payment_response: &subscription_types::PaymentResponseData,
) -> CustomResult<
subscription_types::ConfirmSubscriptionResponse,
router_errors::ApiErrorResponse,
> {
subscription_types::ConfirmSubscriptionResponse::foreign_try_from((
subscription,
invoice,
payment_response,
))
}
pub async fn form_response_for_retry_outgoing_webhook_task(
state: SessionState,
key_store: &domain::merchant_key_store::MerchantKeyStore,
invoice_id: String,
profile_id: &common_utils::id_type::ProfileId,
merchant_account: &domain::merchant_account::MerchantAccount,
) -> Result<subscription_types::ConfirmSubscriptionResponse, errors::ProcessTrackerError> {
let key_manager_state = &(&state).into();
let invoice = state
.store
.find_invoice_by_invoice_id(key_manager_state, key_store, invoice_id.clone())
.await
.map_err(|err| {
logger::error!(
?err,
"invoices: unable to get latest invoice with id {invoice_id} from database"
);
errors::ProcessTrackerError::ResourceFetchingFailed {
resource_name: "Invoice".to_string(),
}
})?;
let subscription = state
.store
.find_by_merchant_id_subscription_id(
key_manager_state,
key_store,
merchant_account.get_id(),
invoice.subscription_id.get_string_repr().to_string(),
)
.await
.map_err(|err| {
logger::error!(
?err,
"subscription: unable to get subscription from database"
);
errors::ProcessTrackerError::ResourceFetchingFailed {
resource_name: "Subscription".to_string(),
}
})?;
let payments_response = InvoiceSyncHandler::perform_payments_sync(
&state,
invoice.payment_intent_id.as_ref(),
profile_id,
merchant_account.get_id(),
)
.await
.map_err(|err| {
logger::error!(
?err,
"subscription: unable to make PSync Call to payments microservice"
);
errors::ProcessTrackerError::EApiErrorResponse
})?;
let response = Self::generate_response(&subscription, &invoice, &payments_response)
.map_err(|err| {
logger::error!(
?err,
"subscription: unable to form ConfirmSubscriptionResponse from foreign types"
);
errors::ProcessTrackerError::DeserializationFailed
})?;
Ok(response)
}
pub async fn perform_billing_processor_record_back_if_possible(
&self,
payment_response: subscription_types::PaymentResponseData,
payment_status: common_enums::AttemptStatus,
connector_invoice_id: Option<common_utils::id_type::InvoiceId>,
invoice_sync_status: storage::invoice_sync::InvoiceSyncPaymentStatus,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
) -> CustomResult<hyperswitch_domain_models::invoice::Invoice, router_errors::ApiErrorResponse>
{
if let Some(connector_invoice_id) = connector_invoice_id {
Box::pin(self.perform_billing_processor_record_back(
payment_response,
@ -176,9 +260,10 @@ impl<'a> InvoiceSyncHandler<'a> {
invoice_sync_status,
))
.await
.attach_printable("Failed to record back to billing processor")?;
.attach_printable("Failed to record back to billing processor")
} else {
Ok(self.invoice.clone())
}
Ok(())
}
pub async fn perform_billing_processor_record_back(
@ -187,7 +272,8 @@ impl<'a> InvoiceSyncHandler<'a> {
payment_status: common_enums::AttemptStatus,
connector_invoice_id: common_utils::id_type::InvoiceId,
invoice_sync_status: storage::invoice_sync::InvoiceSyncPaymentStatus,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
) -> CustomResult<hyperswitch_domain_models::invoice::Invoice, router_errors::ApiErrorResponse>
{
logger::info!("perform_billing_processor_record_back");
let billing_handler = billing::BillingHandler::create(
@ -228,9 +314,7 @@ impl<'a> InvoiceSyncHandler<'a> {
invoice_handler
.update_invoice(self.state, self.invoice.id.to_owned(), update_request)
.await
.attach_printable("Failed to update invoice in DB")?;
Ok(())
.attach_printable("Failed to update invoice in DB")
}
pub async fn transition_workflow_state(
@ -238,7 +322,8 @@ impl<'a> InvoiceSyncHandler<'a> {
process: ProcessTracker,
payment_response: subscription_types::PaymentResponseData,
connector_invoice_id: Option<common_utils::id_type::InvoiceId>,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
) -> CustomResult<hyperswitch_domain_models::invoice::Invoice, router_errors::ApiErrorResponse>
{
logger::info!(
"transition_workflow_state called with status: {:?}",
payment_response.status
@ -256,7 +341,7 @@ impl<'a> InvoiceSyncHandler<'a> {
)?,
};
logger::info!("Performing billing processor record back for status: {status}");
Box::pin(self.perform_billing_processor_record_back_if_possible(
let invoice = Box::pin(self.perform_billing_processor_record_back_if_possible(
payment_response.clone(),
payment_status,
connector_invoice_id,
@ -272,7 +357,8 @@ impl<'a> InvoiceSyncHandler<'a> {
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync process_tracker task completion".to_string(),
})
.attach_printable("Failed to update process tracker status")
.attach_printable("Failed to update process tracker status")?;
Ok(invoice)
}
}
@ -281,33 +367,49 @@ pub async fn perform_subscription_invoice_sync(
state: &SessionState,
process: ProcessTracker,
tracking_data: storage::invoice_sync::InvoiceSyncTrackingData,
) -> Result<(), errors::ProcessTrackerError> {
let handler = InvoiceSyncHandler::create(state, tracking_data).await?;
) -> Result<
(
InvoiceSyncHandler<'_>,
subscription_types::PaymentResponseData,
),
errors::ProcessTrackerError,
> {
let mut handler = InvoiceSyncHandler::create(state, tracking_data).await?;
let payment_status = handler.perform_payments_sync().await?;
let payments_response = InvoiceSyncHandler::perform_payments_sync(
handler.state,
handler.invoice.payment_intent_id.as_ref(),
handler.profile.get_id(),
handler.merchant_account.get_id(),
)
.await?;
if let Err(e) = Box::pin(handler.transition_workflow_state(
match Box::pin(handler.transition_workflow_state(
process.clone(),
payment_status,
payments_response.clone(),
handler.tracking_data.connector_invoice_id.clone(),
))
.await
{
logger::error!(?e, "Error in transitioning workflow state");
retry_subscription_invoice_sync_task(
&*handler.state.store,
handler.tracking_data.connector_name.to_string().clone(),
handler.merchant_account.get_id().to_owned(),
process,
)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync process_tracker task retry".to_string(),
})
.attach_printable("Failed to update process tracker status")?;
};
Ok(())
Err(e) => {
logger::error!(?e, "Error in transitioning workflow state");
retry_subscription_invoice_sync_task(
&*handler.state.store,
handler.tracking_data.connector_name.to_string().clone(),
handler.merchant_account.get_id().to_owned(),
process,
)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync process_tracker task retry".to_string(),
})
.attach_printable("Failed to update process tracker status")?;
}
Ok(invoice) => {
handler.invoice = invoice.clone();
}
}
Ok((handler, payments_response))
}
pub async fn create_invoice_sync_job(
@ -403,3 +505,42 @@ pub async fn retry_subscription_invoice_sync_task(
Ok(())
}
impl
ForeignTryFrom<(
&domain::subscription::Subscription,
&domain::invoice::Invoice,
&subscription_types::PaymentResponseData,
)> for subscription_types::ConfirmSubscriptionResponse
{
type Error = error_stack::Report<router_errors::ApiErrorResponse>;
fn foreign_try_from(
value: (
&domain::subscription::Subscription,
&domain::invoice::Invoice,
&subscription_types::PaymentResponseData,
),
) -> Result<Self, Self::Error> {
let (subscription, invoice, payment_response) = value;
let status = common_enums::SubscriptionStatus::from_str(subscription.status.as_str())
.map_err(|_| router_errors::ApiErrorResponse::SubscriptionError {
operation: "Failed to parse subscription status".to_string(),
})
.attach_printable("Failed to parse subscription status")?;
Ok(Self {
id: subscription.id.clone(),
merchant_reference_id: subscription.merchant_reference_id.clone(),
status,
plan_id: subscription.plan_id.clone(),
profile_id: subscription.profile_id.to_owned(),
payment: Some(payment_response.clone()),
customer_id: Some(subscription.customer_id.clone()),
item_price_id: subscription.item_price_id.clone(),
coupon: None,
billing_processor_subscription_id: subscription.connector_subscription_id.clone(),
invoice: Some(subscription_types::Invoice::foreign_try_from(invoice)?),
})
}
}

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
SELECT(1);

View File

@ -0,0 +1,9 @@
-- Your SQL goes here
ALTER TYPE "EventType"
ADD VALUE IF NOT EXISTS 'invoice_paid';
ALTER TYPE "EventObjectType"
ADD VALUE IF NOT EXISTS 'subscription_details';
ALTER TYPE "EventClass"
ADD VALUE IF NOT EXISTS 'subscriptions';

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
CREATE INDEX IF NOT EXISTS invoice_subscription_id_connector_invoice_id_index ON invoice (subscription_id, connector_invoice_id);

View File

@ -0,0 +1,2 @@
-- Your SQL goes here
DROP INDEX IF EXISTS invoice_subscription_id_connector_invoice_id_index;