feat(subscription): domain_model for subscription and invoice (#9640)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Jagan <jaganelavarasan@gmail.com>
This commit is contained in:
Ankit Kumar Gupta
2025-10-09 15:13:56 +05:30
committed by GitHub
parent a968844589
commit 17986c3b31
15 changed files with 1093 additions and 386 deletions

View File

@ -81,11 +81,13 @@ pub async fn create_subscription(
.attach_printable("subscriptions: failed to create invoice")?;
subscription
.update_subscription(diesel_models::subscription::SubscriptionUpdate::new(
payment.payment_method_id.clone(),
None,
None,
))
.update_subscription(
hyperswitch_domain_models::subscription::SubscriptionUpdate::new(
payment.payment_method_id.clone(),
None,
None,
),
)
.await
.attach_printable("subscriptions: failed to update subscription")?;
@ -146,7 +148,6 @@ pub async fn get_subscription_plans(
}
Ok(ApplicationResponse::Json(response))
}
/// Creates and confirms a subscription in one operation.
pub async fn create_and_confirm_subscription(
state: SessionState,
@ -253,16 +254,18 @@ pub async fn create_and_confirm_subscription(
.await?;
subs_handler
.update_subscription(diesel_models::subscription::SubscriptionUpdate::new(
payment_response.payment_method_id.clone(),
Some(SubscriptionStatus::from(subscription_create_response.status).to_string()),
Some(
subscription_create_response
.subscription_id
.get_string_repr()
.to_string(),
.update_subscription(
hyperswitch_domain_models::subscription::SubscriptionUpdate::new(
payment_response.payment_method_id.clone(),
Some(SubscriptionStatus::from(subscription_create_response.status).to_string()),
Some(
subscription_create_response
.subscription_id
.get_string_repr()
.to_string(),
),
),
))
)
.await?;
let response = subs_handler.generate_response(
@ -386,16 +389,18 @@ pub async fn confirm_subscription(
.await?;
subscription_entry
.update_subscription(diesel_models::subscription::SubscriptionUpdate::new(
payment_response.payment_method_id.clone(),
Some(SubscriptionStatus::from(subscription_create_response.status).to_string()),
Some(
subscription_create_response
.subscription_id
.get_string_repr()
.to_string(),
.update_subscription(
hyperswitch_domain_models::subscription::SubscriptionUpdate::new(
payment_response.payment_method_id.clone(),
Some(SubscriptionStatus::from(subscription_create_response.status).to_string()),
Some(
subscription_create_response
.subscription_id
.get_string_repr()
.to_string(),
),
),
))
)
.await?;
let response = subscription_entry.generate_response(

View File

@ -176,7 +176,7 @@ impl BillingHandler {
pub async fn create_subscription_on_connector(
&self,
state: &SessionState,
subscription: diesel_models::subscription::Subscription,
subscription: hyperswitch_domain_models::subscription::Subscription,
item_price_id: Option<String>,
billing_address: Option<api_models::payments::Address>,
) -> errors::RouterResult<subscription_response_types::SubscriptionCreateResponse> {

View File

@ -10,12 +10,14 @@ use masking::{PeekInterface, Secret};
use super::errors;
use crate::{
core::subscription::payments_api_client, routes::SessionState, types::storage as storage_types,
core::{errors::utils::StorageErrorExt, subscription::payments_api_client},
routes::SessionState,
types::storage as storage_types,
workflows::invoice_sync as invoice_sync_workflow,
};
pub struct InvoiceHandler {
pub subscription: diesel_models::subscription::Subscription,
pub subscription: hyperswitch_domain_models::subscription::Subscription,
pub merchant_account: hyperswitch_domain_models::merchant_account::MerchantAccount,
pub profile: hyperswitch_domain_models::business_profile::Profile,
}
@ -23,7 +25,7 @@ pub struct InvoiceHandler {
#[allow(clippy::todo)]
impl InvoiceHandler {
pub fn new(
subscription: diesel_models::subscription::Subscription,
subscription: hyperswitch_domain_models::subscription::Subscription,
merchant_account: hyperswitch_domain_models::merchant_account::MerchantAccount,
profile: hyperswitch_domain_models::business_profile::Profile,
) -> Self {
@ -45,8 +47,8 @@ impl InvoiceHandler {
provider_name: connector_enums::Connector,
metadata: Option<pii::SecretSerdeValue>,
connector_invoice_id: Option<String>,
) -> errors::RouterResult<diesel_models::invoice::Invoice> {
let invoice_new = diesel_models::invoice::InvoiceNew::new(
) -> errors::RouterResult<hyperswitch_domain_models::invoice::Invoice> {
let invoice_new = hyperswitch_domain_models::invoice::Invoice::to_invoice(
self.subscription.id.to_owned(),
self.subscription.merchant_id.to_owned(),
self.subscription.profile_id.to_owned(),
@ -62,9 +64,19 @@ impl InvoiceHandler {
connector_invoice_id,
);
let key_manager_state = &(state).into();
let merchant_key_store = state
.store
.get_merchant_key_store_by_merchant_id(
key_manager_state,
self.merchant_account.get_id(),
&state.store.get_master_key().to_vec().into(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?;
let invoice = state
.store
.insert_invoice_entry(invoice_new)
.insert_invoice_entry(key_manager_state, &merchant_key_store, invoice_new)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Create Invoice".to_string(),
@ -82,16 +94,31 @@ impl InvoiceHandler {
payment_intent_id: Option<common_utils::id_type::PaymentId>,
status: connector_enums::InvoiceStatus,
connector_invoice_id: Option<String>,
) -> errors::RouterResult<diesel_models::invoice::Invoice> {
let update_invoice = diesel_models::invoice::InvoiceUpdate::new(
) -> errors::RouterResult<hyperswitch_domain_models::invoice::Invoice> {
let update_invoice = hyperswitch_domain_models::invoice::InvoiceUpdate::new(
payment_method_id.as_ref().map(|id| id.peek()).cloned(),
Some(status),
connector_invoice_id,
payment_intent_id,
);
let key_manager_state = &(state).into();
let merchant_key_store = state
.store
.get_merchant_key_store_by_merchant_id(
key_manager_state,
self.merchant_account.get_id(),
&state.store.get_master_key().to_vec().into(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?;
state
.store
.update_invoice_entry(invoice_id.get_string_repr().to_string(), update_invoice)
.update_invoice_entry(
key_manager_state,
&merchant_key_store,
invoice_id.get_string_repr().to_string(),
update_invoice,
)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Invoice Update".to_string(),
@ -213,10 +240,24 @@ impl InvoiceHandler {
pub async fn get_latest_invoice(
&self,
state: &SessionState,
) -> errors::RouterResult<diesel_models::invoice::Invoice> {
) -> errors::RouterResult<hyperswitch_domain_models::invoice::Invoice> {
let key_manager_state = &(state).into();
let merchant_key_store = state
.store
.get_merchant_key_store_by_merchant_id(
key_manager_state,
self.merchant_account.get_id(),
&state.store.get_master_key().to_vec().into(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?;
state
.store
.get_latest_invoice_for_subscription(self.subscription.id.get_string_repr().to_string())
.get_latest_invoice_for_subscription(
key_manager_state,
&merchant_key_store,
self.subscription.id.get_string_repr().to_string(),
)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Get Latest Invoice".to_string(),
@ -228,10 +269,24 @@ impl InvoiceHandler {
&self,
state: &SessionState,
invoice_id: common_utils::id_type::InvoiceId,
) -> errors::RouterResult<diesel_models::invoice::Invoice> {
) -> errors::RouterResult<hyperswitch_domain_models::invoice::Invoice> {
let key_manager_state = &(state).into();
let merchant_key_store = state
.store
.get_merchant_key_store_by_merchant_id(
key_manager_state,
self.merchant_account.get_id(),
&state.store.get_master_key().to_vec().into(),
)
.await
.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?;
state
.store
.find_invoice_by_invoice_id(invoice_id.get_string_repr().to_string())
.find_invoice_by_invoice_id(
key_manager_state,
&merchant_key_store,
invoice_id.get_string_repr().to_string(),
)
.await
.change_context(errors::ApiErrorResponse::SubscriptionError {
operation: "Get Invoice by ID".to_string(),
@ -242,7 +297,7 @@ impl InvoiceHandler {
pub async fn create_invoice_sync_job(
&self,
state: &SessionState,
invoice: &diesel_models::invoice::Invoice,
invoice: &hyperswitch_domain_models::invoice::Invoice,
connector_invoice_id: String,
connector_name: connector_enums::Connector,
) -> errors::RouterResult<()> {

View File

@ -2,15 +2,15 @@ use std::str::FromStr;
use api_models::{
enums as api_enums,
subscription::{self as subscription_types, SubscriptionResponse, SubscriptionStatus},
subscription::{self as subscription_types, SubscriptionResponse},
};
use common_enums::connector_enums;
use common_utils::{consts, ext_traits::OptionExt};
use diesel_models::subscription::SubscriptionNew;
use error_stack::ResultExt;
use hyperswitch_domain_models::{
merchant_context::MerchantContext,
router_response_types::subscriptions as subscription_response_types,
subscription::{Subscription, SubscriptionStatus},
};
use masking::Secret;
@ -48,28 +48,33 @@ impl<'a> SubscriptionHandler<'a> {
let store = self.state.store.clone();
let db = store.as_ref();
let mut subscription = SubscriptionNew::new(
subscription_id,
SubscriptionStatus::Created.to_string(),
Some(billing_processor.to_string()),
None,
Some(merchant_connector_id),
None,
None,
self.merchant_context
let mut subscription = Subscription {
id: subscription_id,
status: SubscriptionStatus::Created.to_string(),
billing_processor: Some(billing_processor.to_string()),
payment_method_id: None,
merchant_connector_id: Some(merchant_connector_id),
client_secret: None,
connector_subscription_id: None,
merchant_id: self
.merchant_context
.get_merchant_account()
.get_id()
.clone(),
customer_id.clone(),
None,
profile.get_id().clone(),
customer_id: customer_id.clone(),
metadata: None,
profile_id: profile.get_id().clone(),
merchant_reference_id,
);
created_at: common_utils::date_time::now(),
modified_at: common_utils::date_time::now(),
};
subscription.generate_and_set_client_secret();
let key_manager_state = &(self.state).into();
let merchant_key_store = self.merchant_context.get_merchant_key_store();
let new_subscription = db
.insert_subscription_entry(subscription)
.insert_subscription_entry(key_manager_state, merchant_key_store, subscription)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("subscriptions: unable to insert subscription entry to database")?;
@ -129,10 +134,15 @@ impl<'a> SubscriptionHandler<'a> {
) -> errors::RouterResult<()> {
let subscription_id = client_secret.get_subscription_id()?;
let key_manager_state = &(self.state).into();
let key_store = self.merchant_context.get_merchant_key_store();
let subscription = self
.state
.store
.find_by_merchant_id_subscription_id(
key_manager_state,
key_store,
self.merchant_context.get_merchant_account().get_id(),
subscription_id.to_string(),
)
@ -150,7 +160,7 @@ impl<'a> SubscriptionHandler<'a> {
pub fn validate_client_secret(
&self,
client_secret: &hyperswitch_domain_models::subscription::ClientSecret,
subscription: &diesel_models::subscription::Subscription,
subscription: &Subscription,
) -> errors::RouterResult<()> {
let stored_client_secret = subscription
.client_secret
@ -185,6 +195,8 @@ impl<'a> SubscriptionHandler<'a> {
.state
.store
.find_by_merchant_id_subscription_id(
&(self.state).into(),
self.merchant_context.get_merchant_key_store(),
self.merchant_context.get_merchant_account().get_id(),
subscription_id.get_string_repr().to_string().clone(),
)
@ -205,21 +217,21 @@ impl<'a> SubscriptionHandler<'a> {
}
pub struct SubscriptionWithHandler<'a> {
pub handler: &'a SubscriptionHandler<'a>,
pub subscription: diesel_models::subscription::Subscription,
pub subscription: Subscription,
pub merchant_account: hyperswitch_domain_models::merchant_account::MerchantAccount,
}
impl SubscriptionWithHandler<'_> {
pub fn generate_response(
&self,
invoice: &diesel_models::invoice::Invoice,
invoice: &hyperswitch_domain_models::invoice::Invoice,
payment_response: &subscription_types::PaymentResponseData,
status: subscription_response_types::SubscriptionStatus,
) -> errors::RouterResult<subscription_types::ConfirmSubscriptionResponse> {
Ok(subscription_types::ConfirmSubscriptionResponse {
id: self.subscription.id.clone(),
merchant_reference_id: self.subscription.merchant_reference_id.clone(),
status: SubscriptionStatus::from(status),
status: subscription_types::SubscriptionStatus::from(status),
plan_id: None,
profile_id: self.subscription.profile_id.to_owned(),
payment: Some(payment_response.clone()),
@ -234,13 +246,13 @@ impl SubscriptionWithHandler<'_> {
pub fn to_subscription_response(
&self,
payment: Option<subscription_types::PaymentResponseData>,
invoice: Option<&diesel_models::invoice::Invoice>,
invoice: Option<&hyperswitch_domain_models::invoice::Invoice>,
) -> errors::RouterResult<SubscriptionResponse> {
Ok(SubscriptionResponse::new(
self.subscription.id.clone(),
self.subscription.merchant_reference_id.clone(),
SubscriptionStatus::from_str(&self.subscription.status)
.unwrap_or(SubscriptionStatus::Created),
subscription_types::SubscriptionStatus::from_str(&self.subscription.status)
.unwrap_or(subscription_types::SubscriptionStatus::Created),
None,
self.subscription.profile_id.to_owned(),
self.subscription.merchant_id.to_owned(),
@ -259,11 +271,13 @@ impl SubscriptionWithHandler<'_> {
pub async fn update_subscription(
&mut self,
subscription_update: diesel_models::subscription::SubscriptionUpdate,
subscription_update: hyperswitch_domain_models::subscription::SubscriptionUpdate,
) -> errors::RouterResult<()> {
let db = self.handler.state.store.as_ref();
let updated_subscription = db
.update_subscription_entry(
&(self.handler.state).into(),
self.handler.merchant_context.get_merchant_key_store(),
self.handler
.merchant_context
.get_merchant_account()
@ -363,10 +377,12 @@ impl SubscriptionWithHandler<'_> {
}
}
impl ForeignTryFrom<&diesel_models::invoice::Invoice> for subscription_types::Invoice {
impl ForeignTryFrom<&hyperswitch_domain_models::invoice::Invoice> for subscription_types::Invoice {
type Error = error_stack::Report<errors::ApiErrorResponse>;
fn foreign_try_from(invoice: &diesel_models::invoice::Invoice) -> Result<Self, Self::Error> {
fn foreign_try_from(
invoice: &hyperswitch_domain_models::invoice::Invoice,
) -> Result<Self, Self::Error> {
Ok(Self {
id: invoice.id.clone(),
subscription_id: invoice.subscription_id.clone(),

View File

@ -21,7 +21,6 @@ pub mod generic_link;
pub mod gsm;
pub mod health_check;
pub mod hyperswitch_ai_interaction;
pub mod invoice;
pub mod kafka_store;
pub mod locker_mock_up;
pub mod mandate;
@ -36,7 +35,6 @@ pub mod relay;
pub mod reverse_lookup;
pub mod role;
pub mod routing_algorithm;
pub mod subscription;
pub mod unified_translations;
pub mod user;
pub mod user_authentication_method;
@ -147,8 +145,8 @@ pub trait StorageInterface:
+ payment_method_session::PaymentMethodsSessionInterface
+ tokenization::TokenizationInterface
+ callback_mapper::CallbackMapperInterface
+ subscription::SubscriptionInterface
+ invoice::InvoiceInterface
+ storage_impl::subscription::SubscriptionInterface<Error = StorageError>
+ storage_impl::invoice::InvoiceInterface<Error = StorageError>
+ 'static
{
fn get_scheduler_db(&self) -> Box<dyn scheduler::SchedulerInterface>;

View File

@ -1,169 +0,0 @@
use error_stack::report;
use router_env::{instrument, tracing};
use storage_impl::MockDb;
use super::Store;
use crate::{
connection,
core::errors::{self, CustomResult},
db::kafka_store::KafkaStore,
types::storage,
};
#[async_trait::async_trait]
pub trait InvoiceInterface {
async fn insert_invoice_entry(
&self,
invoice_new: storage::invoice::InvoiceNew,
) -> CustomResult<storage::Invoice, errors::StorageError>;
async fn find_invoice_by_invoice_id(
&self,
invoice_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError>;
async fn update_invoice_entry(
&self,
invoice_id: String,
data: storage::invoice::InvoiceUpdate,
) -> CustomResult<storage::Invoice, errors::StorageError>;
async fn get_latest_invoice_for_subscription(
&self,
subscription_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError>;
}
#[async_trait::async_trait]
impl InvoiceInterface for Store {
#[instrument(skip_all)]
async fn insert_invoice_entry(
&self,
invoice_new: storage::invoice::InvoiceNew,
) -> CustomResult<storage::Invoice, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
invoice_new
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_invoice_by_invoice_id(
&self,
invoice_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Invoice::find_invoice_by_id_invoice_id(&conn, invoice_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn update_invoice_entry(
&self,
invoice_id: String,
data: storage::invoice::InvoiceUpdate,
) -> CustomResult<storage::Invoice, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Invoice::update_invoice_entry(&conn, invoice_id, data)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn get_latest_invoice_for_subscription(
&self,
subscription_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Invoice::list_invoices_by_subscription_id(
&conn,
subscription_id.clone(),
Some(1),
None,
false,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
.map(|e| e.last().cloned())?
.ok_or(report!(errors::StorageError::ValueNotFound(format!(
"Invoice not found for subscription_id: {}",
subscription_id
))))
}
}
#[async_trait::async_trait]
impl InvoiceInterface for MockDb {
#[instrument(skip_all)]
async fn insert_invoice_entry(
&self,
_invoice_new: storage::invoice::InvoiceNew,
) -> CustomResult<storage::Invoice, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn find_invoice_by_invoice_id(
&self,
_invoice_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn update_invoice_entry(
&self,
_invoice_id: String,
_data: storage::invoice::InvoiceUpdate,
) -> CustomResult<storage::Invoice, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn get_latest_invoice_for_subscription(
&self,
_subscription_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
}
#[async_trait::async_trait]
impl InvoiceInterface for KafkaStore {
#[instrument(skip_all)]
async fn insert_invoice_entry(
&self,
invoice_new: storage::invoice::InvoiceNew,
) -> CustomResult<storage::Invoice, errors::StorageError> {
self.diesel_store.insert_invoice_entry(invoice_new).await
}
#[instrument(skip_all)]
async fn find_invoice_by_invoice_id(
&self,
invoice_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError> {
self.diesel_store
.find_invoice_by_invoice_id(invoice_id)
.await
}
#[instrument(skip_all)]
async fn update_invoice_entry(
&self,
invoice_id: String,
data: storage::invoice::InvoiceUpdate,
) -> CustomResult<storage::Invoice, errors::StorageError> {
self.diesel_store
.update_invoice_entry(invoice_id, data)
.await
}
async fn get_latest_invoice_for_subscription(
&self,
subscription_id: String,
) -> CustomResult<storage::Invoice, errors::StorageError> {
self.diesel_store
.get_latest_invoice_for_subscription(subscription_id)
.await
}
}

View File

@ -23,9 +23,14 @@ use hyperswitch_domain_models::payouts::{
use hyperswitch_domain_models::{
cards_info::CardsInfoInterface,
disputes,
invoice::{Invoice as DomainInvoice, InvoiceInterface, InvoiceUpdate as DomainInvoiceUpdate},
payment_methods::PaymentMethodInterface,
payments::{payment_attempt::PaymentAttemptInterface, payment_intent::PaymentIntentInterface},
refunds,
subscription::{
Subscription as DomainSubscription, SubscriptionInterface,
SubscriptionUpdate as DomainSubscriptionUpdate,
},
};
#[cfg(not(feature = "payouts"))]
use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface};
@ -4335,3 +4340,101 @@ impl TokenizationInterface for KafkaStore {
#[cfg(not(all(feature = "v2", feature = "tokenization_v2")))]
impl TokenizationInterface for KafkaStore {}
#[async_trait::async_trait]
impl InvoiceInterface for KafkaStore {
type Error = errors::StorageError;
#[instrument(skip_all)]
async fn insert_invoice_entry(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
invoice_new: DomainInvoice,
) -> CustomResult<DomainInvoice, errors::StorageError> {
self.diesel_store
.insert_invoice_entry(state, key_store, invoice_new)
.await
}
#[instrument(skip_all)]
async fn find_invoice_by_invoice_id(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
invoice_id: String,
) -> CustomResult<DomainInvoice, errors::StorageError> {
self.diesel_store
.find_invoice_by_invoice_id(state, key_store, invoice_id)
.await
}
#[instrument(skip_all)]
async fn update_invoice_entry(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
invoice_id: String,
data: DomainInvoiceUpdate,
) -> CustomResult<DomainInvoice, errors::StorageError> {
self.diesel_store
.update_invoice_entry(state, key_store, invoice_id, data)
.await
}
#[instrument(skip_all)]
async fn get_latest_invoice_for_subscription(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
subscription_id: String,
) -> CustomResult<DomainInvoice, errors::StorageError> {
self.diesel_store
.get_latest_invoice_for_subscription(state, key_store, subscription_id)
.await
}
}
#[async_trait::async_trait]
impl SubscriptionInterface for KafkaStore {
type Error = errors::StorageError;
#[instrument(skip_all)]
async fn insert_subscription_entry(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
subscription_new: DomainSubscription,
) -> CustomResult<DomainSubscription, errors::StorageError> {
self.diesel_store
.insert_subscription_entry(state, key_store, subscription_new)
.await
}
#[instrument(skip_all)]
async fn find_by_merchant_id_subscription_id(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
merchant_id: &id_type::MerchantId,
subscription_id: String,
) -> CustomResult<DomainSubscription, errors::StorageError> {
self.diesel_store
.find_by_merchant_id_subscription_id(state, key_store, merchant_id, subscription_id)
.await
}
#[instrument(skip_all)]
async fn update_subscription_entry(
&self,
state: &KeyManagerState,
key_store: &hyperswitch_domain_models::merchant_key_store::MerchantKeyStore,
merchant_id: &id_type::MerchantId,
subscription_id: String,
data: DomainSubscriptionUpdate,
) -> CustomResult<DomainSubscription, errors::StorageError> {
self.diesel_store
.update_subscription_entry(state, key_store, merchant_id, subscription_id, data)
.await
}
}

View File

@ -1,140 +0,0 @@
use error_stack::report;
use router_env::{instrument, tracing};
use storage_impl::MockDb;
use super::Store;
use crate::{
connection,
core::errors::{self, CustomResult},
db::kafka_store::KafkaStore,
types::storage,
};
#[async_trait::async_trait]
pub trait SubscriptionInterface {
async fn insert_subscription_entry(
&self,
subscription_new: storage::subscription::SubscriptionNew,
) -> CustomResult<storage::Subscription, errors::StorageError>;
async fn find_by_merchant_id_subscription_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
subscription_id: String,
) -> CustomResult<storage::Subscription, errors::StorageError>;
async fn update_subscription_entry(
&self,
merchant_id: &common_utils::id_type::MerchantId,
subscription_id: String,
data: storage::SubscriptionUpdate,
) -> CustomResult<storage::Subscription, errors::StorageError>;
}
#[async_trait::async_trait]
impl SubscriptionInterface for Store {
#[instrument(skip_all)]
async fn insert_subscription_entry(
&self,
subscription_new: storage::subscription::SubscriptionNew,
) -> CustomResult<storage::Subscription, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
subscription_new
.insert(&conn)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_by_merchant_id_subscription_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
subscription_id: String,
) -> CustomResult<storage::Subscription, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Subscription::find_by_merchant_id_subscription_id(
&conn,
merchant_id,
subscription_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn update_subscription_entry(
&self,
merchant_id: &common_utils::id_type::MerchantId,
subscription_id: String,
data: storage::SubscriptionUpdate,
) -> CustomResult<storage::Subscription, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Subscription::update_subscription_entry(&conn, merchant_id, subscription_id, data)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
}
#[async_trait::async_trait]
impl SubscriptionInterface for MockDb {
#[instrument(skip_all)]
async fn insert_subscription_entry(
&self,
_subscription_new: storage::subscription::SubscriptionNew,
) -> CustomResult<storage::Subscription, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn find_by_merchant_id_subscription_id(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_subscription_id: String,
) -> CustomResult<storage::Subscription, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
async fn update_subscription_entry(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_subscription_id: String,
_data: storage::SubscriptionUpdate,
) -> CustomResult<storage::Subscription, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
}
#[async_trait::async_trait]
impl SubscriptionInterface for KafkaStore {
#[instrument(skip_all)]
async fn insert_subscription_entry(
&self,
subscription_new: storage::subscription::SubscriptionNew,
) -> CustomResult<storage::Subscription, errors::StorageError> {
self.diesel_store
.insert_subscription_entry(subscription_new)
.await
}
#[instrument(skip_all)]
async fn find_by_merchant_id_subscription_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
subscription_id: String,
) -> CustomResult<storage::Subscription, errors::StorageError> {
self.diesel_store
.find_by_merchant_id_subscription_id(merchant_id, subscription_id)
.await
}
#[instrument(skip_all)]
async fn update_subscription_entry(
&self,
merchant_id: &common_utils::id_type::MerchantId,
subscription_id: String,
data: storage::SubscriptionUpdate,
) -> CustomResult<storage::Subscription, errors::StorageError> {
self.diesel_store
.update_subscription_entry(merchant_id, subscription_id, data)
.await
}
}

View File

@ -5,9 +5,7 @@ use common_utils::{
errors::CustomResult,
ext_traits::{StringExt, ValueExt},
};
use diesel_models::{
invoice::Invoice, process_tracker::business_status, subscription::Subscription,
};
use diesel_models::process_tracker::business_status;
use error_stack::ResultExt;
use router_env::logger;
use scheduler::{
@ -39,8 +37,8 @@ pub struct InvoiceSyncHandler<'a> {
pub merchant_account: domain::MerchantAccount,
pub customer: domain::Customer,
pub profile: domain::Profile,
pub subscription: Subscription,
pub invoice: Invoice,
pub subscription: hyperswitch_domain_models::subscription::Subscription,
pub invoice: hyperswitch_domain_models::invoice::Invoice,
}
#[cfg(feature = "v1")]
@ -95,6 +93,8 @@ impl<'a> InvoiceSyncHandler<'a> {
let subscription = state
.store
.find_by_merchant_id_subscription_id(
&state.into(),
&key_store,
merchant_account.get_id(),
tracking_data.subscription_id.get_string_repr().to_string(),
)
@ -103,7 +103,11 @@ impl<'a> InvoiceSyncHandler<'a> {
let invoice = state
.store
.find_invoice_by_invoice_id(tracking_data.invoice_id.get_string_repr().to_string())
.find_invoice_by_invoice_id(
&state.into(),
&key_store,
tracking_data.invoice_id.get_string_repr().to_string(),
)
.await
.attach_printable("invoices: unable to get latest invoice from database")?;