feat: Kv changes for V2 feature (#8198)

Co-authored-by: Akshay S <akshay.s@juspay.in>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
akshay-97
2025-06-26 19:15:22 +05:30
committed by GitHub
parent fb58cc9c2f
commit d2740f0322
15 changed files with 868 additions and 113 deletions

View File

@ -126,12 +126,7 @@ impl DBOperation {
)),
#[cfg(feature = "v2")]
Updateable::PaymentAttemptUpdate(a) => DBResult::PaymentAttempt(Box::new(
a.orig
.update_with_attempt_id(
conn,
PaymentAttemptUpdateInternal::from(a.update_data),
)
.await?,
a.orig.update_with_attempt_id(conn, a.update_data).await?,
)),
#[cfg(feature = "v1")]
Updateable::RefundUpdate(a) => {
@ -263,12 +258,20 @@ pub struct PaymentIntentUpdateMems {
pub update_data: PaymentIntentUpdateInternal,
}
#[cfg(feature = "v1")]
#[derive(Debug, Serialize, Deserialize)]
pub struct PaymentAttemptUpdateMems {
pub orig: PaymentAttempt,
pub update_data: PaymentAttemptUpdate,
}
#[cfg(feature = "v2")]
#[derive(Debug, Serialize, Deserialize)]
pub struct PaymentAttemptUpdateMems {
pub orig: PaymentAttempt,
pub update_data: PaymentAttemptUpdateInternal,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RefundUpdateMems {
pub orig: Refund,

View File

@ -833,7 +833,7 @@ pub enum PaymentAttemptUpdate {
// TODO: uncomment fields as and when required
#[cfg(feature = "v2")]
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)]
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize)]
#[diesel(table_name = payment_attempt)]
pub struct PaymentAttemptUpdateInternal {
pub status: Option<storage_enums::AttemptStatus>,
@ -881,6 +881,112 @@ pub struct PaymentAttemptUpdateInternal {
pub connector_request_reference_id: Option<String>,
}
#[cfg(feature = "v2")]
impl PaymentAttemptUpdateInternal {
pub fn apply_changeset(self, source: PaymentAttempt) -> PaymentAttempt {
let Self {
status,
authentication_type,
error_message,
connector_payment_id,
modified_at,
browser_info,
error_code,
connector_metadata,
error_reason,
amount_capturable,
amount_to_capture,
updated_by,
merchant_connector_id,
connector,
redirection_data,
unified_code,
unified_message,
connector_token_details,
feature_metadata,
network_decline_code,
network_advice_code,
network_error_message,
payment_method_id,
connector_request_reference_id,
} = self;
PaymentAttempt {
payment_id: source.payment_id,
merchant_id: source.merchant_id,
status: status.unwrap_or(source.status),
connector: connector.or(source.connector),
error_message: error_message.or(source.error_message),
surcharge_amount: source.surcharge_amount,
payment_method_id: payment_method_id.or(source.payment_method_id),
authentication_type: authentication_type.unwrap_or(source.authentication_type),
created_at: source.created_at,
modified_at: common_utils::date_time::now(),
last_synced: source.last_synced,
cancellation_reason: source.cancellation_reason,
amount_to_capture: amount_to_capture.or(source.amount_to_capture),
browser_info: browser_info
.and_then(|val| {
serde_json::from_value::<common_utils::types::BrowserInformation>(val).ok()
})
.or(source.browser_info),
error_code: error_code.or(source.error_code),
payment_token: source.payment_token,
connector_metadata: connector_metadata.or(source.connector_metadata),
payment_experience: source.payment_experience,
payment_method_data: source.payment_method_data,
preprocessing_step_id: source.preprocessing_step_id,
error_reason: error_reason.or(source.error_reason),
multiple_capture_count: source.multiple_capture_count,
connector_response_reference_id: source.connector_response_reference_id,
amount_capturable: amount_capturable.unwrap_or(source.amount_capturable),
updated_by,
merchant_connector_id: merchant_connector_id.or(source.merchant_connector_id),
encoded_data: source.encoded_data,
unified_code: unified_code.flatten().or(source.unified_code),
unified_message: unified_message.flatten().or(source.unified_message),
net_amount: source.net_amount,
external_three_ds_authentication_attempted: source
.external_three_ds_authentication_attempted,
authentication_connector: source.authentication_connector,
authentication_id: source.authentication_id,
fingerprint_id: source.fingerprint_id,
client_source: source.client_source,
client_version: source.client_version,
customer_acceptance: source.customer_acceptance,
profile_id: source.profile_id,
organization_id: source.organization_id,
card_network: source.card_network,
shipping_cost: source.shipping_cost,
order_tax_amount: source.order_tax_amount,
request_extended_authorization: source.request_extended_authorization,
extended_authorization_applied: source.extended_authorization_applied,
capture_before: source.capture_before,
card_discovery: source.card_discovery,
charges: source.charges,
processor_merchant_id: source.processor_merchant_id,
created_by: source.created_by,
payment_method_type_v2: source.payment_method_type_v2,
connector_payment_id: source.connector_payment_id,
payment_method_subtype: source.payment_method_subtype,
routing_result: source.routing_result,
authentication_applied: source.authentication_applied,
external_reference_id: source.external_reference_id,
tax_on_surcharge: source.tax_on_surcharge,
payment_method_billing_address: source.payment_method_billing_address,
redirection_data: redirection_data.or(source.redirection_data),
connector_payment_data: source.connector_payment_data,
connector_token_details: connector_token_details.or(source.connector_token_details),
id: source.id,
feature_metadata: feature_metadata.or(source.feature_metadata),
network_advice_code: network_advice_code.or(source.network_advice_code),
network_decline_code: network_decline_code.or(source.network_decline_code),
network_error_message: network_error_message.or(source.network_error_message),
connector_request_reference_id: connector_request_reference_id
.or(source.connector_request_reference_id),
}
}
}
#[cfg(feature = "v1")]
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)]
#[diesel(table_name = payment_attempt)]

