feat(subscriptions): Invoice record back workflow (#9529)

Co-authored-by: Prajjwal kumar <write2prajjwal@gmail.com>
Co-authored-by: Prajjwal Kumar <prajjwal.kumar@juspay.in>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Jagan Elavarasan <jaganelavarasan@gmail.com>
This commit is contained in:
Sarthak Soni
2025-10-04 00:41:38 +05:30
committed by GitHub
parent 32dd9e10e3
commit 0a35c617e6
21 changed files with 950 additions and 111 deletions

View File

@ -287,6 +287,9 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
storage::ProcessTrackerRunner::DisputeListWorkflow => {
Ok(Box::new(workflows::dispute_list::DisputeListWorkflow))
}
storage::ProcessTrackerRunner::InvoiceSyncflow => {
Ok(Box::new(workflows::invoice_sync::InvoiceSyncWorkflow))
}
storage::ProcessTrackerRunner::DeleteTokenizeDataWorkflow => Ok(Box::new(
workflows::tokenized_data::DeleteTokenizeDataWorkflow,
)),

View File

@ -1394,7 +1394,9 @@ pub fn construct_invoice_record_back_router_data(
let router_data = router_data_v2::RouterDataV2 {
flow: PhantomData::<router_flow_types::InvoiceRecordBack>,
tenant_id: state.tenant.tenant_id.clone(),
resource_common_data: flow_common_types::InvoiceRecordBackData,
resource_common_data: flow_common_types::InvoiceRecordBackData {
connector_meta_data: None,
},
connector_auth_type: auth_type,
request: revenue_recovery_request::InvoiceRecordBackRequest {
merchant_reference_id,

View File

@ -39,8 +39,14 @@ pub async fn create_subscription(
SubscriptionHandler::find_customer(&state, &merchant_context, &request.customer_id)
.await
.attach_printable("subscriptions: failed to find customer")?;
let billing_handler =
BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?;
let billing_handler = BillingHandler::create(
&state,
merchant_context.get_merchant_account(),
merchant_context.get_merchant_key_store(),
customer,
profile.clone(),
)
.await?;
let subscription_handler = SubscriptionHandler::new(&state, &merchant_context);
let mut subscription = subscription_handler
@ -106,8 +112,14 @@ pub async fn create_and_confirm_subscription(
.await
.attach_printable("subscriptions: failed to find customer")?;
let billing_handler =
BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?;
let billing_handler = BillingHandler::create(
&state,
merchant_context.get_merchant_account(),
merchant_context.get_merchant_key_store(),
customer,
profile.clone(),
)
.await?;
let subscription_handler = SubscriptionHandler::new(&state, &merchant_context);
let mut subs_handler = subscription_handler
.create_subscription_entry(
@ -162,6 +174,7 @@ pub async fn create_and_confirm_subscription(
amount,
currency,
invoice_details
.clone()
.and_then(|invoice| invoice.status)
.unwrap_or(connector_enums::InvoiceStatus::InvoiceCreated),
billing_handler.connector_data.connector_name,
@ -169,9 +182,20 @@ pub async fn create_and_confirm_subscription(
)
.await?;
// invoice_entry
// .create_invoice_record_back_job(&payment_response)
// .await?;
invoice_handler
.create_invoice_sync_job(
&state,
&invoice_entry,
invoice_details
.ok_or(errors::ApiErrorResponse::MissingRequiredField {
field_name: "invoice_details",
})?
.id
.get_string_repr()
.to_string(),
billing_handler.connector_data.connector_name,
)
.await?;
subs_handler
.update_subscription(diesel_models::subscription::SubscriptionUpdate::new(
@ -231,8 +255,14 @@ pub async fn confirm_subscription(
)
.await?;
let billing_handler =
BillingHandler::create(&state, &merchant_context, customer, profile.clone()).await?;
let billing_handler = BillingHandler::create(
&state,
merchant_context.get_merchant_account(),
merchant_context.get_merchant_key_store(),
customer,
profile.clone(),
)
.await?;
let invoice_handler = subscription_entry.get_invoice_handler(profile);
let subscription = subscription_entry.subscription.clone();
@ -270,6 +300,22 @@ pub async fn confirm_subscription(
)
.await?;
invoice_handler
.create_invoice_sync_job(
&state,
&invoice_entry,
invoice_details
.clone()
.ok_or(errors::ApiErrorResponse::MissingRequiredField {
field_name: "invoice_details",
})?
.id
.get_string_repr()
.to_string(),
billing_handler.connector_data.connector_name,
)
.await?;
subscription_entry
.update_subscription(diesel_models::subscription::SubscriptionUpdate::new(
payment_response.payment_method_id.clone(),

View File

@ -4,12 +4,16 @@ use common_enums::connector_enums;
use common_utils::{ext_traits::ValueExt, pii};
use error_stack::ResultExt;
use hyperswitch_domain_models::{
merchant_context::MerchantContext,
router_data_v2::flow_common_types::{SubscriptionCreateData, SubscriptionCustomerData},
router_request_types::{subscriptions as subscription_request_types, ConnectorCustomerData},
router_data_v2::flow_common_types::{
InvoiceRecordBackData, SubscriptionCreateData, SubscriptionCustomerData,
},
router_request_types::{
revenue_recovery::InvoiceRecordBackRequest, subscriptions as subscription_request_types,
ConnectorCustomerData,
},
router_response_types::{
subscriptions as subscription_response_types, ConnectorCustomerResponseData,
PaymentsResponseData,
revenue_recovery::InvoiceRecordBackResponse, subscriptions as subscription_response_types,
ConnectorCustomerResponseData, PaymentsResponseData,
},
};
@ -31,7 +35,8 @@ pub struct BillingHandler {
impl BillingHandler {
pub async fn create(
state: &SessionState,
merchant_context: &MerchantContext,
merchant_account: &hyperswitch_domain_models::merchant_account::MerchantAccount,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
customer: hyperswitch_domain_models::customer::Customer,
profile: hyperswitch_domain_models::business_profile::Profile,
) -> errors::RouterResult<Self> {
@ -41,9 +46,9 @@ impl BillingHandler {
.store
.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&(state).into(),
merchant_context.get_merchant_account().get_id(),
merchant_account.get_id(),
&merchant_connector_id,
merchant_context.get_merchant_key_store(),
key_store,
)
.await
.change_context(errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
@ -213,6 +218,62 @@ impl BillingHandler {
.into()),
}
}
#[allow(clippy::too_many_arguments)]
pub async fn record_back_to_billing_processor(
&self,
state: &SessionState,
invoice_id: String,
payment_id: common_utils::id_type::PaymentId,
payment_status: common_enums::AttemptStatus,
amount: common_utils::types::MinorUnit,
currency: common_enums::Currency,
payment_method_type: Option<common_enums::PaymentMethodType>,
) -> errors::RouterResult<InvoiceRecordBackResponse> {
let invoice_record_back_req = InvoiceRecordBackRequest {
amount,
currency,
payment_method_type,
attempt_status: payment_status,
merchant_reference_id: common_utils::id_type::PaymentReferenceId::from_str(&invoice_id)
.change_context(errors::ApiErrorResponse::InvalidDataValue {
field_name: "invoice_id",
})?,
connector_params: self.connector_params.clone(),
connector_transaction_id: Some(common_utils::types::ConnectorTransactionId::TxnId(
payment_id.get_string_repr().to_string(),
)),
};
let router_data = self.build_router_data(
state,
invoice_record_back_req,
InvoiceRecordBackData {
connector_meta_data: self.connector_metadata.clone(),
},
)?;
let connector_integration = self.connector_data.connector.get_connector_integration();
let response = self
.call_connector(
state,
router_data,
"invoice record back",
connector_integration,
)
.await?;
match response {
Ok(response_data) => Ok(response_data),
Err(err) => Err(errors::ApiErrorResponse::ExternalConnectorError {
code: err.code,
message: err.message,
connector: self.connector_data.connector_name.to_string(),
status_code: err.status_code,
reason: err.reason,
}
.into()),
}
}
async fn call_connector<F, ResourceCommonData, Req, Resp>(
&self,

View File

@ -9,7 +9,10 @@ use hyperswitch_domain_models::router_response_types::subscriptions as subscript
use masking::{PeekInterface, Secret};
use super::errors;
use crate::{core::subscription::payments_api_client, routes::SessionState};
use crate::{
core::subscription::payments_api_client, routes::SessionState, types::storage as storage_types,
workflows::invoice_sync as invoice_sync_workflow,
};
pub struct InvoiceHandler {
pub subscription: diesel_models::subscription::Subscription,
@ -19,9 +22,20 @@ pub struct InvoiceHandler {
#[allow(clippy::todo)]
impl InvoiceHandler {
pub fn new(
subscription: diesel_models::subscription::Subscription,
merchant_account: hyperswitch_domain_models::merchant_account::MerchantAccount,
profile: hyperswitch_domain_models::business_profile::Profile,
) -> Self {
Self {
subscription,
merchant_account,
profile,
}
}
#[allow(clippy::too_many_arguments)]
pub async fn create_invoice_entry(
self,
&self,
state: &SessionState,
merchant_connector_id: common_utils::id_type::MerchantConnectorAccountId,
payment_intent_id: Option<common_utils::id_type::PaymentId>,
@ -204,12 +218,41 @@ impl InvoiceHandler {
.attach_printable("invoices: unable to get latest invoice from database")
}
pub async fn create_invoice_record_back_job(
pub async fn get_invoice_by_id(
&self,
// _invoice: &subscription_types::Invoice,
_payment_response: &subscription_types::PaymentResponseData,
state: &SessionState,
invoice_id: common_utils::id_type::InvoiceId,
) -> errors::RouterResult<diesel_models::invoice::Invoice> {
state
.store
.find_invoice_by_invoice_id(invoice_id.get_string_repr().to_string())
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Get Invoice by ID".to_string(),
})
.attach_printable("invoices: unable to get invoice by id from database")
}
pub async fn create_invoice_sync_job(
&self,
state: &SessionState,
invoice: &diesel_models::invoice::Invoice,
connector_invoice_id: String,
connector_name: connector_enums::Connector,
) -> errors::RouterResult<()> {
// Create an invoice job entry based on payment status
todo!("Create an invoice job entry based on payment status")
let request = storage_types::invoice_sync::InvoiceSyncRequest::new(
self.subscription.id.to_owned(),
invoice.id.to_owned(),
self.subscription.merchant_id.to_owned(),
self.subscription.profile_id.to_owned(),
self.subscription.customer_id.to_owned(),
connector_invoice_id,
connector_name,
);
invoice_sync_workflow::create_invoice_sync_job(state, request)
.await
.attach_printable("invoices: unable to create invoice sync job in database")?;
Ok(())
}
}

View File

@ -23,6 +23,7 @@ pub mod generic_link;
pub mod gsm;
pub mod hyperswitch_ai_interaction;
pub mod invoice;
pub mod invoice_sync;
#[cfg(feature = "kv_store")]
pub mod kv;
pub mod locker_mock_up;

View File

@ -0,0 +1,113 @@
use api_models::enums as api_enums;
use common_utils::id_type;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct InvoiceSyncTrackingData {
pub subscription_id: id_type::SubscriptionId,
pub invoice_id: id_type::InvoiceId,
pub merchant_id: id_type::MerchantId,
pub profile_id: id_type::ProfileId,
pub customer_id: id_type::CustomerId,
pub connector_invoice_id: String,
pub connector_name: api_enums::Connector, // The connector to which the invoice belongs
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct InvoiceSyncRequest {
pub subscription_id: id_type::SubscriptionId,
pub invoice_id: id_type::InvoiceId,
pub merchant_id: id_type::MerchantId,
pub profile_id: id_type::ProfileId,
pub customer_id: id_type::CustomerId,
pub connector_invoice_id: String,
pub connector_name: api_enums::Connector,
}
impl From<InvoiceSyncRequest> for InvoiceSyncTrackingData {
fn from(item: InvoiceSyncRequest) -> Self {
Self {
subscription_id: item.subscription_id,
invoice_id: item.invoice_id,
merchant_id: item.merchant_id,
profile_id: item.profile_id,
customer_id: item.customer_id,
connector_invoice_id: item.connector_invoice_id,
connector_name: item.connector_name,
}
}
}
impl InvoiceSyncRequest {
#[allow(clippy::too_many_arguments)]
pub fn new(
subscription_id: id_type::SubscriptionId,
invoice_id: id_type::InvoiceId,
merchant_id: id_type::MerchantId,
profile_id: id_type::ProfileId,
customer_id: id_type::CustomerId,
connector_invoice_id: String,
connector_name: api_enums::Connector,
) -> Self {
Self {
subscription_id,
invoice_id,
merchant_id,
profile_id,
customer_id,
connector_invoice_id,
connector_name,
}
}
}
impl InvoiceSyncTrackingData {
#[allow(clippy::too_many_arguments)]
pub fn new(
subscription_id: id_type::SubscriptionId,
invoice_id: id_type::InvoiceId,
merchant_id: id_type::MerchantId,
profile_id: id_type::ProfileId,
customer_id: id_type::CustomerId,
connector_invoice_id: String,
connector_name: api_enums::Connector,
) -> Self {
Self {
subscription_id,
invoice_id,
merchant_id,
profile_id,
customer_id,
connector_invoice_id,
connector_name,
}
}
}
#[derive(Debug, Clone)]
pub enum InvoiceSyncPaymentStatus {
PaymentSucceeded,
PaymentProcessing,
PaymentFailed,
}
impl From<common_enums::IntentStatus> for InvoiceSyncPaymentStatus {
fn from(value: common_enums::IntentStatus) -> Self {
match value {
common_enums::IntentStatus::Succeeded => Self::PaymentSucceeded,
common_enums::IntentStatus::Processing
| common_enums::IntentStatus::RequiresCustomerAction
| common_enums::IntentStatus::RequiresConfirmation
| common_enums::IntentStatus::RequiresPaymentMethod => Self::PaymentProcessing,
_ => Self::PaymentFailed,
}
}
}
impl From<InvoiceSyncPaymentStatus> for common_enums::connector_enums::InvoiceStatus {
fn from(value: InvoiceSyncPaymentStatus) -> Self {
match value {
InvoiceSyncPaymentStatus::PaymentSucceeded => Self::InvoicePaid,
InvoiceSyncPaymentStatus::PaymentProcessing => Self::PaymentPending,
InvoiceSyncPaymentStatus::PaymentFailed => Self::PaymentFailed,
}
}
}

View File

@ -15,3 +15,5 @@ pub mod revenue_recovery;
pub mod process_dispute;
pub mod dispute_list;
pub mod invoice_sync;

View File

@ -0,0 +1,436 @@
#[cfg(feature = "v1")]
use api_models::subscription as subscription_types;
use async_trait::async_trait;
use common_utils::{
errors::CustomResult,
ext_traits::{StringExt, ValueExt},
};
use diesel_models::{
invoice::Invoice, process_tracker::business_status, subscription::Subscription,
};
use error_stack::ResultExt;
use router_env::logger;
use scheduler::{
consumer::{self, workflows::ProcessTrackerWorkflow},
errors,
types::process_data,
utils as scheduler_utils,
};
#[cfg(feature = "v1")]
use crate::core::subscription::{
billing_processor_handler as billing, invoice_handler, payments_api_client,
};
use crate::{
db::StorageInterface,
errors as router_errors,
routes::SessionState,
types::{domain, storage},
};
const INVOICE_SYNC_WORKFLOW: &str = "INVOICE_SYNC";
const INVOICE_SYNC_WORKFLOW_TAG: &str = "INVOICE";
pub struct InvoiceSyncWorkflow;
pub struct InvoiceSyncHandler<'a> {
pub state: &'a SessionState,
pub tracking_data: storage::invoice_sync::InvoiceSyncTrackingData,
pub key_store: domain::MerchantKeyStore,
pub merchant_account: domain::MerchantAccount,
pub customer: domain::Customer,
pub profile: domain::Profile,
pub subscription: Subscription,
pub invoice: Invoice,
}
#[cfg(feature = "v1")]
impl<'a> InvoiceSyncHandler<'a> {
pub async fn create(
state: &'a SessionState,
tracking_data: storage::invoice_sync::InvoiceSyncTrackingData,
) -> Result<Self, errors::ProcessTrackerError> {
let key_manager_state = &state.into();
let key_store = state
.store
.get_merchant_key_store_by_merchant_id(
key_manager_state,
&tracking_data.merchant_id,
&state.store.get_master_key().to_vec().into(),
)
.await
.attach_printable("Failed to fetch Merchant key store from DB")?;
let merchant_account = state
.store
.find_merchant_account_by_merchant_id(
key_manager_state,
&tracking_data.merchant_id,
&key_store,
)
.await
.attach_printable("Subscriptions: Failed to fetch Merchant Account from DB")?;
let profile = state
.store
.find_business_profile_by_profile_id(
&(state).into(),
&key_store,
&tracking_data.profile_id,
)
.await
.attach_printable("Subscriptions: Failed to fetch Business Profile from DB")?;
let customer = state
.store
.find_customer_by_customer_id_merchant_id(
&(state).into(),
&tracking_data.customer_id,
merchant_account.get_id(),
&key_store,
merchant_account.storage_scheme,
)
.await
.attach_printable("Subscriptions: Failed to fetch Customer from DB")?;
let subscription = state
.store
.find_by_merchant_id_subscription_id(
merchant_account.get_id(),
tracking_data.subscription_id.get_string_repr().to_string(),
)
.await
.attach_printable("Subscriptions: Failed to fetch subscription from DB")?;
let invoice = state
.store
.find_invoice_by_invoice_id(tracking_data.invoice_id.get_string_repr().to_string())
.await
.attach_printable("invoices: unable to get latest invoice from database")?;
Ok(Self {
state,
tracking_data,
key_store,
merchant_account,
customer,
profile,
subscription,
invoice,
})
}
async fn finish_process_with_business_status(
&self,
process: &storage::ProcessTracker,
business_status: &'static str,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
self.state
.store
.as_scheduler()
.finish_process_with_business_status(process.clone(), business_status)
.await
.change_context(router_errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update process tracker status")
}
pub async fn perform_payments_sync(
&self,
) -> CustomResult<subscription_types::PaymentResponseData, router_errors::ApiErrorResponse>
{
let payment_id = self.invoice.payment_intent_id.clone().ok_or(
router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync: Missing Payment Intent ID in Invoice".to_string(),
},
)?;
let payments_response = payments_api_client::PaymentsApiClient::sync_payment(
self.state,
payment_id.get_string_repr().to_string(),
self.merchant_account.get_id().get_string_repr(),
self.profile.get_id().get_string_repr(),
)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync: Failed to sync payment status from payments microservice"
.to_string(),
})
.attach_printable("Failed to sync payment status from payments microservice")?;
Ok(payments_response)
}
pub async fn perform_billing_processor_record_back(
&self,
payment_response: subscription_types::PaymentResponseData,
payment_status: common_enums::AttemptStatus,
connector_invoice_id: String,
invoice_sync_status: storage::invoice_sync::InvoiceSyncPaymentStatus,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
logger::info!("perform_billing_processor_record_back");
let billing_handler = billing::BillingHandler::create(
self.state,
&self.merchant_account,
&self.key_store,
self.customer.clone(),
self.profile.clone(),
)
.await
.attach_printable("Failed to create billing handler")?;
let invoice_handler = invoice_handler::InvoiceHandler::new(
self.subscription.clone(),
self.merchant_account.clone(),
self.profile.clone(),
);
// TODO: Handle retries here on failure
billing_handler
.record_back_to_billing_processor(
self.state,
connector_invoice_id.clone(),
payment_response.payment_id.to_owned(),
payment_status,
payment_response.amount,
payment_response.currency,
payment_response.payment_method_type,
)
.await
.attach_printable("Failed to record back to billing processor")?;
invoice_handler
.update_invoice(
self.state,
self.invoice.id.to_owned(),
None,
common_enums::connector_enums::InvoiceStatus::from(invoice_sync_status),
)
.await
.attach_printable("Failed to update invoice in DB")?;
Ok(())
}
pub async fn transition_workflow_state(
&self,
process: storage::ProcessTracker,
payment_response: subscription_types::PaymentResponseData,
connector_invoice_id: String,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
let invoice_sync_status =
storage::invoice_sync::InvoiceSyncPaymentStatus::from(payment_response.status);
match invoice_sync_status {
storage::invoice_sync::InvoiceSyncPaymentStatus::PaymentSucceeded => {
Box::pin(self.perform_billing_processor_record_back(
payment_response,
common_enums::AttemptStatus::Charged,
connector_invoice_id,
invoice_sync_status,
))
.await
.attach_printable("Failed to record back to billing processor")?;
self.finish_process_with_business_status(&process, business_status::COMPLETED_BY_PT)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync process_tracker task completion".to_string(),
})
.attach_printable("Failed to update process tracker status")
}
storage::invoice_sync::InvoiceSyncPaymentStatus::PaymentProcessing => {
retry_subscription_invoice_sync_task(
&*self.state.store,
self.tracking_data.connector_name.to_string().clone(),
self.merchant_account.get_id().to_owned(),
process,
)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync process_tracker task retry".to_string(),
})
.attach_printable("Failed to update process tracker status")
}
storage::invoice_sync::InvoiceSyncPaymentStatus::PaymentFailed => {
Box::pin(self.perform_billing_processor_record_back(
payment_response,
common_enums::AttemptStatus::Failure,
connector_invoice_id,
invoice_sync_status,
))
.await
.attach_printable("Failed to record back to billing processor")?;
self.finish_process_with_business_status(&process, business_status::COMPLETED_BY_PT)
.await
.change_context(router_errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice_sync process_tracker task completion".to_string(),
})
.attach_printable("Failed to update process tracker status")
}
}
}
}
#[async_trait]
impl ProcessTrackerWorkflow<SessionState> for InvoiceSyncWorkflow {
#[cfg(feature = "v1")]
async fn execute_workflow<'a>(
&'a self,
state: &'a SessionState,
process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let tracking_data = process
.tracking_data
.clone()
.parse_value::<storage::invoice_sync::InvoiceSyncTrackingData>(
"InvoiceSyncTrackingData",
)?;
match process.name.as_deref() {
Some(INVOICE_SYNC_WORKFLOW) => {
Box::pin(perform_subscription_invoice_sync(
state,
process,
tracking_data,
))
.await
}
_ => Err(errors::ProcessTrackerError::JobNotFound),
}
}
async fn error_handler<'a>(
&'a self,
state: &'a SessionState,
process: storage::ProcessTracker,
error: errors::ProcessTrackerError,
) -> CustomResult<(), errors::ProcessTrackerError> {
logger::error!("Encountered error");
consumer::consumer_error_handler(state.store.as_scheduler(), process, error).await
}
#[cfg(feature = "v2")]
async fn execute_workflow<'a>(
&'a self,
state: &'a SessionState,
process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
Ok(())
}
}
#[cfg(feature = "v1")]
async fn perform_subscription_invoice_sync(
state: &SessionState,
process: storage::ProcessTracker,
tracking_data: storage::invoice_sync::InvoiceSyncTrackingData,
) -> Result<(), errors::ProcessTrackerError> {
let handler = InvoiceSyncHandler::create(state, tracking_data).await?;
let payment_status = handler.perform_payments_sync().await?;
Box::pin(handler.transition_workflow_state(
process,
payment_status,
handler.tracking_data.connector_invoice_id.clone(),
))
.await?;
Ok(())
}
pub async fn create_invoice_sync_job(
state: &SessionState,
request: storage::invoice_sync::InvoiceSyncRequest,
) -> CustomResult<(), router_errors::ApiErrorResponse> {
let tracking_data = storage::invoice_sync::InvoiceSyncTrackingData::from(request);
let process_tracker_entry = diesel_models::ProcessTrackerNew::new(
common_utils::generate_id(crate::consts::ID_LENGTH, "proc"),
INVOICE_SYNC_WORKFLOW.to_string(),
common_enums::ProcessTrackerRunner::InvoiceSyncflow,
vec![INVOICE_SYNC_WORKFLOW_TAG.to_string()],
tracking_data,
Some(0),
common_utils::date_time::now(),
common_types::consts::API_VERSION,
)
.change_context(router_errors::ApiErrorResponse::InternalServerError)
.attach_printable("subscriptions: unable to form process_tracker type")?;
state
.store
.insert_process(process_tracker_entry)
.await
.change_context(router_errors::ApiErrorResponse::InternalServerError)
.attach_printable("subscriptions: unable to insert process_tracker entry in DB")?;
Ok(())
}
pub async fn get_subscription_invoice_sync_process_schedule_time(
db: &dyn StorageInterface,
connector: &str,
merchant_id: &common_utils::id_type::MerchantId,
retry_count: i32,
) -> Result<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
let mapping: CustomResult<
process_data::SubscriptionInvoiceSyncPTMapping,
router_errors::StorageError,
> = db
.find_config_by_key(&format!("invoice_sync_pt_mapping_{connector}"))
.await
.map(|value| value.config)
.and_then(|config| {
config
.parse_struct("SubscriptionInvoiceSyncPTMapping")
.change_context(router_errors::StorageError::DeserializationFailed)
.attach_printable("Failed to deserialize invoice_sync_pt_mapping config to struct")
});
let mapping = match mapping {
Ok(x) => x,
Err(error) => {
logger::info!(?error, "Redis Mapping Error");
process_data::SubscriptionInvoiceSyncPTMapping::default()
}
};
let time_delta = scheduler_utils::get_subscription_invoice_sync_retry_schedule_time(
mapping,
merchant_id,
retry_count,
);
Ok(scheduler_utils::get_time_from_delta(time_delta))
}
pub async fn retry_subscription_invoice_sync_task(
db: &dyn StorageInterface,
connector: String,
merchant_id: common_utils::id_type::MerchantId,
pt: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let schedule_time = get_subscription_invoice_sync_process_schedule_time(
db,
connector.as_str(),
&merchant_id,
pt.retry_count + 1,
)
.await?;
match schedule_time {
Some(s_time) => {
db.as_scheduler()
.retry_process(pt, s_time)
.await
.attach_printable("Failed to retry subscription invoice sync task")?;
}
None => {
db.as_scheduler()
.finish_process_with_business_status(pt, business_status::RETRIES_EXCEEDED)
.await
.attach_printable("Failed to finish subscription invoice sync task")?;
}
}
Ok(())
}