From 0a35c617e6d7f410e623f1505c80ff57f7326cd5 Mon Sep 17 00:00:00 2001 From: Sarthak Soni <76486416+Sarthak1799@users.noreply.github.com> Date: Sat, 4 Oct 2025 00:41:38 +0530 Subject: [PATCH] feat(subscriptions): Invoice record back workflow (#9529) Co-authored-by: Prajjwal kumar Co-authored-by: Prajjwal Kumar Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Jagan Elavarasan --- crates/api_models/src/subscription.rs | 1 + crates/common_enums/src/enums.rs | 1 + .../src/connectors/chargebee.rs | 26 +- .../src/connectors/recurly.rs | 25 +- .../src/connectors/stripebilling.rs | 69 ++- .../src/default_implementations.rs | 34 +- .../src/router_data_v2/flow_common_types.rs | 4 +- .../src/api/subscriptions.rs | 66 ++- .../src/api/subscriptions_v2.rs | 25 +- .../src/conversion_impls.rs | 16 +- crates/router/src/bin/scheduler.rs | 3 + .../router/src/core/revenue_recovery/types.rs | 4 +- crates/router/src/core/subscription.rs | 64 ++- .../subscription/billing_processor_handler.rs | 77 +++- .../src/core/subscription/invoice_handler.rs | 57 ++- crates/router/src/types/storage.rs | 1 + .../router/src/types/storage/invoice_sync.rs | 113 +++++ crates/router/src/workflows.rs | 2 + crates/router/src/workflows/invoice_sync.rs | 436 ++++++++++++++++++ .../src/consumer/types/process_data.rs | 20 + crates/scheduler/src/utils.rs | 17 + 21 files changed, 950 insertions(+), 111 deletions(-) create mode 100644 crates/router/src/types/storage/invoice_sync.rs create mode 100644 crates/router/src/workflows/invoice_sync.rs diff --git a/crates/api_models/src/subscription.rs b/crates/api_models/src/subscription.rs index c0403848b9..c9cc3acd2c 100644 --- a/crates/api_models/src/subscription.rs +++ b/crates/api_models/src/subscription.rs @@ -226,6 +226,7 @@ pub struct PaymentResponseData { pub payment_experience: Option, pub error_code: Option, pub error_message: Option, + pub payment_method_type: Option, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] pub struct ConfirmSubscriptionRequest { diff --git a/crates/common_enums/src/enums.rs b/crates/common_enums/src/enums.rs index 83aeda0dfb..b1859cc6c2 100644 --- a/crates/common_enums/src/enums.rs +++ b/crates/common_enums/src/enums.rs @@ -9434,6 +9434,7 @@ pub enum ProcessTrackerRunner { PassiveRecoveryWorkflow, ProcessDisputeWorkflow, DisputeListWorkflow, + InvoiceSyncflow, } #[derive(Debug)] diff --git a/crates/hyperswitch_connectors/src/connectors/chargebee.rs b/crates/hyperswitch_connectors/src/connectors/chargebee.rs index 4377302259..b2d66db1f4 100644 --- a/crates/hyperswitch_connectors/src/connectors/chargebee.rs +++ b/crates/hyperswitch_connectors/src/connectors/chargebee.rs @@ -737,16 +737,31 @@ impl ConnectorIntegration CustomResult { let metadata: chargebee::ChargebeeMetadata = utils::to_connector_meta_from_secret(req.connector_meta_data.clone())?; - let url = self - .base_url(connectors) - .to_string() - .replace("{{merchant_endpoint_prefix}}", metadata.site.peek()); + + let site = metadata.site.peek(); + + let mut base = self.base_url(connectors).to_string(); + + base = base.replace("{{merchant_endpoint_prefix}}", site); + base = base.replace("$", site); + + if base.contains("{{merchant_endpoint_prefix}}") || base.contains('$') { + return Err(errors::ConnectorError::InvalidConnectorConfig { + config: "Chargebee base_url has an unresolved placeholder (expected `$` or `{{merchant_endpoint_prefix}}`).", + } + .into()); + } + + if !base.ends_with('/') { + base.push('/'); + } + let invoice_id = req .request .merchant_reference_id .get_string_repr() .to_string(); - Ok(format!("{url}v2/invoices/{invoice_id}/record_payment")) + Ok(format!("{base}v2/invoices/{invoice_id}/record_payment")) } fn get_content_type(&self) -> &'static str { @@ -833,6 +848,7 @@ fn get_chargebee_plans_query_params( } impl api::subscriptions::GetSubscriptionPlansFlow for Chargebee {} +impl api::subscriptions::SubscriptionRecordBackFlow for Chargebee {} impl ConnectorIntegration< diff --git a/crates/hyperswitch_connectors/src/connectors/recurly.rs b/crates/hyperswitch_connectors/src/connectors/recurly.rs index bfa9a50a99..1abc555031 100644 --- a/crates/hyperswitch_connectors/src/connectors/recurly.rs +++ b/crates/hyperswitch_connectors/src/connectors/recurly.rs @@ -12,7 +12,7 @@ use hyperswitch_domain_models::{ router_data_v2::{ flow_common_types::{ GetSubscriptionEstimateData, GetSubscriptionPlanPricesData, GetSubscriptionPlansData, - SubscriptionCreateData, SubscriptionCustomerData, + InvoiceRecordBackData, SubscriptionCreateData, SubscriptionCustomerData, }, UasFlowData, }, @@ -24,9 +24,10 @@ use hyperswitch_domain_models::{ unified_authentication_service::{ Authenticate, AuthenticationConfirmation, PostAuthenticate, PreAuthenticate, }, - CreateConnectorCustomer, + CreateConnectorCustomer, InvoiceRecordBack, }, router_request_types::{ + revenue_recovery::InvoiceRecordBackRequest, subscriptions::{ GetSubscriptionEstimateRequest, GetSubscriptionPlanPricesRequest, GetSubscriptionPlansRequest, SubscriptionCreateRequest, @@ -39,6 +40,7 @@ use hyperswitch_domain_models::{ ConnectorCustomerData, }, router_response_types::{ + revenue_recovery::InvoiceRecordBackResponse, subscriptions::{ GetSubscriptionEstimateResponse, GetSubscriptionPlanPricesResponse, GetSubscriptionPlansResponse, SubscriptionCreateResponse, @@ -161,6 +163,7 @@ impl impl api::revenue_recovery_v2::RevenueRecoveryV2 for Recurly {} impl api::subscriptions_v2::SubscriptionsV2 for Recurly {} impl api::subscriptions_v2::GetSubscriptionPlansV2 for Recurly {} +impl api::subscriptions_v2::SubscriptionRecordBackV2 for Recurly {} impl api::subscriptions_v2::SubscriptionConnectorCustomerV2 for Recurly {} impl @@ -173,6 +176,16 @@ impl { } +#[cfg(feature = "v1")] +impl + ConnectorIntegrationV2< + InvoiceRecordBack, + InvoiceRecordBackData, + InvoiceRecordBackRequest, + InvoiceRecordBackResponse, + > for Recurly +{ +} impl ConnectorIntegrationV2< CreateConnectorCustomer, @@ -385,10 +398,10 @@ impl #[cfg(all(feature = "v2", feature = "revenue_recovery"))] impl ConnectorIntegrationV2< - recovery_router_flows::InvoiceRecordBack, - recovery_flow_common_types::InvoiceRecordBackData, - recovery_request_types::InvoiceRecordBackRequest, - recovery_response_types::InvoiceRecordBackResponse, + InvoiceRecordBack, + InvoiceRecordBackData, + InvoiceRecordBackRequest, + InvoiceRecordBackResponse, > for Recurly { fn get_headers( diff --git a/crates/hyperswitch_connectors/src/connectors/stripebilling.rs b/crates/hyperswitch_connectors/src/connectors/stripebilling.rs index 99d83e4ef8..3bbeabd42d 100644 --- a/crates/hyperswitch_connectors/src/connectors/stripebilling.rs +++ b/crates/hyperswitch_connectors/src/connectors/stripebilling.rs @@ -12,35 +12,35 @@ use common_utils::{ use error_stack::{report, ResultExt}; #[cfg(all(feature = "v2", feature = "revenue_recovery"))] use hyperswitch_domain_models::revenue_recovery; +#[cfg(all(feature = "v2", feature = "revenue_recovery"))] +use hyperswitch_domain_models::types as recovery_router_data_types; use hyperswitch_domain_models::{ router_data::{AccessToken, ConnectorAuthType, ErrorResponse, RouterData}, router_flow_types::{ access_token_auth::AccessTokenAuth, payments::{Authorize, Capture, PSync, PaymentMethodToken, Session, SetupMandate, Void}, refunds::{Execute, RSync}, + revenue_recovery as recovery_router_flows, subscriptions as subscription_flow_types, }, router_request_types::{ + revenue_recovery as recovery_request_types, subscriptions as subscription_request_types, AccessTokenRequestData, PaymentMethodTokenizationData, PaymentsAuthorizeData, PaymentsCancelData, PaymentsCaptureData, PaymentsSessionData, PaymentsSyncData, RefundsData, SetupMandateRequestData, }, - router_response_types::{ConnectorInfo, PaymentsResponseData, RefundsResponseData}, + router_response_types::{ + revenue_recovery as recovery_response_types, subscriptions as subscription_response_types, + ConnectorInfo, PaymentsResponseData, RefundsResponseData, + }, types::{ PaymentsAuthorizeRouterData, PaymentsCaptureRouterData, PaymentsSyncRouterData, RefundSyncRouterData, RefundsRouterData, }, }; -#[cfg(all(feature = "v2", feature = "revenue_recovery"))] -use hyperswitch_domain_models::{ - router_flow_types::revenue_recovery as recovery_router_flows, - router_request_types::revenue_recovery as recovery_request_types, - router_response_types::revenue_recovery as recovery_response_types, - types as recovery_router_data_types, -}; use hyperswitch_interfaces::{ api::{ - self, ConnectorCommon, ConnectorCommonExt, ConnectorIntegration, ConnectorSpecifications, - ConnectorValidation, + self, subscriptions as subscriptions_api, ConnectorCommon, ConnectorCommonExt, + ConnectorIntegration, ConnectorSpecifications, ConnectorValidation, }, configs::Connectors, errors, @@ -90,6 +90,45 @@ impl ConnectorIntegration for Stripebilling +{ +} +impl subscriptions_api::GetSubscriptionPlanPricesFlow for Stripebilling {} +impl + ConnectorIntegration< + subscription_flow_types::GetSubscriptionPlanPrices, + subscription_request_types::GetSubscriptionPlanPricesRequest, + subscription_response_types::GetSubscriptionPlanPricesResponse, + > for Stripebilling +{ +} +impl + ConnectorIntegration< + subscription_flow_types::SubscriptionCreate, + subscription_request_types::SubscriptionCreateRequest, + subscription_response_types::SubscriptionCreateResponse, + > for Stripebilling +{ +} +impl subscriptions_api::GetSubscriptionEstimateFlow for Stripebilling {} +impl + ConnectorIntegration< + subscription_flow_types::GetSubscriptionEstimate, + subscription_request_types::GetSubscriptionEstimateRequest, + subscription_response_types::GetSubscriptionEstimateResponse, + > for Stripebilling +{ +} + impl ConnectorCommonExt for Stripebilling where Self: ConnectorIntegration, @@ -660,6 +699,16 @@ impl } } +#[cfg(feature = "v1")] +impl + ConnectorIntegration< + recovery_router_flows::InvoiceRecordBack, + recovery_request_types::InvoiceRecordBackRequest, + recovery_response_types::InvoiceRecordBackResponse, + > for Stripebilling +{ +} + #[cfg(all(feature = "v2", feature = "revenue_recovery"))] impl ConnectorIntegration< diff --git a/crates/hyperswitch_connectors/src/default_implementations.rs b/crates/hyperswitch_connectors/src/default_implementations.rs index b3eb01ba92..e7e39adaf8 100644 --- a/crates/hyperswitch_connectors/src/default_implementations.rs +++ b/crates/hyperswitch_connectors/src/default_implementations.rs @@ -8,7 +8,7 @@ use common_enums::{CallConnectorAction, PaymentAction}; use common_utils::errors::CustomResult; #[cfg(all(feature = "v2", feature = "revenue_recovery"))] use hyperswitch_domain_models::router_flow_types::{ - BillingConnectorInvoiceSync, BillingConnectorPaymentsSync, InvoiceRecordBack, + BillingConnectorInvoiceSync, BillingConnectorPaymentsSync, }; #[cfg(feature = "dummy_connector")] use hyperswitch_domain_models::router_request_types::authentication::{ @@ -17,12 +17,10 @@ use hyperswitch_domain_models::router_request_types::authentication::{ #[cfg(all(feature = "v2", feature = "revenue_recovery"))] use hyperswitch_domain_models::router_request_types::revenue_recovery::{ BillingConnectorInvoiceSyncRequest, BillingConnectorPaymentsSyncRequest, - InvoiceRecordBackRequest, }; #[cfg(all(feature = "v2", feature = "revenue_recovery"))] use hyperswitch_domain_models::router_response_types::revenue_recovery::{ BillingConnectorInvoiceSyncResponse, BillingConnectorPaymentsSyncResponse, - InvoiceRecordBackResponse, }; use hyperswitch_domain_models::{ router_data::AccessTokenAuthenticationResponse, @@ -43,11 +41,12 @@ use hyperswitch_domain_models::{ webhooks::VerifyWebhookSource, AccessTokenAuthentication, Authenticate, AuthenticationConfirmation, ExternalVaultCreateFlow, ExternalVaultDeleteFlow, ExternalVaultInsertFlow, - ExternalVaultProxy, ExternalVaultRetrieveFlow, PostAuthenticate, PreAuthenticate, - SubscriptionCreate as SubscriptionCreateFlow, + ExternalVaultProxy, ExternalVaultRetrieveFlow, InvoiceRecordBack, PostAuthenticate, + PreAuthenticate, SubscriptionCreate as SubscriptionCreateFlow, }, router_request_types::{ authentication, + revenue_recovery::InvoiceRecordBackRequest, subscriptions::{ GetSubscriptionEstimateRequest, GetSubscriptionPlanPricesRequest, GetSubscriptionPlansRequest, SubscriptionCreateRequest, @@ -70,6 +69,7 @@ use hyperswitch_domain_models::{ VerifyWebhookSourceRequestData, }, router_response_types::{ + revenue_recovery::InvoiceRecordBackResponse, subscriptions::{ GetSubscriptionEstimateResponse, GetSubscriptionPlanPricesResponse, GetSubscriptionPlansResponse, SubscriptionCreateResponse, @@ -139,7 +139,7 @@ use hyperswitch_interfaces::{ revenue_recovery::RevenueRecovery, subscriptions::{ GetSubscriptionEstimateFlow, GetSubscriptionPlanPricesFlow, GetSubscriptionPlansFlow, - SubscriptionCreate, Subscriptions, + SubscriptionCreate, SubscriptionRecordBackFlow, Subscriptions, }, vault::{ ExternalVault, ExternalVaultCreate, ExternalVaultDelete, ExternalVaultInsert, @@ -7186,6 +7186,7 @@ macro_rules! default_imp_for_subscriptions { ($($path:ident::$connector:ident),*) => { $( impl Subscriptions for $path::$connector {} impl GetSubscriptionPlansFlow for $path::$connector {} + impl SubscriptionRecordBackFlow for $path::$connector {} impl SubscriptionCreate for $path::$connector {} impl ConnectorIntegration< @@ -7193,6 +7194,9 @@ macro_rules! default_imp_for_subscriptions { GetSubscriptionPlansRequest, GetSubscriptionPlansResponse > for $path::$connector + {} + impl + ConnectorIntegration for $path::$connector {} impl GetSubscriptionPlanPricesFlow for $path::$connector {} impl @@ -7330,7 +7334,6 @@ default_imp_for_subscriptions!( connectors::Stax, connectors::Stripe, connectors::Square, - connectors::Stripebilling, connectors::Taxjar, connectors::Tesouro, connectors::Threedsecureio, @@ -7508,13 +7511,6 @@ default_imp_for_billing_connector_payment_sync!( macro_rules! default_imp_for_revenue_recovery_record_back { ($($path:ident::$connector:ident),*) => { $( impl recovery_traits::RevenueRecoveryRecordBack for $path::$connector {} - impl - ConnectorIntegration< - InvoiceRecordBack, - InvoiceRecordBackRequest, - InvoiceRecordBackResponse - > for $path::$connector - {} )* }; } @@ -9561,6 +9557,9 @@ impl #[cfg(feature = "dummy_connector")] impl GetSubscriptionPlansFlow for connectors::DummyConnector {} + +#[cfg(feature = "dummy_connector")] +impl SubscriptionRecordBackFlow for connectors::DummyConnector {} #[cfg(feature = "dummy_connector")] impl ConnectorIntegration< @@ -9571,6 +9570,13 @@ impl { } +#[cfg(all(feature = "dummy_connector", feature = "v1"))] +impl + ConnectorIntegration + for connectors::DummyConnector +{ +} + #[cfg(feature = "dummy_connector")] impl SubscriptionCreate for connectors::DummyConnector {} diff --git a/crates/hyperswitch_domain_models/src/router_data_v2/flow_common_types.rs b/crates/hyperswitch_domain_models/src/router_data_v2/flow_common_types.rs index 245d57064b..f012c62b8b 100644 --- a/crates/hyperswitch_domain_models/src/router_data_v2/flow_common_types.rs +++ b/crates/hyperswitch_domain_models/src/router_data_v2/flow_common_types.rs @@ -148,7 +148,9 @@ pub struct FilesFlowData { } #[derive(Debug, Clone)] -pub struct InvoiceRecordBackData; +pub struct InvoiceRecordBackData { + pub connector_meta_data: Option, +} #[derive(Debug, Clone)] pub struct SubscriptionCustomerData { diff --git a/crates/hyperswitch_interfaces/src/api/subscriptions.rs b/crates/hyperswitch_interfaces/src/api/subscriptions.rs index bea4c4773e..7f9f4ea827 100644 --- a/crates/hyperswitch_interfaces/src/api/subscriptions.rs +++ b/crates/hyperswitch_interfaces/src/api/subscriptions.rs @@ -1,26 +1,33 @@ //! Subscriptions Interface for V1 -#[cfg(feature = "v1")] + use hyperswitch_domain_models::{ - router_flow_types::subscriptions::SubscriptionCreate as SubscriptionCreateFlow, - router_flow_types::subscriptions::{ - GetSubscriptionEstimate, GetSubscriptionPlanPrices, GetSubscriptionPlans, + router_flow_types::{ + subscriptions::{ + GetSubscriptionEstimate, GetSubscriptionPlanPrices, GetSubscriptionPlans, + SubscriptionCreate as SubscriptionCreateFlow, + }, + InvoiceRecordBack, }, - router_request_types::subscriptions::{ - GetSubscriptionEstimateRequest, GetSubscriptionPlanPricesRequest, - GetSubscriptionPlansRequest, SubscriptionCreateRequest, + router_request_types::{ + revenue_recovery::InvoiceRecordBackRequest, + subscriptions::{ + GetSubscriptionEstimateRequest, GetSubscriptionPlanPricesRequest, + GetSubscriptionPlansRequest, SubscriptionCreateRequest, + }, }, - router_response_types::subscriptions::{ - GetSubscriptionEstimateResponse, GetSubscriptionPlanPricesResponse, - GetSubscriptionPlansResponse, SubscriptionCreateResponse, + router_response_types::{ + revenue_recovery::InvoiceRecordBackResponse, + subscriptions::{ + GetSubscriptionEstimateResponse, GetSubscriptionPlanPricesResponse, + GetSubscriptionPlansResponse, SubscriptionCreateResponse, + }, }, }; -#[cfg(feature = "v1")] use super::{ payments::ConnectorCustomer as PaymentsConnectorCustomer, ConnectorCommon, ConnectorIntegration, }; -#[cfg(feature = "v1")] /// trait GetSubscriptionPlans for V1 pub trait GetSubscriptionPlansFlow: ConnectorIntegration< @@ -31,7 +38,12 @@ pub trait GetSubscriptionPlansFlow: { } -#[cfg(feature = "v1")] +/// trait SubscriptionRecordBack for V1 +pub trait SubscriptionRecordBackFlow: + ConnectorIntegration +{ +} + /// trait GetSubscriptionPlanPrices for V1 pub trait GetSubscriptionPlanPricesFlow: ConnectorIntegration< @@ -42,14 +54,12 @@ pub trait GetSubscriptionPlanPricesFlow: { } -#[cfg(feature = "v1")] /// trait SubscriptionCreate pub trait SubscriptionCreate: ConnectorIntegration { } -#[cfg(feature = "v1")] /// trait GetSubscriptionEstimate for V1 pub trait GetSubscriptionEstimateFlow: ConnectorIntegration< @@ -60,37 +70,13 @@ pub trait GetSubscriptionEstimateFlow: { } /// trait Subscriptions -#[cfg(feature = "v1")] pub trait Subscriptions: ConnectorCommon + GetSubscriptionPlansFlow + GetSubscriptionPlanPricesFlow + SubscriptionCreate + PaymentsConnectorCustomer + + SubscriptionRecordBackFlow + GetSubscriptionEstimateFlow { } - -/// trait Subscriptions (disabled when not V1) -#[cfg(not(feature = "v1"))] -pub trait Subscriptions {} - -/// trait GetSubscriptionPlansFlow (disabled when not V1) -#[cfg(not(feature = "v1"))] -pub trait GetSubscriptionPlansFlow {} - -/// trait GetSubscriptionPlanPricesFlow (disabled when not V1) -#[cfg(not(feature = "v1"))] -pub trait GetSubscriptionPlanPricesFlow {} - -#[cfg(not(feature = "v1"))] -/// trait CreateCustomer (disabled when not V1) -pub trait ConnectorCustomer {} - -/// trait SubscriptionCreate -#[cfg(not(feature = "v1"))] -pub trait SubscriptionCreate {} - -/// trait GetSubscriptionEstimateFlow (disabled when not V1) -#[cfg(not(feature = "v1"))] -pub trait GetSubscriptionEstimateFlow {} diff --git a/crates/hyperswitch_interfaces/src/api/subscriptions_v2.rs b/crates/hyperswitch_interfaces/src/api/subscriptions_v2.rs index 47a8b4484a..9dd9fa6e06 100644 --- a/crates/hyperswitch_interfaces/src/api/subscriptions_v2.rs +++ b/crates/hyperswitch_interfaces/src/api/subscriptions_v2.rs @@ -2,13 +2,18 @@ use hyperswitch_domain_models::{ router_data_v2::flow_common_types::{ GetSubscriptionEstimateData, GetSubscriptionPlanPricesData, GetSubscriptionPlansData, - SubscriptionCreateData, SubscriptionCustomerData, + InvoiceRecordBackData, SubscriptionCreateData, SubscriptionCustomerData, }, router_flow_types::{ - subscriptions::{GetSubscriptionPlanPrices, GetSubscriptionPlans, SubscriptionCreate}, - CreateConnectorCustomer, GetSubscriptionEstimate, + revenue_recovery::InvoiceRecordBack, + subscriptions::{ + GetSubscriptionEstimate, GetSubscriptionPlanPrices, GetSubscriptionPlans, + SubscriptionCreate, + }, + CreateConnectorCustomer, }, router_request_types::{ + revenue_recovery::InvoiceRecordBackRequest, subscriptions::{ GetSubscriptionEstimateRequest, GetSubscriptionPlanPricesRequest, GetSubscriptionPlansRequest, SubscriptionCreateRequest, @@ -16,6 +21,7 @@ use hyperswitch_domain_models::{ ConnectorCustomerData, }, router_response_types::{ + revenue_recovery::InvoiceRecordBackResponse, subscriptions::{ GetSubscriptionEstimateResponse, GetSubscriptionPlanPricesResponse, GetSubscriptionPlansResponse, SubscriptionCreateResponse, @@ -32,6 +38,7 @@ pub trait SubscriptionsV2: + SubscriptionsCreateV2 + SubscriptionConnectorCustomerV2 + GetSubscriptionPlanPricesV2 + + SubscriptionRecordBackV2 + GetSubscriptionEstimateV2 { } @@ -47,7 +54,17 @@ pub trait GetSubscriptionPlansV2: { } -/// trait GetSubscriptionPlans for V2 +/// trait SubscriptionRecordBack for V2 +pub trait SubscriptionRecordBackV2: + ConnectorIntegrationV2< + InvoiceRecordBack, + InvoiceRecordBackData, + InvoiceRecordBackRequest, + InvoiceRecordBackResponse, +> +{ +} +/// trait GetSubscriptionPlanPricesV2 for V2 pub trait GetSubscriptionPlanPricesV2: ConnectorIntegrationV2< GetSubscriptionPlanPrices, diff --git a/crates/hyperswitch_interfaces/src/conversion_impls.rs b/crates/hyperswitch_interfaces/src/conversion_impls.rs index a1172f160c..9c5e420e86 100644 --- a/crates/hyperswitch_interfaces/src/conversion_impls.rs +++ b/crates/hyperswitch_interfaces/src/conversion_impls.rs @@ -808,7 +808,9 @@ impl RouterDataConversion for InvoiceR where Self: Sized, { - let resource_common_data = Self {}; + let resource_common_data = Self { + connector_meta_data: old_router_data.connector_meta_data.clone(), + }; Ok(RouterDataV2 { flow: std::marker::PhantomData, tenant_id: old_router_data.tenant_id.clone(), @@ -825,16 +827,18 @@ impl RouterDataConversion for InvoiceR where Self: Sized, { - let router_data = get_default_router_data( + let Self { + connector_meta_data, + } = new_router_data.resource_common_data; + let mut router_data = get_default_router_data( new_router_data.tenant_id.clone(), "recovery_record_back", new_router_data.request, new_router_data.response, ); - Ok(RouterData { - connector_auth_type: new_router_data.connector_auth_type.clone(), - ..router_data - }) + router_data.connector_meta_data = connector_meta_data; + router_data.connector_auth_type = new_router_data.connector_auth_type.clone(); + Ok(router_data) } } diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 492c07e5ab..a442a338c6 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -287,6 +287,9 @@ impl ProcessTrackerWorkflows for WorkflowRunner { storage::ProcessTrackerRunner::DisputeListWorkflow => { Ok(Box::new(workflows::dispute_list::DisputeListWorkflow)) } + storage::ProcessTrackerRunner::InvoiceSyncflow => { + Ok(Box::new(workflows::invoice_sync::InvoiceSyncWorkflow)) + } storage::ProcessTrackerRunner::DeleteTokenizeDataWorkflow => Ok(Box::new( workflows::tokenized_data::DeleteTokenizeDataWorkflow, )), diff --git a/crates/router/src/core/revenue_recovery/types.rs b/crates/router/src/core/revenue_recovery/types.rs index 3bdc822859..bf90eb0d8c 100644 --- a/crates/router/src/core/revenue_recovery/types.rs +++ b/crates/router/src/core/revenue_recovery/types.rs @@ -1394,7 +1394,9 @@ pub fn construct_invoice_record_back_router_data( let router_data = router_data_v2::RouterDataV2 { flow: PhantomData::, tenant_id: state.tenant.tenant_id.clone(), - resource_common_data: flow_common_types::InvoiceRecordBackData, + resource_common_data: flow_common_types::InvoiceRecordBackData { + connector_meta_data: None, + }, connector_auth_type: auth_type, request: revenue_recovery_request::InvoiceRecordBackRequest { merchant_reference_id, diff --git a/crates/router/src/core/subscription.rs b/crates/router/src/core/subscription.rs index 5c53a60c5d..7ef35b8c0d 100644 --- a/crates/router/src/core/subscription.rs +++ b/crates/router/src/core/subscription.rs @@ -39,8 +39,14 @@ pub async fn create_subscription( SubscriptionHandler::find_customer(&state, &merchant_context, &request.customer_id) .await .attach_printable("subscriptions: failed to find customer")?; - let billing_handler = - BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?; + let billing_handler = BillingHandler::create( + &state, + merchant_context.get_merchant_account(), + merchant_context.get_merchant_key_store(), + customer, + profile.clone(), + ) + .await?; let subscription_handler = SubscriptionHandler::new(&state, &merchant_context); let mut subscription = subscription_handler @@ -106,8 +112,14 @@ pub async fn create_and_confirm_subscription( .await .attach_printable("subscriptions: failed to find customer")?; - let billing_handler = - BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?; + let billing_handler = BillingHandler::create( + &state, + merchant_context.get_merchant_account(), + merchant_context.get_merchant_key_store(), + customer, + profile.clone(), + ) + .await?; let subscription_handler = SubscriptionHandler::new(&state, &merchant_context); let mut subs_handler = subscription_handler .create_subscription_entry( @@ -162,6 +174,7 @@ pub async fn create_and_confirm_subscription( amount, currency, invoice_details + .clone() .and_then(|invoice| invoice.status) .unwrap_or(connector_enums::InvoiceStatus::InvoiceCreated), billing_handler.connector_data.connector_name, @@ -169,9 +182,20 @@ pub async fn create_and_confirm_subscription( ) .await?; - // invoice_entry - // .create_invoice_record_back_job(&payment_response) - // .await?; + invoice_handler + .create_invoice_sync_job( + &state, + &invoice_entry, + invoice_details + .ok_or(errors::ApiErrorResponse::MissingRequiredField { + field_name: "invoice_details", + })? + .id + .get_string_repr() + .to_string(), + billing_handler.connector_data.connector_name, + ) + .await?; subs_handler .update_subscription(diesel_models::subscription::SubscriptionUpdate::new( @@ -231,8 +255,14 @@ pub async fn confirm_subscription( ) .await?; - let billing_handler = - BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?; + let billing_handler = BillingHandler::create( + &state, + merchant_context.get_merchant_account(), + merchant_context.get_merchant_key_store(), + customer, + profile.clone(), + ) + .await?; let invoice_handler = subscription_entry.get_invoice_handler(profile); let subscription = subscription_entry.subscription.clone(); @@ -270,6 +300,22 @@ pub async fn confirm_subscription( ) .await?; + invoice_handler + .create_invoice_sync_job( + &state, + &invoice_entry, + invoice_details + .clone() + .ok_or(errors::ApiErrorResponse::MissingRequiredField { + field_name: "invoice_details", + })? + .id + .get_string_repr() + .to_string(), + billing_handler.connector_data.connector_name, + ) + .await?; + subscription_entry .update_subscription(diesel_models::subscription::SubscriptionUpdate::new( payment_response.payment_method_id.clone(), diff --git a/crates/router/src/core/subscription/billing_processor_handler.rs b/crates/router/src/core/subscription/billing_processor_handler.rs index 7e0e303f04..4cc8ad3107 100644 --- a/crates/router/src/core/subscription/billing_processor_handler.rs +++ b/crates/router/src/core/subscription/billing_processor_handler.rs @@ -4,12 +4,16 @@ use common_enums::connector_enums; use common_utils::{ext_traits::ValueExt, pii}; use error_stack::ResultExt; use hyperswitch_domain_models::{ - merchant_context::MerchantContext, - router_data_v2::flow_common_types::{SubscriptionCreateData, SubscriptionCustomerData}, - router_request_types::{subscriptions as subscription_request_types, ConnectorCustomerData}, + router_data_v2::flow_common_types::{ + InvoiceRecordBackData, SubscriptionCreateData, SubscriptionCustomerData, + }, + router_request_types::{ + revenue_recovery::InvoiceRecordBackRequest, subscriptions as subscription_request_types, + ConnectorCustomerData, + }, router_response_types::{ - subscriptions as subscription_response_types, ConnectorCustomerResponseData, - PaymentsResponseData, + revenue_recovery::InvoiceRecordBackResponse, subscriptions as subscription_response_types, + ConnectorCustomerResponseData, PaymentsResponseData, }, }; @@ -31,7 +35,8 @@ pub struct BillingHandler { impl BillingHandler { pub async fn create( state: &SessionState, - merchant_context: &MerchantContext, + merchant_account: &hyperswitch_domain_models::merchant_account::MerchantAccount, + key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore, customer: hyperswitch_domain_models::customer::Customer, profile: hyperswitch_domain_models::business_profile::Profile, ) -> errors::RouterResult { @@ -41,9 +46,9 @@ impl BillingHandler { .store .find_by_merchant_connector_account_merchant_id_merchant_connector_id( &(state).into(), - merchant_context.get_merchant_account().get_id(), + merchant_account.get_id(), &merchant_connector_id, - merchant_context.get_merchant_key_store(), + key_store, ) .await .change_context(errors::ApiErrorResponse::MerchantConnectorAccountNotFound { @@ -213,6 +218,62 @@ impl BillingHandler { .into()), } } + #[allow(clippy::too_many_arguments)] + pub async fn record_back_to_billing_processor( + &self, + state: &SessionState, + invoice_id: String, + payment_id: common_utils::id_type::PaymentId, + payment_status: common_enums::AttemptStatus, + amount: common_utils::types::MinorUnit, + currency: common_enums::Currency, + payment_method_type: Option, + ) -> errors::RouterResult { + let invoice_record_back_req = InvoiceRecordBackRequest { + amount, + currency, + payment_method_type, + attempt_status: payment_status, + merchant_reference_id: common_utils::id_type::PaymentReferenceId::from_str(&invoice_id) + .change_context(errors::ApiErrorResponse::InvalidDataValue { + field_name: "invoice_id", + })?, + connector_params: self.connector_params.clone(), + connector_transaction_id: Some(common_utils::types::ConnectorTransactionId::TxnId( + payment_id.get_string_repr().to_string(), + )), + }; + + let router_data = self.build_router_data( + state, + invoice_record_back_req, + InvoiceRecordBackData { + connector_meta_data: self.connector_metadata.clone(), + }, + )?; + let connector_integration = self.connector_data.connector.get_connector_integration(); + + let response = self + .call_connector( + state, + router_data, + "invoice record back", + connector_integration, + ) + .await?; + + match response { + Ok(response_data) => Ok(response_data), + Err(err) => Err(errors::ApiErrorResponse::ExternalConnectorError { + code: err.code, + message: err.message, + connector: self.connector_data.connector_name.to_string(), + status_code: err.status_code, + reason: err.reason, + } + .into()), + } + } async fn call_connector( &self, diff --git a/crates/router/src/core/subscription/invoice_handler.rs b/crates/router/src/core/subscription/invoice_handler.rs index eae1b9d05c..3809474acf 100644 --- a/crates/router/src/core/subscription/invoice_handler.rs +++ b/crates/router/src/core/subscription/invoice_handler.rs @@ -9,7 +9,10 @@ use hyperswitch_domain_models::router_response_types::subscriptions as subscript use masking::{PeekInterface, Secret}; use super::errors; -use crate::{core::subscription::payments_api_client, routes::SessionState}; +use crate::{ + core::subscription::payments_api_client, routes::SessionState, types::storage as storage_types, + workflows::invoice_sync as invoice_sync_workflow, +}; pub struct InvoiceHandler { pub subscription: diesel_models::subscription::Subscription, @@ -19,9 +22,20 @@ pub struct InvoiceHandler { #[allow(clippy::todo)] impl InvoiceHandler { + pub fn new( + subscription: diesel_models::subscription::Subscription, + merchant_account: hyperswitch_domain_models::merchant_account::MerchantAccount, + profile: hyperswitch_domain_models::business_profile::Profile, + ) -> Self { + Self { + subscription, + merchant_account, + profile, + } + } #[allow(clippy::too_many_arguments)] pub async fn create_invoice_entry( - self, + &self, state: &SessionState, merchant_connector_id: common_utils::id_type::MerchantConnectorAccountId, payment_intent_id: Option, @@ -204,12 +218,41 @@ impl InvoiceHandler { .attach_printable("invoices: unable to get latest invoice from database") } - pub async fn create_invoice_record_back_job( + pub async fn get_invoice_by_id( &self, - // _invoice: &subscription_types::Invoice, - _payment_response: &subscription_types::PaymentResponseData, + state: &SessionState, + invoice_id: common_utils::id_type::InvoiceId, + ) -> errors::RouterResult { + state + .store + .find_invoice_by_invoice_id(invoice_id.get_string_repr().to_string()) + .await + .change_context(errors::ApiErrorResponse::SubscriptionError { + operation: "Get Invoice by ID".to_string(), + }) + .attach_printable("invoices: unable to get invoice by id from database") + } + + pub async fn create_invoice_sync_job( + &self, + state: &SessionState, + invoice: &diesel_models::invoice::Invoice, + connector_invoice_id: String, + connector_name: connector_enums::Connector, ) -> errors::RouterResult<()> { - // Create an invoice job entry based on payment status - todo!("Create an invoice job entry based on payment status") + let request = storage_types::invoice_sync::InvoiceSyncRequest::new( + self.subscription.id.to_owned(), + invoice.id.to_owned(), + self.subscription.merchant_id.to_owned(), + self.subscription.profile_id.to_owned(), + self.subscription.customer_id.to_owned(), + connector_invoice_id, + connector_name, + ); + + invoice_sync_workflow::create_invoice_sync_job(state, request) + .await + .attach_printable("invoices: unable to create invoice sync job in database")?; + Ok(()) } } diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index bc5fa611d3..5f4c0cd8c6 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -23,6 +23,7 @@ pub mod generic_link; pub mod gsm; pub mod hyperswitch_ai_interaction; pub mod invoice; +pub mod invoice_sync; #[cfg(feature = "kv_store")] pub mod kv; pub mod locker_mock_up; diff --git a/crates/router/src/types/storage/invoice_sync.rs b/crates/router/src/types/storage/invoice_sync.rs new file mode 100644 index 0000000000..01bbcf17d9 --- /dev/null +++ b/crates/router/src/types/storage/invoice_sync.rs @@ -0,0 +1,113 @@ +use api_models::enums as api_enums; +use common_utils::id_type; +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct InvoiceSyncTrackingData { + pub subscription_id: id_type::SubscriptionId, + pub invoice_id: id_type::InvoiceId, + pub merchant_id: id_type::MerchantId, + pub profile_id: id_type::ProfileId, + pub customer_id: id_type::CustomerId, + pub connector_invoice_id: String, + pub connector_name: api_enums::Connector, // The connector to which the invoice belongs +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct InvoiceSyncRequest { + pub subscription_id: id_type::SubscriptionId, + pub invoice_id: id_type::InvoiceId, + pub merchant_id: id_type::MerchantId, + pub profile_id: id_type::ProfileId, + pub customer_id: id_type::CustomerId, + pub connector_invoice_id: String, + pub connector_name: api_enums::Connector, +} + +impl From for InvoiceSyncTrackingData { + fn from(item: InvoiceSyncRequest) -> Self { + Self { + subscription_id: item.subscription_id, + invoice_id: item.invoice_id, + merchant_id: item.merchant_id, + profile_id: item.profile_id, + customer_id: item.customer_id, + connector_invoice_id: item.connector_invoice_id, + connector_name: item.connector_name, + } + } +} + +impl InvoiceSyncRequest { + #[allow(clippy::too_many_arguments)] + pub fn new( + subscription_id: id_type::SubscriptionId, + invoice_id: id_type::InvoiceId, + merchant_id: id_type::MerchantId, + profile_id: id_type::ProfileId, + customer_id: id_type::CustomerId, + connector_invoice_id: String, + connector_name: api_enums::Connector, + ) -> Self { + Self { + subscription_id, + invoice_id, + merchant_id, + profile_id, + customer_id, + connector_invoice_id, + connector_name, + } + } +} + +impl InvoiceSyncTrackingData { + #[allow(clippy::too_many_arguments)] + pub fn new( + subscription_id: id_type::SubscriptionId, + invoice_id: id_type::InvoiceId, + merchant_id: id_type::MerchantId, + profile_id: id_type::ProfileId, + customer_id: id_type::CustomerId, + connector_invoice_id: String, + connector_name: api_enums::Connector, + ) -> Self { + Self { + subscription_id, + invoice_id, + merchant_id, + profile_id, + customer_id, + connector_invoice_id, + connector_name, + } + } +} + +#[derive(Debug, Clone)] +pub enum InvoiceSyncPaymentStatus { + PaymentSucceeded, + PaymentProcessing, + PaymentFailed, +} + +impl From for InvoiceSyncPaymentStatus { + fn from(value: common_enums::IntentStatus) -> Self { + match value { + common_enums::IntentStatus::Succeeded => Self::PaymentSucceeded, + common_enums::IntentStatus::Processing + | common_enums::IntentStatus::RequiresCustomerAction + | common_enums::IntentStatus::RequiresConfirmation + | common_enums::IntentStatus::RequiresPaymentMethod => Self::PaymentProcessing, + _ => Self::PaymentFailed, + } + } +} + +impl From for common_enums::connector_enums::InvoiceStatus { + fn from(value: InvoiceSyncPaymentStatus) -> Self { + match value { + InvoiceSyncPaymentStatus::PaymentSucceeded => Self::InvoicePaid, + InvoiceSyncPaymentStatus::PaymentProcessing => Self::PaymentPending, + InvoiceSyncPaymentStatus::PaymentFailed => Self::PaymentFailed, + } + } +} diff --git a/crates/router/src/workflows.rs b/crates/router/src/workflows.rs index 2c2410f744..3c205377af 100644 --- a/crates/router/src/workflows.rs +++ b/crates/router/src/workflows.rs @@ -15,3 +15,5 @@ pub mod revenue_recovery; pub mod process_dispute; pub mod dispute_list; + +pub mod invoice_sync; diff --git a/crates/router/src/workflows/invoice_sync.rs b/crates/router/src/workflows/invoice_sync.rs new file mode 100644 index 0000000000..83aa45dc84 --- /dev/null +++ b/crates/router/src/workflows/invoice_sync.rs @@ -0,0 +1,436 @@ +#[cfg(feature = "v1")] +use api_models::subscription as subscription_types; +use async_trait::async_trait; +use common_utils::{ + errors::CustomResult, + ext_traits::{StringExt, ValueExt}, +}; +use diesel_models::{ + invoice::Invoice, process_tracker::business_status, subscription::Subscription, +}; +use error_stack::ResultExt; +use router_env::logger; +use scheduler::{ + consumer::{self, workflows::ProcessTrackerWorkflow}, + errors, + types::process_data, + utils as scheduler_utils, +}; + +#[cfg(feature = "v1")] +use crate::core::subscription::{ + billing_processor_handler as billing, invoice_handler, payments_api_client, +}; +use crate::{ + db::StorageInterface, + errors as router_errors, + routes::SessionState, + types::{domain, storage}, +}; + +const INVOICE_SYNC_WORKFLOW: &str = "INVOICE_SYNC"; +const INVOICE_SYNC_WORKFLOW_TAG: &str = "INVOICE"; +pub struct InvoiceSyncWorkflow; + +pub struct InvoiceSyncHandler<'a> { + pub state: &'a SessionState, + pub tracking_data: storage::invoice_sync::InvoiceSyncTrackingData, + pub key_store: domain::MerchantKeyStore, + pub merchant_account: domain::MerchantAccount, + pub customer: domain::Customer, + pub profile: domain::Profile, + pub subscription: Subscription, + pub invoice: Invoice, +} + +#[cfg(feature = "v1")] +impl<'a> InvoiceSyncHandler<'a> { + pub async fn create( + state: &'a SessionState, + tracking_data: storage::invoice_sync::InvoiceSyncTrackingData, + ) -> Result { + let key_manager_state = &state.into(); + let key_store = state + .store + .get_merchant_key_store_by_merchant_id( + key_manager_state, + &tracking_data.merchant_id, + &state.store.get_master_key().to_vec().into(), + ) + .await + .attach_printable("Failed to fetch Merchant key store from DB")?; + + let merchant_account = state + .store + .find_merchant_account_by_merchant_id( + key_manager_state, + &tracking_data.merchant_id, + &key_store, + ) + .await + .attach_printable("Subscriptions: Failed to fetch Merchant Account from DB")?; + + let profile = state + .store + .find_business_profile_by_profile_id( + &(state).into(), + &key_store, + &tracking_data.profile_id, + ) + .await + .attach_printable("Subscriptions: Failed to fetch Business Profile from DB")?; + + let customer = state + .store + .find_customer_by_customer_id_merchant_id( + &(state).into(), + &tracking_data.customer_id, + merchant_account.get_id(), + &key_store, + merchant_account.storage_scheme, + ) + .await + .attach_printable("Subscriptions: Failed to fetch Customer from DB")?; + + let subscription = state + .store + .find_by_merchant_id_subscription_id( + merchant_account.get_id(), + tracking_data.subscription_id.get_string_repr().to_string(), + ) + .await + .attach_printable("Subscriptions: Failed to fetch subscription from DB")?; + + let invoice = state + .store + .find_invoice_by_invoice_id(tracking_data.invoice_id.get_string_repr().to_string()) + .await + .attach_printable("invoices: unable to get latest invoice from database")?; + + Ok(Self { + state, + tracking_data, + key_store, + merchant_account, + customer, + profile, + subscription, + invoice, + }) + } + + async fn finish_process_with_business_status( + &self, + process: &storage::ProcessTracker, + business_status: &'static str, + ) -> CustomResult<(), router_errors::ApiErrorResponse> { + self.state + .store + .as_scheduler() + .finish_process_with_business_status(process.clone(), business_status) + .await + .change_context(router_errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to update process tracker status") + } + + pub async fn perform_payments_sync( + &self, + ) -> CustomResult + { + let payment_id = self.invoice.payment_intent_id.clone().ok_or( + router_errors::ApiErrorResponse::SubscriptionError { + operation: "Invoice_sync: Missing Payment Intent ID in Invoice".to_string(), + }, + )?; + let payments_response = payments_api_client::PaymentsApiClient::sync_payment( + self.state, + payment_id.get_string_repr().to_string(), + self.merchant_account.get_id().get_string_repr(), + self.profile.get_id().get_string_repr(), + ) + .await + .change_context(router_errors::ApiErrorResponse::SubscriptionError { + operation: "Invoice_sync: Failed to sync payment status from payments microservice" + .to_string(), + }) + .attach_printable("Failed to sync payment status from payments microservice")?; + + Ok(payments_response) + } + + pub async fn perform_billing_processor_record_back( + &self, + payment_response: subscription_types::PaymentResponseData, + payment_status: common_enums::AttemptStatus, + connector_invoice_id: String, + invoice_sync_status: storage::invoice_sync::InvoiceSyncPaymentStatus, + ) -> CustomResult<(), router_errors::ApiErrorResponse> { + logger::info!("perform_billing_processor_record_back"); + + let billing_handler = billing::BillingHandler::create( + self.state, + &self.merchant_account, + &self.key_store, + self.customer.clone(), + self.profile.clone(), + ) + .await + .attach_printable("Failed to create billing handler")?; + + let invoice_handler = invoice_handler::InvoiceHandler::new( + self.subscription.clone(), + self.merchant_account.clone(), + self.profile.clone(), + ); + + // TODO: Handle retries here on failure + billing_handler + .record_back_to_billing_processor( + self.state, + connector_invoice_id.clone(), + payment_response.payment_id.to_owned(), + payment_status, + payment_response.amount, + payment_response.currency, + payment_response.payment_method_type, + ) + .await + .attach_printable("Failed to record back to billing processor")?; + + invoice_handler + .update_invoice( + self.state, + self.invoice.id.to_owned(), + None, + common_enums::connector_enums::InvoiceStatus::from(invoice_sync_status), + ) + .await + .attach_printable("Failed to update invoice in DB")?; + + Ok(()) + } + + pub async fn transition_workflow_state( + &self, + process: storage::ProcessTracker, + payment_response: subscription_types::PaymentResponseData, + connector_invoice_id: String, + ) -> CustomResult<(), router_errors::ApiErrorResponse> { + let invoice_sync_status = + storage::invoice_sync::InvoiceSyncPaymentStatus::from(payment_response.status); + match invoice_sync_status { + storage::invoice_sync::InvoiceSyncPaymentStatus::PaymentSucceeded => { + Box::pin(self.perform_billing_processor_record_back( + payment_response, + common_enums::AttemptStatus::Charged, + connector_invoice_id, + invoice_sync_status, + )) + .await + .attach_printable("Failed to record back to billing processor")?; + + self.finish_process_with_business_status(&process, business_status::COMPLETED_BY_PT) + .await + .change_context(router_errors::ApiErrorResponse::SubscriptionError { + operation: "Invoice_sync process_tracker task completion".to_string(), + }) + .attach_printable("Failed to update process tracker status") + } + storage::invoice_sync::InvoiceSyncPaymentStatus::PaymentProcessing => { + retry_subscription_invoice_sync_task( + &*self.state.store, + self.tracking_data.connector_name.to_string().clone(), + self.merchant_account.get_id().to_owned(), + process, + ) + .await + .change_context(router_errors::ApiErrorResponse::SubscriptionError { + operation: "Invoice_sync process_tracker task retry".to_string(), + }) + .attach_printable("Failed to update process tracker status") + } + storage::invoice_sync::InvoiceSyncPaymentStatus::PaymentFailed => { + Box::pin(self.perform_billing_processor_record_back( + payment_response, + common_enums::AttemptStatus::Failure, + connector_invoice_id, + invoice_sync_status, + )) + .await + .attach_printable("Failed to record back to billing processor")?; + + self.finish_process_with_business_status(&process, business_status::COMPLETED_BY_PT) + .await + .change_context(router_errors::ApiErrorResponse::SubscriptionError { + operation: "Invoice_sync process_tracker task completion".to_string(), + }) + .attach_printable("Failed to update process tracker status") + } + } + } +} + +#[async_trait] +impl ProcessTrackerWorkflow for InvoiceSyncWorkflow { + #[cfg(feature = "v1")] + async fn execute_workflow<'a>( + &'a self, + state: &'a SessionState, + process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + let tracking_data = process + .tracking_data + .clone() + .parse_value::( + "InvoiceSyncTrackingData", + )?; + + match process.name.as_deref() { + Some(INVOICE_SYNC_WORKFLOW) => { + Box::pin(perform_subscription_invoice_sync( + state, + process, + tracking_data, + )) + .await + } + _ => Err(errors::ProcessTrackerError::JobNotFound), + } + } + + async fn error_handler<'a>( + &'a self, + state: &'a SessionState, + process: storage::ProcessTracker, + error: errors::ProcessTrackerError, + ) -> CustomResult<(), errors::ProcessTrackerError> { + logger::error!("Encountered error"); + consumer::consumer_error_handler(state.store.as_scheduler(), process, error).await + } + + #[cfg(feature = "v2")] + async fn execute_workflow<'a>( + &'a self, + state: &'a SessionState, + process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + Ok(()) + } +} + +#[cfg(feature = "v1")] +async fn perform_subscription_invoice_sync( + state: &SessionState, + process: storage::ProcessTracker, + tracking_data: storage::invoice_sync::InvoiceSyncTrackingData, +) -> Result<(), errors::ProcessTrackerError> { + let handler = InvoiceSyncHandler::create(state, tracking_data).await?; + + let payment_status = handler.perform_payments_sync().await?; + + Box::pin(handler.transition_workflow_state( + process, + payment_status, + handler.tracking_data.connector_invoice_id.clone(), + )) + .await?; + + Ok(()) +} + +pub async fn create_invoice_sync_job( + state: &SessionState, + request: storage::invoice_sync::InvoiceSyncRequest, +) -> CustomResult<(), router_errors::ApiErrorResponse> { + let tracking_data = storage::invoice_sync::InvoiceSyncTrackingData::from(request); + + let process_tracker_entry = diesel_models::ProcessTrackerNew::new( + common_utils::generate_id(crate::consts::ID_LENGTH, "proc"), + INVOICE_SYNC_WORKFLOW.to_string(), + common_enums::ProcessTrackerRunner::InvoiceSyncflow, + vec![INVOICE_SYNC_WORKFLOW_TAG.to_string()], + tracking_data, + Some(0), + common_utils::date_time::now(), + common_types::consts::API_VERSION, + ) + .change_context(router_errors::ApiErrorResponse::InternalServerError) + .attach_printable("subscriptions: unable to form process_tracker type")?; + + state + .store + .insert_process(process_tracker_entry) + .await + .change_context(router_errors::ApiErrorResponse::InternalServerError) + .attach_printable("subscriptions: unable to insert process_tracker entry in DB")?; + + Ok(()) +} + +pub async fn get_subscription_invoice_sync_process_schedule_time( + db: &dyn StorageInterface, + connector: &str, + merchant_id: &common_utils::id_type::MerchantId, + retry_count: i32, +) -> Result, errors::ProcessTrackerError> { + let mapping: CustomResult< + process_data::SubscriptionInvoiceSyncPTMapping, + router_errors::StorageError, + > = db + .find_config_by_key(&format!("invoice_sync_pt_mapping_{connector}")) + .await + .map(|value| value.config) + .and_then(|config| { + config + .parse_struct("SubscriptionInvoiceSyncPTMapping") + .change_context(router_errors::StorageError::DeserializationFailed) + .attach_printable("Failed to deserialize invoice_sync_pt_mapping config to struct") + }); + let mapping = match mapping { + Ok(x) => x, + Err(error) => { + logger::info!(?error, "Redis Mapping Error"); + process_data::SubscriptionInvoiceSyncPTMapping::default() + } + }; + + let time_delta = scheduler_utils::get_subscription_invoice_sync_retry_schedule_time( + mapping, + merchant_id, + retry_count, + ); + + Ok(scheduler_utils::get_time_from_delta(time_delta)) +} + +pub async fn retry_subscription_invoice_sync_task( + db: &dyn StorageInterface, + connector: String, + merchant_id: common_utils::id_type::MerchantId, + pt: storage::ProcessTracker, +) -> Result<(), errors::ProcessTrackerError> { + let schedule_time = get_subscription_invoice_sync_process_schedule_time( + db, + connector.as_str(), + &merchant_id, + pt.retry_count + 1, + ) + .await?; + + match schedule_time { + Some(s_time) => { + db.as_scheduler() + .retry_process(pt, s_time) + .await + .attach_printable("Failed to retry subscription invoice sync task")?; + } + None => { + db.as_scheduler() + .finish_process_with_business_status(pt, business_status::RETRIES_EXCEEDED) + .await + .attach_printable("Failed to finish subscription invoice sync task")?; + } + } + + Ok(()) +} diff --git a/crates/scheduler/src/consumer/types/process_data.rs b/crates/scheduler/src/consumer/types/process_data.rs index 26d0fdf702..6e236e1827 100644 --- a/crates/scheduler/src/consumer/types/process_data.rs +++ b/crates/scheduler/src/consumer/types/process_data.rs @@ -29,6 +29,26 @@ impl Default for ConnectorPTMapping { } } +#[derive(Serialize, Deserialize)] +pub struct SubscriptionInvoiceSyncPTMapping { + pub default_mapping: RetryMapping, + pub custom_merchant_mapping: HashMap, + pub max_retries_count: i32, +} + +impl Default for SubscriptionInvoiceSyncPTMapping { + fn default() -> Self { + Self { + custom_merchant_mapping: HashMap::new(), + default_mapping: RetryMapping { + start_after: 60, + frequencies: vec![(300, 5)], + }, + max_retries_count: 5, + } + } +} + #[derive(Serialize, Deserialize)] pub struct PaymentMethodsPTMapping { pub default_mapping: RetryMapping, diff --git a/crates/scheduler/src/utils.rs b/crates/scheduler/src/utils.rs index 58dd0be4da..0db72a24a1 100644 --- a/crates/scheduler/src/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -386,6 +386,23 @@ pub fn get_pcr_payments_retry_schedule_time( } } +pub fn get_subscription_invoice_sync_retry_schedule_time( + mapping: process_data::SubscriptionInvoiceSyncPTMapping, + merchant_id: &common_utils::id_type::MerchantId, + retry_count: i32, +) -> Option { + let mapping = match mapping.custom_merchant_mapping.get(merchant_id) { + Some(map) => map.clone(), + None => mapping.default_mapping, + }; + + if retry_count == 0 { + Some(mapping.start_after) + } else { + get_delay(retry_count, &mapping.frequencies) + } +} + /// Get the delay based on the retry count pub fn get_delay<'a>( retry_count: i32,