feat(subscriptions): Add Subscription confirm handler (#9353)

Co-authored-by: Prajjwal kumar <write2prajjwal@gmail.com>
Co-authored-by: Prajjwal Kumar <prajjwal.kumar@juspay.in>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Jagan Elavarasan <jaganelavarasan@gmail.com>
This commit is contained in:
Sarthak Soni
2025-09-25 19:17:51 +05:30
committed by GitHub
parent 84f3013c88
commit f02d18038c
20 changed files with 1007 additions and 63 deletions

View File

@ -285,6 +285,8 @@ pub enum StripeErrorCode {
PlatformUnauthorizedRequest,
#[error(error_type = StripeErrorType::HyperswitchError, code = "", message = "Profile Acquirer not found")]
ProfileAcquirerNotFound,
#[error(error_type = StripeErrorType::HyperswitchError, code = "Subscription Error", message = "Subscription operation: {operation} failed with connector")]
SubscriptionError { operation: String },
// [#216]: https://github.com/juspay/hyperswitch/issues/216
// Implement the remaining stripe error codes
@ -697,6 +699,9 @@ impl From<errors::ApiErrorResponse> for StripeErrorCode {
object: "tokenization record".to_owned(),
id,
},
errors::ApiErrorResponse::SubscriptionError { operation } => {
Self::SubscriptionError { operation }
}
}
}
}
@ -781,7 +786,8 @@ impl actix_web::ResponseError for StripeErrorCode {
| Self::WebhookProcessingError
| Self::InvalidTenant
| Self::ExternalVaultFailed
| Self::AmountConversionFailed { .. } => StatusCode::INTERNAL_SERVER_ERROR,
| Self::AmountConversionFailed { .. }
| Self::SubscriptionError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
Self::ReturnUrlUnavailable => StatusCode::SERVICE_UNAVAILABLE,
Self::ExternalConnectorError { status_code, .. } => {
StatusCode::from_u16(*status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)

View File

@ -1,16 +1,31 @@
use std::str::FromStr;
use api_models::subscription::{
self as subscription_types, CreateSubscriptionResponse, SubscriptionStatus,
use api_models::{
enums as api_enums,
subscription::{self as subscription_types, CreateSubscriptionResponse, SubscriptionStatus},
};
use common_utils::id_type::GenerateId;
use common_utils::{ext_traits::ValueExt, id_type::GenerateId, pii};
use diesel_models::subscription::SubscriptionNew;
use error_stack::ResultExt;
use hyperswitch_domain_models::{api::ApplicationResponse, merchant_context::MerchantContext};
use hyperswitch_domain_models::{
api::ApplicationResponse,
merchant_context::MerchantContext,
router_data_v2::flow_common_types::{SubscriptionCreateData, SubscriptionCustomerData},
router_request_types::{subscriptions as subscription_request_types, ConnectorCustomerData},
router_response_types::{
subscriptions as subscription_response_types, ConnectorCustomerResponseData,
PaymentsResponseData,
},
};
use masking::Secret;
use super::errors::{self, RouterResponse};
use crate::routes::SessionState;
use crate::{
core::payments as payments_core, routes::SessionState, services, types::api as api_types,
};
pub const SUBSCRIPTION_CONNECTOR_ID: &str = "DefaultSubscriptionConnectorId";
pub const SUBSCRIPTION_PAYMENT_ID: &str = "DefaultSubscriptionPaymentId";
pub async fn create_subscription(
state: SessionState,
@ -63,3 +78,544 @@ pub async fn create_subscription(
Ok(ApplicationResponse::Json(response))
}
pub async fn confirm_subscription(
state: SessionState,
merchant_context: MerchantContext,
profile_id: String,
request: subscription_types::ConfirmSubscriptionRequest,
subscription_id: common_utils::id_type::SubscriptionId,
) -> RouterResponse<subscription_types::ConfirmSubscriptionResponse> {
let profile_id = common_utils::id_type::ProfileId::from_str(&profile_id).change_context(
errors::ApiErrorResponse::InvalidDataValue {
field_name: "X-Profile-Id",
},
)?;
let key_manager_state = &(&state).into();
let merchant_key_store = merchant_context.get_merchant_key_store();
let profile = state
.store
.find_business_profile_by_profile_id(key_manager_state, merchant_key_store, &profile_id)
.await
.change_context(errors::ApiErrorResponse::ProfileNotFound {
id: profile_id.get_string_repr().to_string(),
})?;
let customer = state
.store
.find_customer_by_customer_id_merchant_id(
key_manager_state,
&request.customer_id,
merchant_context.get_merchant_account().get_id(),
merchant_key_store,
merchant_context.get_merchant_account().storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::CustomerNotFound)
.attach_printable("subscriptions: unable to fetch customer from database")?;
let handler = SubscriptionHandler::new(state, merchant_context, request, profile);
let mut subscription_entry = handler
.find_subscription(subscription_id.get_string_repr().to_string())
.await?;
let billing_handler = subscription_entry.get_billing_handler(customer).await?;
let invoice_handler = subscription_entry.get_invoice_handler().await?;
let _customer_create_response = billing_handler
.create_customer_on_connector(&handler.state)
.await?;
let subscription_create_response = billing_handler
.create_subscription_on_connector(&handler.state)
.await?;
// let payment_response = invoice_handler.create_cit_payment().await?;
let invoice_entry = invoice_handler
.create_invoice_entry(
&handler.state,
subscription_entry.profile.get_billing_processor_id()?,
None,
billing_handler.request.amount,
billing_handler.request.currency.to_string(),
common_enums::connector_enums::InvoiceStatus::InvoiceCreated,
billing_handler.connector_data.connector_name,
None,
)
.await?;
// invoice_entry
// .create_invoice_record_back_job(&payment_response)
// .await?;
subscription_entry
.update_subscription_status(
SubscriptionStatus::from(subscription_create_response.status).to_string(),
)
.await?;
let response = subscription_entry
.generate_response(&invoice_entry, subscription_create_response.status)?;
Ok(ApplicationResponse::Json(response))
}
pub struct SubscriptionHandler {
state: SessionState,
merchant_context: MerchantContext,
request: subscription_types::ConfirmSubscriptionRequest,
profile: hyperswitch_domain_models::business_profile::Profile,
}
impl SubscriptionHandler {
pub fn new(
state: SessionState,
merchant_context: MerchantContext,
request: subscription_types::ConfirmSubscriptionRequest,
profile: hyperswitch_domain_models::business_profile::Profile,
) -> Self {
Self {
state,
merchant_context,
request,
profile,
}
}
pub async fn find_subscription(
&self,
subscription_id: String,
) -> errors::RouterResult<SubscriptionWithHandler<'_>> {
let subscription = self
.state
.store
.find_by_merchant_id_subscription_id(
self.merchant_context.get_merchant_account().get_id(),
subscription_id.clone(),
)
.await
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
message: format!("subscription not found for id: {subscription_id}"),
})?;
Ok(SubscriptionWithHandler {
handler: self,
subscription,
profile: self.profile.clone(),
})
}
}
pub struct SubscriptionWithHandler<'a> {
handler: &'a SubscriptionHandler,
subscription: diesel_models::subscription::Subscription,
profile: hyperswitch_domain_models::business_profile::Profile,
}
impl<'a> SubscriptionWithHandler<'a> {
fn generate_response(
&self,
invoice: &diesel_models::invoice::Invoice,
// _payment_response: &subscription_types::PaymentResponseData,
status: subscription_response_types::SubscriptionStatus,
) -> errors::RouterResult<subscription_types::ConfirmSubscriptionResponse> {
Ok(subscription_types::ConfirmSubscriptionResponse {
id: self.subscription.id.clone(),
merchant_reference_id: self.subscription.merchant_reference_id.clone(),
status: SubscriptionStatus::from(status),
plan_id: None,
profile_id: self.subscription.profile_id.to_owned(),
payment: None,
customer_id: Some(self.subscription.customer_id.clone()),
price_id: None,
coupon: None,
invoice: Some(subscription_types::Invoice {
id: invoice.id.clone(),
subscription_id: invoice.subscription_id.clone(),
merchant_id: invoice.merchant_id.clone(),
profile_id: invoice.profile_id.clone(),
merchant_connector_id: invoice.merchant_connector_id.clone(),
payment_intent_id: invoice.payment_intent_id.clone(),
payment_method_id: invoice.payment_method_id.clone(),
customer_id: invoice.customer_id.clone(),
amount: invoice.amount,
currency: api_enums::Currency::from_str(invoice.currency.as_str())
.change_context(errors::ApiErrorResponse::InvalidDataValue {
field_name: "currency",
})
.attach_printable(format!(
"unable to parse currency name {currency:?}",
currency = invoice.currency
))?,
status: invoice.status.clone(),
}),
})
}
async fn update_subscription_status(&mut self, status: String) -> errors::RouterResult<()> {
let db = self.handler.state.store.as_ref();
let updated_subscription = db
.update_subscription_entry(
self.handler
.merchant_context
.get_merchant_account()
.get_id(),
self.subscription.id.get_string_repr().to_string(),
diesel_models::subscription::SubscriptionUpdate::new(None, Some(status)),
)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Subscription Update".to_string(),
})
.attach_printable("subscriptions: unable to update subscription entry in database")?;
self.subscription = updated_subscription;
Ok(())
}
pub async fn get_billing_handler(
&self,
customer: hyperswitch_domain_models::customer::Customer,
) -> errors::RouterResult<BillingHandler> {
let mca_id = self.profile.get_billing_processor_id()?;
let billing_processor_mca = self
.handler
.state
.store
.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&(&self.handler.state).into(),
self.handler
.merchant_context
.get_merchant_account()
.get_id(),
&mca_id,
self.handler.merchant_context.get_merchant_key_store(),
)
.await
.change_context(errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: mca_id.get_string_repr().to_string(),
})?;
let connector_name = billing_processor_mca.connector_name.clone();
let auth_type: hyperswitch_domain_models::router_data::ConnectorAuthType =
payments_core::helpers::MerchantConnectorAccountType::DbVal(Box::new(
billing_processor_mca.clone(),
))
.get_connector_account_details()
.parse_value("ConnectorAuthType")
.change_context(errors::ApiErrorResponse::InvalidDataFormat {
field_name: "connector_account_details".to_string(),
expected_format: "auth_type and api_key".to_string(),
})?;
let connector_data = api_types::ConnectorData::get_connector_by_name(
&self.handler.state.conf.connectors,
&connector_name,
api_types::GetToken::Connector,
Some(billing_processor_mca.get_id()),
)
.change_context(errors::ApiErrorResponse::IncorrectConnectorNameGiven)
.attach_printable(
"invalid connector name received in billing merchant connector account",
)?;
let connector_enum =
common_enums::connector_enums::Connector::from_str(connector_name.as_str())
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(format!("unable to parse connector name {connector_name:?}"))?;
let connector_params =
hyperswitch_domain_models::connector_endpoints::Connectors::get_connector_params(
&self.handler.state.conf.connectors,
connector_enum,
)
.change_context(errors::ApiErrorResponse::ConfigNotFound)
.attach_printable(format!(
"cannot find connector params for this connector {connector_name} in this flow",
))?;
Ok(BillingHandler {
subscription: self.subscription.clone(),
auth_type,
connector_data,
connector_params,
request: self.handler.request.clone(),
connector_metadata: billing_processor_mca.metadata.clone(),
customer,
})
}
pub async fn get_invoice_handler(&self) -> errors::RouterResult<InvoiceHandler> {
Ok(InvoiceHandler {
subscription: self.subscription.clone(),
})
}
}
pub struct BillingHandler {
subscription: diesel_models::subscription::Subscription,
auth_type: hyperswitch_domain_models::router_data::ConnectorAuthType,
connector_data: api_types::ConnectorData,
connector_params: hyperswitch_domain_models::connector_endpoints::ConnectorParams,
connector_metadata: Option<pii::SecretSerdeValue>,
customer: hyperswitch_domain_models::customer::Customer,
request: subscription_types::ConfirmSubscriptionRequest,
}
pub struct InvoiceHandler {
subscription: diesel_models::subscription::Subscription,
}
#[allow(clippy::todo)]
impl InvoiceHandler {
#[allow(clippy::too_many_arguments)]
pub async fn create_invoice_entry(
self,
state: &SessionState,
merchant_connector_id: common_utils::id_type::MerchantConnectorAccountId,
payment_intent_id: Option<common_utils::id_type::PaymentId>,
amount: common_utils::types::MinorUnit,
currency: String,
status: common_enums::connector_enums::InvoiceStatus,
provider_name: common_enums::connector_enums::Connector,
metadata: Option<pii::SecretSerdeValue>,
) -> errors::RouterResult<diesel_models::invoice::Invoice> {
let invoice_new = diesel_models::invoice::InvoiceNew::new(
self.subscription.id.to_owned(),
self.subscription.merchant_id.to_owned(),
self.subscription.profile_id.to_owned(),
merchant_connector_id,
payment_intent_id,
self.subscription.payment_method_id.clone(),
self.subscription.customer_id.to_owned(),
amount,
currency,
status,
provider_name,
metadata,
);
let invoice = state
.store
.insert_invoice_entry(invoice_new)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Subscription Confirm".to_string(),
})
.attach_printable("invoices: unable to insert invoice entry to database")?;
Ok(invoice)
}
pub async fn create_cit_payment(
&self,
) -> errors::RouterResult<subscription_types::PaymentResponseData> {
// Create a CIT payment for the invoice
todo!("Create a CIT payment for the invoice")
}
pub async fn create_invoice_record_back_job(
&self,
// _invoice: &subscription_types::Invoice,
_payment_response: &subscription_types::PaymentResponseData,
) -> errors::RouterResult<()> {
// Create an invoice job entry based on payment status
todo!("Create an invoice job entry based on payment status")
}
}
#[allow(clippy::todo)]
impl BillingHandler {
pub async fn create_customer_on_connector(
&self,
state: &SessionState,
) -> errors::RouterResult<ConnectorCustomerResponseData> {
let customer_req = ConnectorCustomerData {
email: self.customer.email.clone().map(pii::Email::from),
payment_method_data: self
.request
.payment_details
.payment_method_data
.payment_method_data
.clone()
.map(|pmd| pmd.into()),
description: None,
phone: None,
name: None,
preprocessing_id: None,
split_payments: None,
setup_future_usage: None,
customer_acceptance: None,
customer_id: Some(self.subscription.customer_id.to_owned()),
billing_address: self
.request
.billing_address
.as_ref()
.and_then(|add| add.address.clone())
.and_then(|addr| addr.into()),
};
let router_data = self.build_router_data(
state,
customer_req,
SubscriptionCustomerData {
connector_meta_data: self.connector_metadata.clone(),
},
)?;
let connector_integration = self.connector_data.connector.get_connector_integration();
let response = Box::pin(self.call_connector(
state,
router_data,
"create customer on connector",
connector_integration,
))
.await?;
match response {
Ok(response_data) => match response_data {
PaymentsResponseData::ConnectorCustomerResponse(customer_response) => {
Ok(customer_response)
}
_ => Err(errors::ApiErrorResponse::SubscriptionError {
operation: "Subscription Customer Create".to_string(),
}
.into()),
},
Err(err) => Err(errors::ApiErrorResponse::ExternalConnectorError {
code: err.code,
message: err.message,
connector: self.connector_data.connector_name.to_string(),
status_code: err.status_code,
reason: err.reason,
}
.into()),
}
}
pub async fn create_subscription_on_connector(
&self,
state: &SessionState,
) -> errors::RouterResult<subscription_response_types::SubscriptionCreateResponse> {
let subscription_item = subscription_request_types::SubscriptionItem {
item_price_id: self.request.get_item_price_id().change_context(
errors::ApiErrorResponse::MissingRequiredField {
field_name: "item_price_id",
},
)?,
quantity: Some(1),
};
let subscription_req = subscription_request_types::SubscriptionCreateRequest {
subscription_id: self.subscription.id.to_owned(),
customer_id: self.subscription.customer_id.to_owned(),
subscription_items: vec![subscription_item],
billing_address: self.request.get_billing_address().change_context(
errors::ApiErrorResponse::MissingRequiredField {
field_name: "billing_address",
},
)?,
auto_collection: subscription_request_types::SubscriptionAutoCollection::Off,
connector_params: self.connector_params.clone(),
};
let router_data = self.build_router_data(
state,
subscription_req,
SubscriptionCreateData {
connector_meta_data: self.connector_metadata.clone(),
},
)?;
let connector_integration = self.connector_data.connector.get_connector_integration();
let response = self
.call_connector(
state,
router_data,
"create subscription on connector",
connector_integration,
)
.await?;
match response {
Ok(response_data) => Ok(response_data),
Err(err) => Err(errors::ApiErrorResponse::ExternalConnectorError {
code: err.code,
message: err.message,
connector: self.connector_data.connector_name.to_string(),
status_code: err.status_code,
reason: err.reason,
}
.into()),
}
}
async fn call_connector<F, ResourceCommonData, Req, Resp>(
&self,
state: &SessionState,
router_data: hyperswitch_domain_models::router_data_v2::RouterDataV2<
F,
ResourceCommonData,
Req,
Resp,
>,
operation_name: &str,
connector_integration: hyperswitch_interfaces::connector_integration_interface::BoxedConnectorIntegrationInterface<F, ResourceCommonData, Req, Resp>,
) -> errors::RouterResult<Result<Resp, hyperswitch_domain_models::router_data::ErrorResponse>>
where
F: Clone + std::fmt::Debug + 'static,
Req: Clone + std::fmt::Debug + 'static,
Resp: Clone + std::fmt::Debug + 'static,
ResourceCommonData:
hyperswitch_interfaces::connector_integration_interface::RouterDataConversion<
F,
Req,
Resp,
> + Clone
+ 'static,
{
let old_router_data = ResourceCommonData::to_old_router_data(router_data).change_context(
errors::ApiErrorResponse::SubscriptionError {
operation: { operation_name.to_string() },
},
)?;
let router_resp = services::execute_connector_processing_step(
state,
connector_integration,
&old_router_data,
payments_core::CallConnectorAction::Trigger,
None,
None,
)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: operation_name.to_string(),
})
.attach_printable(format!(
"Failed while in subscription operation: {operation_name}"
))?;
Ok(router_resp.response)
}
fn build_router_data<F, ResourceCommonData, Req, Resp>(
&self,
state: &SessionState,
req: Req,
resource_common_data: ResourceCommonData,
) -> errors::RouterResult<
hyperswitch_domain_models::router_data_v2::RouterDataV2<F, ResourceCommonData, Req, Resp>,
> {
Ok(hyperswitch_domain_models::router_data_v2::RouterDataV2 {
flow: std::marker::PhantomData,
connector_auth_type: self.auth_type.clone(),
resource_common_data,
tenant_id: state.tenant.tenant_id.clone(),
request: req,
response: Err(hyperswitch_domain_models::router_data::ErrorResponse::default()),
})
}
}