View File

@ -632,6 +632,122 @@ pub struct PaymentIntentUpdateInternal {
pub is_iframe_redirection_enabled: Option<bool>,
}
#[cfg(feature = "v2")]
impl PaymentIntentUpdateInternal {
pub fn apply_changeset(self, source: PaymentIntent) -> PaymentIntent {
let Self {
status,
prerouting_algorithm,
amount_captured,
modified_at: _, // This will be ignored from self
active_attempt_id,
amount,
currency,
shipping_cost,
tax_details,
skip_external_tax_calculation,
surcharge_applicable,
surcharge_amount,
tax_on_surcharge,
routing_algorithm_id,
capture_method,
authentication_type,
billing_address,
shipping_address,
customer_present,
description,
return_url,
setup_future_usage,
apply_mit_exemption,
statement_descriptor,
order_details,
allowed_payment_method_types,
metadata,
connector_metadata,
feature_metadata,
payment_link_config,
request_incremental_authorization,
session_expiry,
frm_metadata,
request_external_three_ds_authentication,
updated_by,
force_3ds_challenge,
is_iframe_redirection_enabled,
} = self;
PaymentIntent {
status: status.unwrap_or(source.status),
prerouting_algorithm: prerouting_algorithm.or(source.prerouting_algorithm),
amount_captured: amount_captured.or(source.amount_captured),
modified_at: common_utils::date_time::now(),
active_attempt_id: match active_attempt_id {
Some(v_option) => v_option,
None => source.active_attempt_id,
},
amount: amount.unwrap_or(source.amount),
currency: currency.unwrap_or(source.currency),
shipping_cost: shipping_cost.or(source.shipping_cost),
tax_details: tax_details.or(source.tax_details),
skip_external_tax_calculation: skip_external_tax_calculation
.or(source.skip_external_tax_calculation),
surcharge_applicable: surcharge_applicable.or(source.surcharge_applicable),
surcharge_amount: surcharge_amount.or(source.surcharge_amount),
tax_on_surcharge: tax_on_surcharge.or(source.tax_on_surcharge),
routing_algorithm_id: routing_algorithm_id.or(source.routing_algorithm_id),
capture_method: capture_method.or(source.capture_method),
authentication_type: authentication_type.or(source.authentication_type),
billing_address: billing_address.or(source.billing_address),
shipping_address: shipping_address.or(source.shipping_address),
customer_present: customer_present.or(source.customer_present),
description: description.or(source.description),
return_url: return_url.or(source.return_url),
setup_future_usage: setup_future_usage.or(source.setup_future_usage),
apply_mit_exemption: apply_mit_exemption.or(source.apply_mit_exemption),
statement_descriptor: statement_descriptor.or(source.statement_descriptor),
order_details: order_details.or(source.order_details),
allowed_payment_method_types: allowed_payment_method_types
.or(source.allowed_payment_method_types),
metadata: metadata.or(source.metadata),
connector_metadata: connector_metadata.or(source.connector_metadata),
feature_metadata: feature_metadata.or(source.feature_metadata),
payment_link_config: payment_link_config.or(source.payment_link_config),
request_incremental_authorization: request_incremental_authorization
.or(source.request_incremental_authorization),
session_expiry: session_expiry.unwrap_or(source.session_expiry),
frm_metadata: frm_metadata.or(source.frm_metadata),
request_external_three_ds_authentication: request_external_three_ds_authentication
.or(source.request_external_three_ds_authentication),
updated_by,
force_3ds_challenge: force_3ds_challenge.or(source.force_3ds_challenge),
is_iframe_redirection_enabled: is_iframe_redirection_enabled
.or(source.is_iframe_redirection_enabled),
// Fields from source
merchant_id: source.merchant_id,
customer_id: source.customer_id,
created_at: source.created_at,
last_synced: source.last_synced,
attempt_count: source.attempt_count,
profile_id: source.profile_id,
payment_link_id: source.payment_link_id,
authorization_count: source.authorization_count,
customer_details: source.customer_details,
organization_id: source.organization_id,
request_extended_authorization: source.request_extended_authorization,
psd2_sca_exemption_type: source.psd2_sca_exemption_type,
split_payments: source.split_payments,
platform_merchant_id: source.platform_merchant_id,
force_3ds_challenge_trigger: source.force_3ds_challenge_trigger,
processor_merchant_id: source.processor_merchant_id,
created_by: source.created_by,
merchant_reference_id: source.merchant_reference_id,
frm_merchant_decision: source.frm_merchant_decision,
enable_payment_link: source.enable_payment_link,
id: source.id,
}
}
}
#[cfg(feature = "v1")]
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)]
#[diesel(table_name = payment_intent)]

