feat(migration): add connector_customer updation support to batch PM update API (#9791)

Co-authored-by: Mrudul Vajpayee <mrudul.vajpayee@juspay.in>
This commit is contained in:
Kashif
2025-10-11 18:23:16 +05:30
committed by GitHub
parent c2a9ce788d
commit c4720c806b
2 changed files with 127 additions and 32 deletions

View File

@ -295,6 +295,7 @@ pub struct PaymentMethodRecordUpdateResponse {
pub network_transaction_id: Option<String>, pub network_transaction_id: Option<String>,
pub connector_mandate_details: Option<pii::SecretSerdeValue>, pub connector_mandate_details: Option<pii::SecretSerdeValue>,
pub updated_payment_method_data: Option<bool>, pub updated_payment_method_data: Option<bool>,
pub connector_customer: Option<pii::SecretSerdeValue>,
} }
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
@ -2690,6 +2691,7 @@ pub struct UpdatePaymentMethodRecord {
pub network_transaction_id: Option<String>, pub network_transaction_id: Option<String>,
pub line_number: Option<i64>, pub line_number: Option<i64>,
pub payment_instrument_id: Option<masking::Secret<String>>, pub payment_instrument_id: Option<masking::Secret<String>>,
pub connector_customer_id: Option<String>,
pub merchant_connector_ids: Option<String>, pub merchant_connector_ids: Option<String>,
pub card_expiry_month: Option<masking::Secret<String>>, pub card_expiry_month: Option<masking::Secret<String>>,
pub card_expiry_year: Option<masking::Secret<String>>, pub card_expiry_year: Option<masking::Secret<String>>,
@ -2705,6 +2707,7 @@ pub struct PaymentMethodUpdateResponse {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub update_error: Option<String>, pub update_error: Option<String>,
pub updated_payment_method_data: Option<bool>, pub updated_payment_method_data: Option<bool>,
pub connector_customer: Option<pii::SecretSerdeValue>,
pub line_number: Option<i64>, pub line_number: Option<i64>,
} }
@ -2846,6 +2849,7 @@ impl From<PaymentMethodUpdateResponseType> for PaymentMethodUpdateResponse {
network_transaction_id: res.network_transaction_id, network_transaction_id: res.network_transaction_id,
connector_mandate_details: res.connector_mandate_details, connector_mandate_details: res.connector_mandate_details,
updated_payment_method_data: res.updated_payment_method_data, updated_payment_method_data: res.updated_payment_method_data,
connector_customer: res.connector_customer,
update_status: UpdateStatus::Success, update_status: UpdateStatus::Success,
update_error: None, update_error: None,
line_number: record.line_number, line_number: record.line_number,
@ -2856,6 +2860,7 @@ impl From<PaymentMethodUpdateResponseType> for PaymentMethodUpdateResponse {
network_transaction_id: record.network_transaction_id, network_transaction_id: record.network_transaction_id,
connector_mandate_details: None, connector_mandate_details: None,
updated_payment_method_data: None, updated_payment_method_data: None,
connector_customer: None,
update_status: UpdateStatus::Failed, update_status: UpdateStatus::Failed,
update_error: Some(e), update_error: Some(e),
line_number: record.line_number, line_number: record.line_number,

View File

@ -123,14 +123,56 @@ pub async fn update_payment_method_record(
} }
.transpose()?; .transpose()?;
// Process mandate details when both payment_instrument_id and merchant_connector_ids are present let mca_data_cache = if let Some(merchant_connector_ids) = &req.merchant_connector_ids {
let pm_update = match ( let parsed_mca_ids =
&req.payment_instrument_id, MerchantConnectorValidator::parse_comma_separated_ids(merchant_connector_ids)?;
&req.merchant_connector_ids.clone(), let mut cache = HashMap::new();
) {
(Some(payment_instrument_id), Some(merchant_connector_ids)) => { for merchant_connector_id in parsed_mca_ids {
let parsed_mca_ids = let mca = db
MerchantConnectorValidator::parse_comma_separated_ids(merchant_connector_ids)?; .find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&state.into(),
merchant_context.get_merchant_account().get_id(),
&merchant_connector_id,
merchant_context.get_merchant_key_store(),
)
.await
.to_not_found_response(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: merchant_connector_id.get_string_repr().to_string(),
},
)?;
cache.insert(merchant_connector_id, mca);
}
Some(cache)
} else {
None
};
let (customer, updated_customer) = match (&req.connector_customer_id, &mca_data_cache) {
(Some(connector_customer_id), Some(cache)) => {
let customer = db
.find_customer_by_customer_id_merchant_id(
&state.into(),
&payment_method.customer_id,
merchant_id,
merchant_context.get_merchant_key_store(),
merchant_context.get_merchant_account().storage_scheme,
)
.await
.to_not_found_response(errors::ApiErrorResponse::CustomerNotFound)?;
let customer_update =
build_connector_customer_update(&customer, connector_customer_id, cache)?;
(Some(customer), Some(customer_update))
}
_ => (None, None),
};
let pm_update = match (&req.payment_instrument_id, &mca_data_cache) {
(Some(payment_instrument_id), Some(cache)) => {
let mandate_details = payment_method let mandate_details = payment_method
.get_common_mandate_reference() .get_common_mandate_reference()
.change_context(errors::ApiErrorResponse::InternalServerError) .change_context(errors::ApiErrorResponse::InternalServerError)
@ -145,31 +187,15 @@ pub async fn update_payment_method_record(
.clone() .clone()
.unwrap_or(PayoutsMandateReference(HashMap::new())); .unwrap_or(PayoutsMandateReference(HashMap::new()));
for merchant_connector_id in parsed_mca_ids { for (merchant_connector_id, mca) in cache.iter() {
let mca = db
.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&state.into(),
merchant_context.get_merchant_account().get_id(),
&merchant_connector_id,
merchant_context.get_merchant_key_store(),
)
.await
.to_not_found_response(
errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
id: merchant_connector_id.get_string_repr().to_string(),
},
)?;
match mca.connector_type { match mca.connector_type {
enums::ConnectorType::PayoutProcessor => { enums::ConnectorType::PayoutProcessor => {
// Handle PayoutsMandateReference
let new_payout_record = PayoutsMandateReferenceRecord { let new_payout_record = PayoutsMandateReferenceRecord {
transfer_method_id: Some(payment_instrument_id.peek().to_string()), transfer_method_id: Some(payment_instrument_id.peek().to_string()),
}; };
// Check if record exists for this merchant_connector_id
if let Some(existing_record) = if let Some(existing_record) =
existing_payouts_mandate.0.get_mut(&merchant_connector_id) existing_payouts_mandate.0.get_mut(merchant_connector_id)
{ {
if let Some(transfer_method_id) = &new_payout_record.transfer_method_id if let Some(transfer_method_id) = &new_payout_record.transfer_method_id
{ {
@ -177,22 +203,18 @@ pub async fn update_payment_method_record(
Some(transfer_method_id.clone()); Some(transfer_method_id.clone());
} }
} else { } else {
// Insert new record in connector_mandate_details
existing_payouts_mandate existing_payouts_mandate
.0 .0
.insert(merchant_connector_id.clone(), new_payout_record); .insert(merchant_connector_id.clone(), new_payout_record);
} }
} }
_ => { _ => {
// Handle PaymentsMandateReference
// Check if record exists for this merchant_connector_id
if let Some(existing_record) = if let Some(existing_record) =
existing_payments_mandate.0.get_mut(&merchant_connector_id) existing_payments_mandate.0.get_mut(merchant_connector_id)
{ {
existing_record.connector_mandate_id = existing_record.connector_mandate_id =
payment_instrument_id.peek().to_string(); payment_instrument_id.peek().to_string();
} else { } else {
// Insert new record in connector_mandate_details
existing_payments_mandate.0.insert( existing_payments_mandate.0.insert(
merchant_connector_id.clone(), merchant_connector_id.clone(),
PaymentsMandateReferenceRecord { PaymentsMandateReferenceRecord {
@ -267,7 +289,26 @@ pub async fn update_payment_method_record(
"Failed to update payment method for existing pm_id: {payment_method_id:?} in db", "Failed to update payment method for existing pm_id: {payment_method_id:?} in db",
))?; ))?;
logger::debug!("Payment method updated in db"); let connector_customer_response =
if let (Some(customer_data), Some(customer_update)) = (customer, updated_customer) {
let updated_customer = db
.update_customer_by_customer_id_merchant_id(
&state.into(),
response.customer_id.clone(),
merchant_id.clone(),
customer_data,
customer_update,
merchant_context.get_merchant_key_store(),
merchant_context.get_merchant_account().storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to update customer connector data")?;
updated_customer.connector_customer
} else {
None
};
Ok(ApplicationResponse::Json( Ok(ApplicationResponse::Json(
pm_api::PaymentMethodRecordUpdateResponse { pm_api::PaymentMethodRecordUpdateResponse {
@ -278,6 +319,7 @@ pub async fn update_payment_method_record(
.connector_mandate_details .connector_mandate_details
.map(pii::SecretSerdeValue::new), .map(pii::SecretSerdeValue::new),
updated_payment_method_data: Some(updated_card_expiry), updated_payment_method_data: Some(updated_card_expiry),
connector_customer: connector_customer_response,
}, },
)) ))
} }
@ -316,3 +358,51 @@ impl PaymentMethodsUpdateForm {
Ok((self.merchant_id.clone(), records)) Ok((self.merchant_id.clone(), records))
} }
} }
#[cfg(feature = "v1")]
fn build_connector_customer_update(
customer: &hyperswitch_domain_models::customer::Customer,
connector_customer_id: &str,
mca_cache: &std::collections::HashMap<
id_type::MerchantConnectorAccountId,
hyperswitch_domain_models::merchant_connector_account::MerchantConnectorAccount,
>,
) -> CustomResult<hyperswitch_domain_models::customer::CustomerUpdate, errors::ApiErrorResponse> {
use common_enums::enums;
use common_utils::pii;
let mut updated_connector_customer_data: std::collections::HashMap<String, serde_json::Value> =
customer
.connector_customer
.as_ref()
.and_then(|cc| serde_json::from_value(cc.peek().clone()).ok())
.unwrap_or_default();
for (_, mca) in mca_cache.iter() {
let key = match mca.connector_type {
enums::ConnectorType::PayoutProcessor => {
format!(
"{}_{}",
mca.profile_id.get_string_repr(),
mca.connector_name
)
}
_ => mca.merchant_connector_id.get_string_repr().to_string(),
};
updated_connector_customer_data.insert(
key,
serde_json::Value::String(connector_customer_id.to_string()),
);
}
Ok(
hyperswitch_domain_models::customer::CustomerUpdate::ConnectorCustomer {
connector_customer: Some(pii::SecretSerdeValue::new(
serde_json::to_value(updated_connector_customer_data)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to serialize connector customer data")?,
)),
},
)
}