View File

@ -1174,13 +1174,21 @@ pub struct Subscription;
#[cfg(all(feature = "oltp", feature = "v1"))]
impl Subscription {
pub fn server(state: AppState) -> Scope {
web::scope("/subscription/create")
.app_data(web::Data::new(state.clone()))
.service(web::resource("").route(
let route = web::scope("/subscription").app_data(web::Data::new(state.clone()));
route
.service(web::resource("/create").route(
web::post().to(|state, req, payload| {
subscription::create_subscription(state, req, payload)
}),
))
.service(
web::resource("/{subscription_id}/confirm").route(web::post().to(
|state, req, id, payload| {
subscription::confirm_subscription(state, req, id, payload)
},
)),
)
}
}

View File

@ -88,7 +88,7 @@ impl From<Flow> for ApiIdentifier {
| Flow::DecisionEngineDecideGatewayCall
| Flow::DecisionEngineGatewayFeedbackCall => Self::Routing,
Flow::CreateSubscription => Self::Subscription,
Flow::CreateSubscription | Flow::ConfirmSubscription => Self::Subscription,
Flow::RetrieveForexFlow => Self::Forex,
Flow::AddToBlocklist => Self::Blocklist,

View File

@ -67,3 +67,55 @@ pub async fn create_subscription(
))
.await
}
#[cfg(all(feature = "olap", feature = "v1"))]
#[instrument(skip_all)]
pub async fn confirm_subscription(
state: web::Data<AppState>,
req: HttpRequest,
subscription_id: web::Path<common_utils::id_type::SubscriptionId>,
json_payload: web::Json<subscription_types::ConfirmSubscriptionRequest>,
) -> impl Responder {
let flow = Flow::ConfirmSubscription;
let subscription_id = subscription_id.into_inner();
let profile_id = match req.headers().get(X_PROFILE_ID) {
Some(val) => val.to_str().unwrap_or_default().to_string(),
None => {
return HttpResponse::BadRequest().json(
errors::api_error_response::ApiErrorResponse::MissingRequiredField {
field_name: "x-profile-id",
},
);
}
};
Box::pin(oss_api::server_wrap(
flow,
state,
&req,
json_payload.into_inner(),
|state, auth: auth::AuthenticationData, payload, _| {
let merchant_context = domain::MerchantContext::NormalMerchant(Box::new(
domain::Context(auth.merchant_account, auth.key_store),
));
subscription::confirm_subscription(
state,
merchant_context,
profile_id.clone(),
payload.clone(),
subscription_id.clone(),
)
},
auth::auth_type(
&auth::HeaderAuth(auth::ApiKeyAuth {
is_connected_allowed: false,
is_platform_allowed: false,
}),
&auth::JWTAuth {
permission: Permission::ProfileSubscriptionWrite,
},
req.headers(),
),
api_locking::LockAction::NotApplicable,
))
.await
}

View File

@ -3,6 +3,7 @@ use std::{fmt::Debug, marker::PhantomData, str::FromStr, sync::Arc, time::Durati
use async_trait::async_trait;
use common_utils::{id_type::GenerateId, pii::Email};
use error_stack::Report;
use hyperswitch_domain_models::router_data_v2::flow_common_types::PaymentFlowData;
use masking::Secret;
use router::{
configs::settings::Settings,
@ -117,7 +118,8 @@ pub trait ConnectorActions: Connector {
payment_data: Option<types::ConnectorCustomerData>,
payment_info: Option<PaymentInfo>,
) -> Result<types::ConnectorCustomerRouterData, Report<ConnectorError>> {
let integration = self.get_data().connector.get_connector_integration();
let integration: BoxedConnectorIntegrationInterface<_, PaymentFlowData, _, _> =
self.get_data().connector.get_connector_integration();
let request = self.generate_data(
types::ConnectorCustomerData {
..(payment_data.unwrap_or(CustomerType::default().0))