View File

@ -11,6 +11,7 @@ license.workspace = true
release = ["vergen", "external_services/aws_kms"]
vergen = ["router_env/vergen"]
v1 = ["diesel_models/v1", "hyperswitch_interfaces/v1", "common_utils/v1"]
v2 = ["diesel_models/v2", "hyperswitch_interfaces/v2", "common_utils/v2"]
[dependencies]
actix-web = "4.11.0"

View File

@ -10,7 +10,7 @@ license.workspace = true
default = ["dummy_connector", "frm", "payouts"]
dummy_connector = []
v1 = ["hyperswitch_domain_models/v1", "api_models/v1", "common_utils/v1"]
v2 = []
v2 = ["api_models/v2", "common_utils/v2", "hyperswitch_domain_models/v2"]
payouts = ["hyperswitch_domain_models/payouts"]
frm = ["hyperswitch_domain_models/frm"]
revenue_recovery = []

View File

@ -116,7 +116,7 @@ pub fn mk_app(
> {
let mut server_app = get_application_builder(request_body_limit, state.conf.cors.clone());
#[cfg(all(feature = "dummy_connector", feature = "v1"))]
#[cfg(feature = "dummy_connector")]
{
use routes::DummyConnector;
server_app = server_app.service(DummyConnector::server(state.clone()));

View File

@ -915,6 +915,7 @@ pub async fn connector_delete(
/// Merchant Account - Toggle KV
///
/// Toggle KV mode for the Merchant Account
#[cfg(feature = "v1")]
#[instrument(skip_all)]
pub async fn merchant_account_toggle_kv(
state: web::Data<AppState>,
@ -938,6 +939,29 @@ pub async fn merchant_account_toggle_kv(
.await
}
#[cfg(feature = "v2")]
#[instrument(skip_all)]
pub async fn merchant_account_toggle_kv(
state: web::Data<AppState>,
req: HttpRequest,
path: web::Path<common_utils::id_type::MerchantId>,
json_payload: web::Json<admin::ToggleKVRequest>,
) -> HttpResponse {
let flow = Flow::ConfigKeyUpdate;
let mut payload = json_payload.into_inner();
payload.merchant_id = path.into_inner();
api::server_wrap(
flow,
state,
&req,
payload,
|state, _, payload, _| kv_for_merchant(state, payload.merchant_id, payload.kv_enabled),
&auth::V2AdminApiAuth,
api_locking::LockAction::NotApplicable,
)
.await
}
/// Merchant Account - Transfer Keys
///
/// Transfer Merchant Encryption key to keymanager
@ -965,6 +989,7 @@ pub async fn merchant_account_toggle_all_kv(
/// Merchant Account - KV Status
///
/// Toggle KV mode for the Merchant Account
#[cfg(feature = "v1")]
#[instrument(skip_all)]
pub async fn merchant_account_kv_status(
state: web::Data<AppState>,
@ -986,6 +1011,27 @@ pub async fn merchant_account_kv_status(
.await
}
#[cfg(feature = "v2")]
#[instrument(skip_all)]
pub async fn merchant_account_kv_status(
state: web::Data<AppState>,
req: HttpRequest,
path: web::Path<common_utils::id_type::MerchantId>,
) -> HttpResponse {
let flow = Flow::ConfigKeyFetch;
let merchant_id = path.into_inner();
api::server_wrap(
flow,
state,
&req,
merchant_id,
|state, _, req, _| check_merchant_account_kv_status(state, req),
&auth::V2AdminApiAuth,
api_locking::LockAction::NotApplicable,
)
.await
}
/// Merchant Account - KV Status
///
/// Toggle KV mode for the Merchant Account

View File

@ -600,6 +600,22 @@ impl DummyConnector {
}
}
#[cfg(all(feature = "dummy_connector", feature = "v2"))]
impl DummyConnector {
pub fn server(state: AppState) -> Scope {
let mut routes_with_restricted_access = web::scope("");
#[cfg(not(feature = "external_access_dc"))]
{
routes_with_restricted_access =
routes_with_restricted_access.guard(actix_web::guard::Host("localhost"));
}
routes_with_restricted_access = routes_with_restricted_access
.service(web::resource("/payment").route(web::post().to(dummy_connector_payment)));
web::scope("/dummy-connector")
.app_data(web::Data::new(state))
.service(routes_with_restricted_access)
}
}
pub struct Payments;
#[cfg(all(any(feature = "olap", feature = "oltp"), feature = "v2"))]
@ -1556,6 +1572,11 @@ impl MerchantAccount {
)
.service(
web::resource("/profiles").route(web::get().to(profiles::profiles_list)),
)
.service(
web::resource("/kv")
.route(web::post().to(admin::merchant_account_toggle_kv))
.route(web::get().to(admin::merchant_account_kv_status)),
),
)
}

View File

@ -61,7 +61,7 @@ pub async fn dummy_connector_complete_payment(
.await
}
#[cfg(all(feature = "dummy_connector", feature = "v1"))]
#[cfg(feature = "dummy_connector")]
#[instrument(skip_all, fields(flow = ?types::Flow::DummyPaymentCreate))]
pub async fn dummy_connector_payment(
state: web::Data<app::AppState>,
@ -82,7 +82,7 @@ pub async fn dummy_connector_payment(
.await
}
#[cfg(all(feature = "dummy_connector", feature = "v1"))]
#[cfg(feature = "dummy_connector")]
#[instrument(skip_all, fields(flow = ?types::Flow::DummyPaymentRetrieve))]
pub async fn dummy_connector_payment_data(
state: web::Data<app::AppState>,

View File

@ -9,7 +9,7 @@ use crate::{
utils::OptionExt,
};
#[cfg(all(feature = "dummy_connector", feature = "v1"))]
#[cfg(feature = "dummy_connector")]
pub async fn payment(
state: SessionState,
req: types::DummyConnectorPaymentRequest,

View File

@ -23,6 +23,32 @@ use crate::{
impl KvStorePartition for customers::Customer {}
#[cfg(feature = "v2")]
mod label {
use common_utils::id_type;
pub(super) const MODEL_NAME: &str = "customer_v2";
pub(super) const CLUSTER_LABEL: &str = "cust";
pub(super) fn get_global_id_label(global_customer_id: &id_type::GlobalCustomerId) -> String {
format!(
"customer_global_id_{}",
global_customer_id.get_string_repr()
)
}
pub(super) fn get_merchant_scoped_id_label(
merchant_id: &id_type::MerchantId,
merchant_reference_id: &id_type::CustomerId,
) -> String {
format!(
"customer_mid_{}_mrefid_{}",
merchant_id.get_string_repr(),
merchant_reference_id.get_string_repr()
)
}
}
#[async_trait::async_trait]
impl<T: DatabaseStore> domain::CustomerInterface for kv_router_store::KVRouterStore<T> {
type Error = StorageError;
@ -283,22 +309,32 @@ impl<T: DatabaseStore> domain::CustomerInterface for kv_router_store::KVRouterSt
.construct_new()
.await
.change_context(StorageError::EncryptionError)?;
let storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>(
let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, customers::Customer>(
self,
storage_scheme,
Op::Insert,
))
.await;
new_customer.update_storage_scheme(storage_scheme);
new_customer.update_storage_scheme(decided_storage_scheme);
let mut reverse_lookups = Vec::new();
if let Some(ref merchant_ref_id) = new_customer.merchant_reference_id {
let reverse_lookup_merchant_scoped_id =
label::get_merchant_scoped_id_label(&new_customer.merchant_id, merchant_ref_id);
reverse_lookups.push(reverse_lookup_merchant_scoped_id);
}
self.insert_resource(
state,
key_store,
storage_scheme,
decided_storage_scheme,
new_customer.clone().insert(&conn),
new_customer.clone().into(),
kv_router_store::InsertResourceParams {
insertable: kv::Insertable::Customer(new_customer.clone()),
reverse_lookups: vec![],
reverse_lookups,
identifier,
key,
resource_type: "customer",

View File

@ -378,6 +378,16 @@ impl UniqueConstraints for diesel_models::PaymentAttempt {
}
}
#[cfg(feature = "v2")]
impl UniqueConstraints for diesel_models::PaymentAttempt {
fn unique_constraints(&self) -> Vec<String> {
vec![format!("pa_{}", self.id.get_string_repr())]
}
fn table_name(&self) -> &str {
"PaymentAttempt"
}
}
#[cfg(feature = "v1")]
impl UniqueConstraints for diesel_models::Refund {
fn unique_constraints(&self) -> Vec<String> {

View File

@ -6,16 +6,17 @@ use common_utils::{
fallback_reverse_lookup_not_found,
types::{ConnectorTransactionId, ConnectorTransactionIdTrait, CreatedBy},
};
#[cfg(feature = "v1")]
use diesel_models::payment_attempt::PaymentAttemptNew as DieselPaymentAttemptNew;
use diesel_models::{
enums::{
MandateAmountData as DieselMandateAmountData, MandateDataType as DieselMandateType,
MandateDetails as DieselMandateDetails, MerchantStorageScheme,
},
kv,
payment_attempt::PaymentAttempt as DieselPaymentAttempt,
reverse_lookup::{ReverseLookup, ReverseLookupNew},
};
#[cfg(feature = "v1")]
use diesel_models::{kv, payment_attempt::PaymentAttemptNew as DieselPaymentAttemptNew};
use error_stack::ResultExt;
#[cfg(feature = "v1")]
use hyperswitch_domain_models::payments::payment_attempt::PaymentAttemptNew;
@ -32,22 +33,21 @@ use hyperswitch_domain_models::{
use hyperswitch_domain_models::{
payments::payment_attempt::PaymentListFilters, payments::PaymentIntent,
};
#[cfg(feature = "v1")]
#[cfg(feature = "v2")]
use label::*;
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
#[cfg(feature = "v2")]
use crate::kv_router_store::{FilterResourceParams, FindResourceBy, UpdateResourceParams};
use crate::{
diesel_error_to_data_error, errors,
errors::RedisErrorExt,
kv_router_store::KVRouterStore,
lookup::ReverseLookupInterface,
utils::{pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, RouterStore,
};
#[cfg(feature = "v1")]
use crate::{
errors::RedisErrorExt,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::try_redis_get_else_try_database_get,
utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get},
DataModelExt, DatabaseStore, RouterStore,
};
#[async_trait::async_trait]
@ -747,15 +747,98 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
payment_attempt: PaymentAttempt,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
// Ignoring storage scheme for v2 implementation
self.router_store
.insert_payment_attempt(
key_manager_state,
merchant_key_store,
payment_attempt,
storage_scheme,
)
.await
let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
self,
storage_scheme,
Op::Insert,
))
.await;
match decided_storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
.insert_payment_attempt(
key_manager_state,
merchant_key_store,
payment_attempt,
decided_storage_scheme,
)
.await
}
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::GlobalPaymentId {
id: &payment_attempt.payment_id,
};
let key_str = key.to_string();
let field = format!(
"{}_{}",
label::CLUSTER_LABEL,
payment_attempt.id.get_string_repr()
);
let diesel_payment_attempt_new = payment_attempt
.clone()
.construct_new()
.await
.change_context(errors::StorageError::EncryptionError)?;
let diesel_payment_attempt_for_redis: DieselPaymentAttempt =
Conversion::convert(payment_attempt.clone())
.await
.change_context(errors::StorageError::EncryptionError)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: Box::new(kv::Insertable::PaymentAttempt(Box::new(
diesel_payment_attempt_new.clone(),
))),
},
};
let reverse_lookup_attempt_id = ReverseLookupNew {
lookup_id: label::get_global_id_label(&payment_attempt.id),
pk_id: key_str.clone(),
sk_id: field.clone(),
source: "payment_attempt".to_string(),
updated_by: decided_storage_scheme.to_string(),
};
self.insert_reverse_lookup(reverse_lookup_attempt_id, decided_storage_scheme)
.await?;
if let Some(ref conn_txn_id_val) = payment_attempt.connector_payment_id {
let reverse_lookup_conn_txn_id = ReverseLookupNew {
lookup_id: label::get_profile_id_connector_transaction_label(
payment_attempt.profile_id.get_string_repr(),
conn_txn_id_val,
),
pk_id: key_str.clone(),
sk_id: field.clone(),
source: "payment_attempt".to_string(),
updated_by: decided_storage_scheme.to_string(),
};
self.insert_reverse_lookup(reverse_lookup_conn_txn_id, decided_storage_scheme)
.await?;
}
match Box::pin(kv_wrapper::<DieselPaymentAttempt, _, _>(
self,
KvOperation::HSetNx(&field, &diesel_payment_attempt_for_redis, redis_entry),
key,
))
.await
.map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "payment_attempt",
key: Some(payment_attempt.id.get_string_repr().to_owned()),
}
.into()),
Ok(HsetnxReply::KeySet) => Ok(payment_attempt),
Err(error) => Err(error.change_context(errors::StorageError::KVError)),
}
}
}
}
#[cfg(feature = "v1")]
@ -889,19 +972,48 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
key_manager_state: &KeyManagerState,
merchant_key_store: &MerchantKeyStore,
this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate,
payment_attempt_update: PaymentAttemptUpdate,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
// Ignoring storage scheme for v2 implementation
self.router_store
.update_payment_attempt(
key_manager_state,
merchant_key_store,
this,
payment_attempt,
storage_scheme,
)
let payment_attempt = Conversion::convert(this.clone())
.await
.change_context(errors::StorageError::DecryptionError)?;
let key = PartitionKey::GlobalPaymentId {
id: &this.payment_id,
};
let field = format!("{}_{}", label::CLUSTER_LABEL, this.id.get_string_repr());
let conn = pg_connection_write(self).await?;
let payment_attempt_internal =
diesel_models::PaymentAttemptUpdateInternal::from(payment_attempt_update);
let updated_payment_attempt = payment_attempt_internal
.clone()
.apply_changeset(payment_attempt.clone());
let updated_by = updated_payment_attempt.updated_by.to_owned();
let updated_payment_attempt_with_id = payment_attempt
.clone()
.update_with_attempt_id(&conn, payment_attempt_internal.clone());
Box::pin(self.update_resource(
key_manager_state,
merchant_key_store,
storage_scheme,
updated_payment_attempt_with_id,
updated_payment_attempt,
UpdateResourceParams {
updateable: kv::Updateable::PaymentAttemptUpdate(Box::new(
kv::PaymentAttemptUpdateMems {
orig: payment_attempt,
update_data: payment_attempt_internal,
},
)),
operation: Op::Update(key.clone(), &field, Some(updated_by.as_str())),
},
))
.await
}
#[cfg(feature = "v1")]
@ -1095,15 +1207,69 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
payment_id: &common_utils::id_type::GlobalPaymentId,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
// Ignoring storage scheme for v2 implementation
self.router_store
.find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id(
key_manager_state,
merchant_key_store,
payment_id,
storage_scheme,
)
.await
let database_call = || {
self.router_store
.find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id(
key_manager_state,
merchant_key_store,
payment_id,
storage_scheme,
)
};
let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
self,
storage_scheme,
Op::Find,
))
.await;
match decided_storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::GlobalPaymentId { id: payment_id };
let redis_fut = async {
let kv_result = kv_wrapper::<DieselPaymentAttempt, _, _>(
self,
KvOperation::<DieselPaymentAttempt>::Scan("pa_*"),
key.clone(),
)
.await?
.try_into_scan();
let payment_attempt = kv_result.and_then(|mut payment_attempts| {
payment_attempts.sort_by(|a, b| b.modified_at.cmp(&a.modified_at));
payment_attempts
.iter()
.find(|&pa| {
pa.status == diesel_models::enums::AttemptStatus::Charged
|| pa.status
== diesel_models::enums::AttemptStatus::PartialCharged
})
.cloned()
.ok_or(error_stack::report!(
redis_interface::errors::RedisError::NotFound
))
})?;
let merchant_id = payment_attempt.merchant_id.clone();
PaymentAttempt::convert_back(
key_manager_state,
payment_attempt,
merchant_key_store.key.get_inner(),
merchant_id.into(),
)
.await
.change_context(redis_interface::errors::RedisError::UnknownResult)
};
Box::pin(try_redis_get_else_try_database_get(
redis_fut,
database_call,
))
.await
}
}
}
#[cfg(feature = "v2")]
@ -1115,16 +1281,22 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
connector_transaction_id: &str,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
// Ignoring storage scheme for v2 implementation
self.router_store
.find_payment_attempt_by_profile_id_connector_transaction_id(
key_manager_state,
merchant_key_store,
let conn = pg_connection_read(self).await?;
self.find_resource_by_id(
key_manager_state,
merchant_key_store,
storage_scheme,
DieselPaymentAttempt::find_by_profile_id_connector_transaction_id(
&conn,
profile_id,
connector_transaction_id,
storage_scheme,
)
.await
),
FindResourceBy::LookupId(label::get_profile_id_connector_transaction_label(
profile_id.get_string_repr(),
connector_transaction_id,
)),
)
.await
}
#[instrument(skip_all)]
@ -1329,15 +1501,15 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
attempt_id: &common_utils::id_type::GlobalAttemptId,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
// Ignoring storage scheme for v2 implementation
self.router_store
.find_payment_attempt_by_id(
key_manager_state,
merchant_key_store,
attempt_id,
storage_scheme,
)
.await
let conn = pg_connection_read(self).await?;
self.find_resource_by_id(
key_manager_state,
merchant_key_store,
storage_scheme,
DieselPaymentAttempt::find_by_id(&conn, attempt_id),
FindResourceBy::LookupId(label::get_global_id_label(attempt_id)),
)
.await
}
#[cfg(feature = "v2")]
@ -1349,14 +1521,20 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
merchant_key_store: &MerchantKeyStore,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<PaymentAttempt>, errors::StorageError> {
self.router_store
.find_payment_attempts_by_payment_intent_id(
key_manager_state,
payment_id,
merchant_key_store,
storage_scheme,
)
.await
let conn = pg_connection_read(self).await?;
self.filter_resources(
key_manager_state,
merchant_key_store,
storage_scheme,
DieselPaymentAttempt::find_by_payment_id(&conn, payment_id),
|_| true,
FilterResourceParams {
key: PartitionKey::GlobalPaymentId { id: payment_id },
pattern: "pa_*",
limit: None,
},
)
.await
}
#[cfg(feature = "v1")]
@ -2036,3 +2214,25 @@ async fn add_preprocessing_id_to_reverse_lookup<T: DatabaseStore>(
.insert_reverse_lookup(reverse_lookup_new, storage_scheme)
.await
}
#[cfg(feature = "v2")]
mod label {
pub(super) const MODEL_NAME: &str = "payment_attempt_v2";
pub(super) const CLUSTER_LABEL: &str = "pa";
pub(super) fn get_profile_id_connector_transaction_label(
profile_id: &str,
connector_transaction_id: &str,
) -> String {
format!(
"profile_{}_conn_txn_{}",
profile_id, connector_transaction_id
)
}
pub(super) fn get_global_id_label(
attempt_id: &common_utils::id_type::GlobalAttemptId,
) -> String {
format!("attempt_global_id_{}", attempt_id.get_string_repr())
}
}

