feat(platform): support incremental authorization for platform payments (#11146)

This commit is contained in:
Apoorv Dixit
2026-02-04 19:42:52 +05:30
committed by GitHub
parent 6fe246cc05
commit e2f3e948be
15 changed files with 72 additions and 43 deletions

View File

@@ -23,6 +23,7 @@ pub struct Authorization {
pub error_message: Option<String>,
pub connector_authorization_id: Option<String>,
pub previously_authorized_amount: MinorUnit,
pub processor_merchant_id: Option<common_utils::id_type::MerchantId>,
}
#[derive(Clone, Debug, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize)]
@@ -37,6 +38,7 @@ pub struct AuthorizationNew {
pub error_message: Option<String>,
pub connector_authorization_id: Option<String>,
pub previously_authorized_amount: MinorUnit,
pub processor_merchant_id: Option<common_utils::id_type::MerchantId>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -17,9 +17,9 @@ impl AuthorizationNew {
}
impl Authorization {
pub async fn update_by_merchant_id_authorization_id(
pub async fn update_by_processor_merchant_id_authorization_id(
conn: &PgPooledConn,
merchant_id: common_utils::id_type::MerchantId,
processor_merchant_id: common_utils::id_type::MerchantId,
authorization_id: String,
authorization_update: AuthorizationUpdate,
) -> StorageResult<Self> {
@@ -30,8 +30,8 @@ impl Authorization {
_,
>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
dsl::processor_merchant_id
.eq(processor_merchant_id.to_owned())
.and(dsl::authorization_id.eq(authorization_id.to_owned())),
AuthorizationUpdateInternal::from(authorization_update),
)
@@ -44,8 +44,8 @@ impl Authorization {
errors::DatabaseError::NoFieldsToUpdate => {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
dsl::processor_merchant_id
.eq(processor_merchant_id.to_owned())
.and(dsl::authorization_id.eq(authorization_id.to_owned())),
)
.await
@@ -56,15 +56,15 @@ impl Authorization {
}
}
pub async fn find_by_merchant_id_payment_id(
pub async fn find_by_processor_merchant_id_payment_id(
conn: &PgPooledConn,
merchant_id: &common_utils::id_type::MerchantId,
processor_merchant_id: &common_utils::id_type::MerchantId,
payment_id: &common_utils::id_type::PaymentId,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<<Self as HasTable>::Table, _, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
dsl::processor_merchant_id
.eq(processor_merchant_id.to_owned())
.and(dsl::payment_id.eq(payment_id.to_owned())),
None,
None,

View File

@@ -760,6 +760,8 @@ diesel::table! {
#[max_length = 64]
connector_authorization_id -> Nullable<Varchar>,
previously_authorized_amount -> Int8,
#[max_length = 64]
processor_merchant_id -> Nullable<Varchar>,
}
}

View File

@@ -774,6 +774,8 @@ diesel::table! {
#[max_length = 64]
connector_authorization_id -> Nullable<Varchar>,
previously_authorized_amount -> Int8,
#[max_length = 64]
processor_merchant_id -> Nullable<Varchar>,
}
}

View File

@@ -561,7 +561,7 @@ pub async fn get_token_pm_type_mandate_details(
verify_mandate_details_for_recurring_payments(
&payment_method_info.merchant_id,
platform.get_processor().get_account().get_id(),
platform.get_provider().get_account().get_id(),
&payment_method_info.customer_id,
customer_id,
)?;

View File

@@ -840,8 +840,8 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsIncrementalAu
)?;
state
.store
.update_authorization_by_merchant_id_authorization_id(
router_data.merchant_id.clone(),
.update_authorization_by_processor_merchant_id_authorization_id(
payment_data.payment_intent.processor_merchant_id.clone(),
authorization_id,
authorization_update,
)
@@ -851,8 +851,8 @@ impl<F: Clone> PostUpdateTracker<F, PaymentData<F>, types::PaymentsIncrementalAu
//Fetch all the authorizations of the payment and send in incremental authorization response
let authorizations = state
.store
.find_all_authorizations_by_merchant_id_payment_id(
&router_data.merchant_id,
.find_all_authorizations_by_processor_merchant_id_payment_id(
&payment_data.payment_intent.processor_merchant_id,
payment_data.payment_intent.get_id(),
)
.await

View File

@@ -375,7 +375,7 @@ async fn get_tracker_for_sync<
})?;
let authorizations = db
.find_all_authorizations_by_merchant_id_payment_id(
.find_all_authorizations_by_processor_merchant_id_payment_id(
platform.get_processor().get_account().get_id(),
&payment_id,
)

View File

@@ -246,6 +246,7 @@ impl<F: Clone + Sync>
error_message: None,
connector_authorization_id: None,
previously_authorized_amount: payment_data.payment_attempt.get_total_amount(),
processor_merchant_id: Some(payment_data.payment_intent.processor_merchant_id.clone()),
};
let authorization = state
.store

View File

@@ -16,15 +16,15 @@ pub trait AuthorizationInterface {
authorization: storage::AuthorizationNew,
) -> CustomResult<storage::Authorization, errors::StorageError>;
async fn find_all_authorizations_by_merchant_id_payment_id(
async fn find_all_authorizations_by_processor_merchant_id_payment_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
processor_merchant_id: &common_utils::id_type::MerchantId,
payment_id: &common_utils::id_type::PaymentId,
) -> CustomResult<Vec<storage::Authorization>, errors::StorageError>;
async fn update_authorization_by_merchant_id_authorization_id(
async fn update_authorization_by_processor_merchant_id_authorization_id(
&self,
merchant_id: common_utils::id_type::MerchantId,
processor_merchant_id: common_utils::id_type::MerchantId,
authorization_id: String,
authorization: storage::AuthorizationUpdate,
) -> CustomResult<storage::Authorization, errors::StorageError>;
@@ -45,28 +45,32 @@ impl AuthorizationInterface for Store {
}
#[instrument(skip_all)]
async fn find_all_authorizations_by_merchant_id_payment_id(
async fn find_all_authorizations_by_processor_merchant_id_payment_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
processor_merchant_id: &common_utils::id_type::MerchantId,
payment_id: &common_utils::id_type::PaymentId,
) -> CustomResult<Vec<storage::Authorization>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::Authorization::find_by_merchant_id_payment_id(&conn, merchant_id, payment_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
storage::Authorization::find_by_processor_merchant_id_payment_id(
&conn,
processor_merchant_id,
payment_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn update_authorization_by_merchant_id_authorization_id(
async fn update_authorization_by_processor_merchant_id_authorization_id(
&self,
merchant_id: common_utils::id_type::MerchantId,
processor_merchant_id: common_utils::id_type::MerchantId,
authorization_id: String,
authorization: storage::AuthorizationUpdate,
) -> CustomResult<storage::Authorization, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Authorization::update_by_merchant_id_authorization_id(
storage::Authorization::update_by_processor_merchant_id_authorization_id(
&conn,
merchant_id,
processor_merchant_id,
authorization_id,
authorization,
)
@@ -102,36 +106,40 @@ impl AuthorizationInterface for MockDb {
error_message: authorization.error_message,
connector_authorization_id: authorization.connector_authorization_id,
previously_authorized_amount: authorization.previously_authorized_amount,
processor_merchant_id: authorization.processor_merchant_id,
};
authorizations.push(authorization.clone());
Ok(authorization)
}
async fn find_all_authorizations_by_merchant_id_payment_id(
async fn find_all_authorizations_by_processor_merchant_id_payment_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
processor_merchant_id: &common_utils::id_type::MerchantId,
payment_id: &common_utils::id_type::PaymentId,
) -> CustomResult<Vec<storage::Authorization>, errors::StorageError> {
let authorizations = self.authorizations.lock().await;
let authorizations_found: Vec<storage::Authorization> = authorizations
.iter()
.filter(|a| a.merchant_id == *merchant_id && a.payment_id == *payment_id)
.filter(|a| {
a.processor_merchant_id.as_ref() == Some(processor_merchant_id)
&& a.payment_id == *payment_id
})
.cloned()
.collect();
Ok(authorizations_found)
}
async fn update_authorization_by_merchant_id_authorization_id(
async fn update_authorization_by_processor_merchant_id_authorization_id(
&self,
merchant_id: common_utils::id_type::MerchantId,
processor_merchant_id: common_utils::id_type::MerchantId,
authorization_id: String,
authorization_update: storage::AuthorizationUpdate,
) -> CustomResult<storage::Authorization, errors::StorageError> {
let mut authorizations = self.authorizations.lock().await;
authorizations
.iter_mut()
.find(|authorization| authorization.authorization_id == authorization_id && authorization.merchant_id == merchant_id)
.find(|authorization| authorization.authorization_id == authorization_id && authorization.processor_merchant_id.as_ref() == Some(&processor_merchant_id))
.map(|authorization| {
let authorization_updated =
AuthorizationUpdateInternal::from(authorization_update)
@@ -141,7 +149,7 @@ impl AuthorizationInterface for MockDb {
})
.ok_or(
errors::StorageError::ValueNotFound(format!(
"cannot find authorization for authorization_id = {authorization_id} and merchant_id = {merchant_id:?}"
"cannot find authorization for authorization_id = {authorization_id} and processor_merchant_id = {processor_merchant_id:?}"
))
.into(),
)

View File

@@ -3767,25 +3767,28 @@ impl AuthorizationInterface for KafkaStore {
self.diesel_store.insert_authorization(authorization).await
}
async fn find_all_authorizations_by_merchant_id_payment_id(
async fn find_all_authorizations_by_processor_merchant_id_payment_id(
&self,
merchant_id: &id_type::MerchantId,
processor_merchant_id: &id_type::MerchantId,
payment_id: &id_type::PaymentId,
) -> CustomResult<Vec<storage::Authorization>, errors::StorageError> {
self.diesel_store
.find_all_authorizations_by_merchant_id_payment_id(merchant_id, payment_id)
.find_all_authorizations_by_processor_merchant_id_payment_id(
processor_merchant_id,
payment_id,
)
.await
}
async fn update_authorization_by_merchant_id_authorization_id(
async fn update_authorization_by_processor_merchant_id_authorization_id(
&self,
merchant_id: id_type::MerchantId,
processor_merchant_id: id_type::MerchantId,
authorization_id: String,
authorization: storage::AuthorizationUpdate,
) -> CustomResult<storage::Authorization, errors::StorageError> {
self.diesel_store
.update_authorization_by_merchant_id_authorization_id(
merchant_id,
.update_authorization_by_processor_merchant_id_authorization_id(
processor_merchant_id,
authorization_id,
authorization,
)

View File

@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE incremental_authorization DROP COLUMN IF EXISTS processor_merchant_id;

View File

@@ -0,0 +1,4 @@
-- Your SQL goes here
ALTER TABLE incremental_authorization ADD COLUMN IF NOT EXISTS processor_merchant_id VARCHAR(64);
-- This backfill should be executed again after deployment
UPDATE incremental_authorization SET processor_merchant_id = merchant_id where processor_merchant_id IS NULL;

View File

@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP INDEX CONCURRENTLY IF EXISTS incremental_authorization_processor_merchant_id_authorization_id_index;

View File

@@ -0,0 +1 @@
run_in_transaction = false

View File

@@ -0,0 +1,2 @@
-- Your SQL goes here
CREATE INDEX CONCURRENTLY IF NOT EXISTS incremental_authorization_processor_merchant_id_authorization_id_index ON incremental_authorization (processor_merchant_id, authorization_id);