View File

@ -2,13 +2,22 @@
use api_models::payments::{AmountFilter, Order, SortBy, SortOn};
#[cfg(feature = "olap")]
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
#[cfg(feature = "v1")]
use common_utils::ext_traits::Encode;
use common_utils::{ext_traits::AsyncExt, types::keymanager::KeyManagerState};
#[cfg(feature = "v2")]
use common_utils::fallback_reverse_lookup_not_found;
use common_utils::{
ext_traits::{AsyncExt, Encode},
types::keymanager::KeyManagerState,
};
#[cfg(feature = "olap")]
use diesel::{associations::HasTable, ExpressionMethods, JoinOnDsl, QueryDsl};
#[cfg(feature = "v1")]
use diesel_models::payment_intent::PaymentIntentUpdate as DieselPaymentIntentUpdate;
#[cfg(feature = "v2")]
use diesel_models::payment_intent::PaymentIntentUpdateInternal;
#[cfg(feature = "olap")]
use diesel_models::query::generics::db_metrics;
#[cfg(feature = "v2")]
use diesel_models::reverse_lookup::ReverseLookupNew;
#[cfg(all(feature = "v1", feature = "olap"))]
use diesel_models::schema::{
payment_attempt::{self as payment_attempt_schema, dsl as pa_dsl},
@ -20,10 +29,8 @@ use diesel_models::schema_v2::{
payment_intent::dsl as pi_dsl,
};
use diesel_models::{
enums::MerchantStorageScheme, payment_intent::PaymentIntent as DieselPaymentIntent,
enums::MerchantStorageScheme, kv, payment_intent::PaymentIntent as DieselPaymentIntent,
};
#[cfg(feature = "v1")]
use diesel_models::{kv, payment_intent::PaymentIntentUpdate as DieselPaymentIntentUpdate};
use error_stack::ResultExt;
#[cfg(feature = "olap")]
use hyperswitch_domain_models::payments::{
@ -37,7 +44,6 @@ use hyperswitch_domain_models::{
PaymentIntent,
},
};
#[cfg(feature = "v1")]
use redis_interface::HsetnxReply;
#[cfg(feature = "olap")]
use router_env::logger;
@ -47,17 +53,14 @@ use router_env::{instrument, tracing};
use crate::connection;
use crate::{
diesel_error_to_data_error,
errors::StorageError,
errors::{RedisErrorExt, StorageError},
kv_router_store::KVRouterStore,
utils::{pg_connection_read, pg_connection_write},
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DatabaseStore,
};
#[cfg(feature = "v1")]
use crate::{
errors::RedisErrorExt,
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils,
};
#[cfg(feature = "v2")]
use crate::{errors, lookup::ReverseLookupInterface};
#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
@ -163,7 +166,68 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
}
MerchantStorageScheme::RedisKv => {
todo!("Implement payment intent insert for kv")
let id = payment_intent.id.clone();
let key = PartitionKey::GlobalPaymentId { id: &id };
let field = format!("pi_{}", id.get_string_repr());
let key_str = key.to_string();
let new_payment_intent = payment_intent
.clone()
.construct_new()
.await
.change_context(StorageError::EncryptionError)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: Box::new(kv::Insertable::PaymentIntent(Box::new(
new_payment_intent,
))),
},
};
let diesel_payment_intent = payment_intent
.clone()
.convert()
.await
.change_context(StorageError::EncryptionError)?;
if let Some(merchant_reference_id) = &payment_intent.merchant_reference_id {
let reverse_lookup = ReverseLookupNew {
lookup_id: format!(
"pi_merchant_reference_{}_{}",
payment_intent.profile_id.get_string_repr(),
merchant_reference_id.get_string_repr()
),
pk_id: key_str.clone(),
sk_id: field.clone(),
source: "payment_intent".to_string(),
updated_by: storage_scheme.to_string(),
};
self.insert_reverse_lookup(reverse_lookup, storage_scheme)
.await?;
}
match Box::pin(kv_wrapper::<DieselPaymentIntent, _, _>(
self,
KvOperation::<DieselPaymentIntent>::HSetNx(
&field,
&diesel_payment_intent,
redis_entry,
),
key,
))
.await
.map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hsetnx()
{
Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue {
entity: "payment_intent",
key: Some(key_str),
}
.into()),
Ok(HsetnxReply::KeySet) => Ok(payment_intent),
Err(error) => Err(error.change_context(StorageError::KVError)),
}
}
}
}
@ -300,7 +364,59 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
.await
}
MerchantStorageScheme::RedisKv => {
todo!()
let id = this.id.clone();
let merchant_id = this.merchant_id.clone();
let key = PartitionKey::GlobalPaymentId { id: &id };
let field = format!("pi_{}", id.get_string_repr());
let key_str = key.to_string();
let diesel_intent_update =
PaymentIntentUpdateInternal::try_from(payment_intent_update)
.change_context(StorageError::DeserializationFailed)?;
let origin_diesel_intent = this
.convert()
.await
.change_context(StorageError::EncryptionError)?;
let diesel_intent = diesel_intent_update
.clone()
.apply_changeset(origin_diesel_intent.clone());
let redis_value = diesel_intent
.encode_to_string_of_json()
.change_context(StorageError::SerializationFailed)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: Box::new(kv::Updateable::PaymentIntentUpdate(Box::new(
kv::PaymentIntentUpdateMems {
orig: origin_diesel_intent,
update_data: diesel_intent_update,
},
))),
},
};
Box::pin(kv_wrapper::<(), _, _>(
self,
KvOperation::<DieselPaymentIntent>::Hset((&field, redis_value), redis_entry),
key,
))
.await
.map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hset()
.change_context(StorageError::KVError)?;
let payment_intent = PaymentIntent::convert_back(
state,
diesel_intent,
merchant_key_store.key.get_inner(),
merchant_id.into(),
)
.await
.change_context(StorageError::DecryptionError)?;
Ok(payment_intent)
}
}
}
@ -372,18 +488,50 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
state: &KeyManagerState,
id: &common_utils::id_type::GlobalPaymentId,
merchant_key_store: &MerchantKeyStore,
_storage_scheme: MerchantStorageScheme,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentIntent, StorageError> {
let conn: bb8::PooledConnection<
'_,
async_bb8_diesel::ConnectionManager<diesel::PgConnection>,
> = pg_connection_read(self).await?;
let diesel_payment_intent = DieselPaymentIntent::find_by_global_id(&conn, id)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(*er.current_context());
er.change_context(new_err)
})?;
let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentIntent>(
self,
storage_scheme,
Op::Find,
))
.await;
let database_call = || async {
let conn: bb8::PooledConnection<
'_,
async_bb8_diesel::ConnectionManager<diesel::PgConnection>,
> = pg_connection_read(self).await?;
DieselPaymentIntent::find_by_global_id(&conn, id)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(*er.current_context());
er.change_context(new_err)
})
};
let diesel_payment_intent = match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::GlobalPaymentId { id };
let field = format!("pi_{}", id.get_string_repr());
Box::pin(utils::try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper::<DieselPaymentIntent, _, _>(
self,
KvOperation::<DieselPaymentIntent>::HGet(&field),
key,
))
.await?
.try_into_hget()
},
database_call,
))
.await
}
}?;
let merchant_id = diesel_payment_intent.merchant_id.clone();
@ -391,7 +539,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
state,
diesel_payment_intent,
merchant_key_store.key.get_inner(),
merchant_id.to_owned().into(),
merchant_id.into(),
)
.await
.change_context(StorageError::DecryptionError)
@ -523,7 +671,68 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
.await
}
MerchantStorageScheme::RedisKv => {
todo!()
let lookup_id = format!(
"pi_merchant_reference_{}_{}",
profile_id.get_string_repr(),
merchant_reference_id.get_string_repr()
);
let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, *storage_scheme)
.await,
self.router_store
.find_payment_intent_by_merchant_reference_id_profile_id(
state,
merchant_reference_id,
profile_id,
merchant_key_store,
storage_scheme,
)
.await
);
let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id,
};
let database_call = || async {
let conn = pg_connection_read(self).await?;
DieselPaymentIntent::find_by_merchant_reference_id_profile_id(
&conn,
merchant_reference_id,
profile_id,
)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(*er.current_context());
er.change_context(new_err)
})
};
let diesel_payment_intent = Box::pin(utils::try_redis_get_else_try_database_get(
async {
Box::pin(kv_wrapper::<DieselPaymentIntent, _, _>(
self,
KvOperation::<DieselPaymentIntent>::HGet(&lookup.sk_id),
key,
))
.await?
.try_into_hget()
},
database_call,
))
.await?;
let merchant_id = diesel_payment_intent.merchant_id.clone();
PaymentIntent::convert_back(
state,
diesel_payment_intent,
merchant_key_store.key.get_inner(),
merchant_id.into(),
)
.await
.change_context(StorageError::DecryptionError)
}
}
}
@ -607,9 +816,8 @@ impl<T: DatabaseStore> PaymentIntentInterface for crate::RouterStore<T> {
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentIntent, StorageError> {
let conn = pg_connection_write(self).await?;
let diesel_payment_intent_update =
diesel_models::PaymentIntentUpdateInternal::try_from(payment_intent)
.change_context(StorageError::DeserializationFailed)?;
let diesel_payment_intent_update = PaymentIntentUpdateInternal::try_from(payment_intent)
.change_context(StorageError::DeserializationFailed)?;
let diesel_payment_intent = this
.convert()
.await

View File

@ -55,6 +55,10 @@ pub enum PartitionKey<'a> {
GlobalId {
id: &'a str,
},
#[cfg(feature = "v2")]
GlobalPaymentId {
id: &'a common_utils::id_type::GlobalPaymentId,
},
}
// PartitionKey::MerchantIdPaymentId {merchant_id, payment_id}
impl std::fmt::Display for PartitionKey<'_> {
@ -108,7 +112,11 @@ impl std::fmt::Display for PartitionKey<'_> {
)),
#[cfg(feature = "v2")]
PartitionKey::GlobalId { id } => f.write_str(&format!("cust_{id}",)),
PartitionKey::GlobalId { id } => f.write_str(&format!("global_cust_{id}",)),
#[cfg(feature = "v2")]
PartitionKey::GlobalPaymentId { id } => {
f.write_str(&format!("global_payment_{}", id.get_string_repr()))
}
}
}
}