diff --git a/Cargo.lock b/Cargo.lock index 03cba78971..6ded306fd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2600,6 +2600,7 @@ dependencies = [ "common_utils", "error-stack", "fred", + "futures", "router_env", "serde", "thiserror", diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index f4d2131a80..643b14c37c 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -249,7 +249,7 @@ impl Default for PaymentMethod { pub enum PaymentIdType { PaymentIntentId(String), ConnectorTransactionId(String), - PaymentTxnId(String), + PaymentAttemptId(String), } impl Default for PaymentIdType { diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 2e881b813b..f52fbd1820 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -25,7 +25,7 @@ async fn drainer_handler( stream_index: u8, max_read_count: u64, ) -> errors::DrainerResult<()> { - let stream_name = utils::get_drainer_stream(store.clone(), stream_index); + let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index); let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await; if let Err(_e) = drainer_result { @@ -70,6 +70,9 @@ async fn drainer( kv::Insertable::PaymentAttempt(a) => { macro_util::handle_resp!(a.insert(&conn).await, "ins", "pa") } + kv::Insertable::Refund(a) => { + macro_util::handle_resp!(a.insert(&conn).await, "ins", "ref") + } }, kv::DBOperation::Update { updatable } => match updatable { kv::Updateable::PaymentIntentUpdate(a) => { @@ -78,6 +81,9 @@ async fn drainer( kv::Updateable::PaymentAttemptUpdate(a) => { macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pa") } + kv::Updateable::RefundUpdate(a) => { + macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "ref") + } }, kv::DBOperation::Delete => todo!(), }; diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index c931a1bbfd..44b76659b2 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -113,9 +113,9 @@ pub fn increment_stream_index(index: u8, total_streams: u8) -> u8 { } pub(crate) fn get_stream_key_flag(store: Arc, stream_index: u8) -> String { - format!("{}_in_use", get_drainer_stream(store, stream_index)) + format!("{}_in_use", get_drainer_stream_name(store, stream_index)) } -pub(crate) fn get_drainer_stream(store: Arc, stream_index: u8) -> String { - store.drainer_stream(format!("shard_{}", stream_index).as_str()) +pub(crate) fn get_drainer_stream_name(store: Arc, stream_index: u8) -> String { + store.get_drainer_stream_name(format!("shard_{}", stream_index).as_str()) } diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml index 2e10c2950e..59f91e2913 100644 --- a/crates/redis_interface/Cargo.toml +++ b/crates/redis_interface/Cargo.toml @@ -10,6 +10,7 @@ license = "Apache-2.0" [dependencies] error-stack = "0.2.4" fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] } +futures = "0.3" serde = { version = "1.0.149", features = ["derive"] } thiserror = "1.0.37" @@ -18,4 +19,4 @@ common_utils = { version = "0.1.0", path = "../common_utils" } router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } [dev-dependencies] -tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] } \ No newline at end of file +tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] } diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index ef39f59653..7e733473df 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -10,7 +10,7 @@ use std::fmt::Debug; use common_utils::{ errors::CustomResult, - ext_traits::{ByteSliceExt, Encode}, + ext_traits::{ByteSliceExt, Encode, StringExt}, }; use error_stack::{IntoReport, ResultExt}; use fred::{ @@ -20,7 +20,8 @@ use fred::{ RedisKey, RedisMap, RedisValue, SetOptions, XCap, XReadResponse, }, }; -use router_env::{tracing, tracing::instrument}; +use futures::StreamExt; +use router_env::{logger, tracing, tracing::instrument}; use crate::{ errors, @@ -245,6 +246,56 @@ impl super::RedisConnectionPool { .await } + #[instrument(level = "DEBUG", skip(self))] + pub async fn hscan( + &self, + key: &str, + pattern: &str, + count: Option, + ) -> CustomResult, errors::RedisError> { + Ok(self + .pool + .hscan::<&str, &str>(key, pattern, count) + .filter_map(|value| async move { + match value { + Ok(mut v) => { + let v = v.take_results()?; + + let v: Vec = + v.iter().filter_map(|(_, val)| val.as_string()).collect(); + Some(futures::stream::iter(v)) + } + Err(err) => { + logger::error!(?err); + None + } + } + }) + .flatten() + .collect::>() + .await) + } + + #[instrument(level = "DEBUG", skip(self))] + pub async fn hscan_and_deserialize( + &self, + key: &str, + pattern: &str, + count: Option, + ) -> CustomResult, errors::RedisError> + where + T: serde::de::DeserializeOwned, + { + let redis_results = self.hscan(key, pattern, count).await?; + Ok(redis_results + .iter() + .filter_map(|v| { + let r: T = v.parse_struct(std::any::type_name::()).ok()?; + Some(r) + }) + .collect()) + } + #[instrument(level = "DEBUG", skip(self))] pub async fn get_hash_field( &self, diff --git a/crates/router/src/core/errors/utils.rs b/crates/router/src/core/errors/utils.rs index a7bac854fb..80806bf4a6 100644 --- a/crates/router/src/core/errors/utils.rs +++ b/crates/router/src/core/errors/utils.rs @@ -126,3 +126,18 @@ impl ConnectorErrorExt for error_stack::Report { self.change_context(errors::ApiErrorResponse::PaymentAuthorizationFailed { data }) } } + +pub(crate) trait RedisErrorExt { + fn to_redis_failed_response(self, key: &str) -> error_stack::Report; +} + +impl RedisErrorExt for error_stack::Report { + fn to_redis_failed_response(self, key: &str) -> error_stack::Report { + match self.current_context() { + errors::RedisError::NotFound => self.change_context( + errors::StorageError::ValueNotFound(format!("Data does not exist for key {key}",)), + ), + _ => self.change_context(errors::StorageError::KVError), + } + } +} diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 337a0c8400..ffc1bc4012 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -98,7 +98,7 @@ where .make_pm_data( state, payment_data.payment_attempt.payment_method, - &payment_data.payment_attempt.txn_id, + &payment_data.payment_attempt.attempt_id, &payment_data.payment_attempt, &payment_data.payment_method_data, &payment_data.token, @@ -594,7 +594,7 @@ pub async fn add_process_sync_task( force_sync: true, merchant_id: Some(payment_attempt.merchant_id.clone()), - resource_id: api::PaymentIdType::PaymentTxnId(payment_attempt.txn_id.clone()), + resource_id: api::PaymentIdType::PaymentAttemptId(payment_attempt.attempt_id.clone()), param: None, connector: None, }; @@ -603,7 +603,7 @@ pub async fn add_process_sync_task( let process_tracker_id = pt_utils::get_process_tracker_id( runner, task, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, &payment_attempt.merchant_id, ); let process_tracker_entry = diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 8ce4becbca..53ca72ae37 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -323,7 +323,7 @@ pub fn create_startpay_url( server.base_url, payment_intent.payment_id, payment_intent.merchant_id, - payment_attempt.txn_id + payment_attempt.attempt_id ) } diff --git a/crates/router/src/core/payments/operations/payment_cancel.rs b/crates/router/src/core/payments/operations/payment_cancel.rs index 6286cc59a1..030e879601 100644 --- a/crates/router/src/core/payments/operations/payment_cancel.rs +++ b/crates/router/src/core/payments/operations/payment_cancel.rs @@ -67,7 +67,7 @@ impl GetTracker, api::PaymentsCancelRequest> .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_attempt.payment_id, &payment_attempt.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/payments/operations/payment_capture.rs b/crates/router/src/core/payments/operations/payment_capture.rs index d12856aefd..bfff5627bd 100644 --- a/crates/router/src/core/payments/operations/payment_capture.rs +++ b/crates/router/src/core/payments/operations/payment_capture.rs @@ -89,7 +89,7 @@ impl GetTracker, api::PaymentsCaptu .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_attempt.payment_id, &payment_attempt.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/payments/operations/payment_confirm.rs b/crates/router/src/core/payments/operations/payment_confirm.rs index 55550948c0..c77c1b1dd4 100644 --- a/crates/router/src/core/payments/operations/payment_confirm.rs +++ b/crates/router/src/core/payments/operations/payment_confirm.rs @@ -97,7 +97,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_attempt.payment_id, &payment_attempt.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index 91eae8f7cd..926e12eb43 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -434,7 +434,7 @@ impl PaymentCreate { storage::PaymentAttemptNew { payment_id: payment_id.to_string(), merchant_id: merchant_id.to_string(), - txn_id: Uuid::new_v4().to_string(), + attempt_id: Uuid::new_v4().to_string(), status, amount: amount.into(), currency, @@ -495,7 +495,7 @@ impl PaymentCreate { storage::ConnectorResponseNew { payment_id: payment_attempt.payment_id.clone(), merchant_id: payment_attempt.merchant_id.clone(), - txn_id: payment_attempt.txn_id.clone(), + txn_id: payment_attempt.attempt_id.clone(), created_at: payment_attempt.created_at, modified_at: payment_attempt.modified_at, connector_name: payment_attempt.connector.clone(), diff --git a/crates/router/src/core/payments/operations/payment_method_validate.rs b/crates/router/src/core/payments/operations/payment_method_validate.rs index 56291719c1..bc3aa5e89b 100644 --- a/crates/router/src/core/payments/operations/payment_method_validate.rs +++ b/crates/router/src/core/payments/operations/payment_method_validate.rs @@ -285,7 +285,7 @@ impl PaymentMethodValidate { storage::PaymentAttemptNew { payment_id: payment_id.to_string(), merchant_id: merchant_id.to_string(), - txn_id: Uuid::new_v4().to_string(), + attempt_id: Uuid::new_v4().to_string(), status, // Amount & Currency will be zero in this case amount: 0, diff --git a/crates/router/src/core/payments/operations/payment_session.rs b/crates/router/src/core/payments/operations/payment_session.rs index 1f68feeb16..b673f7c7dd 100644 --- a/crates/router/src/core/payments/operations/payment_session.rs +++ b/crates/router/src/core/payments/operations/payment_session.rs @@ -106,7 +106,7 @@ impl GetTracker, api::PaymentsSessionRequest> .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_intent.payment_id, &payment_intent.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/payments/operations/payment_start.rs b/crates/router/src/core/payments/operations/payment_start.rs index 4afc084372..b9c7dff01d 100644 --- a/crates/router/src/core/payments/operations/payment_start.rs +++ b/crates/router/src/core/payments/operations/payment_start.rs @@ -103,7 +103,7 @@ impl GetTracker, api::PaymentsStartRequest> f .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_intent.payment_id, &payment_intent.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/payments/operations/payment_status.rs b/crates/router/src/core/payments/operations/payment_status.rs index 68b5c03db1..cfa69bc1d1 100644 --- a/crates/router/src/core/payments/operations/payment_status.rs +++ b/crates/router/src/core/payments/operations/payment_status.rs @@ -215,8 +215,8 @@ async fn get_tracker_for_sync< api::PaymentIdType::ConnectorTransactionId(ref id) => { db.find_payment_attempt_by_merchant_id_connector_txn_id(merchant_id, id, storage_scheme) } - api::PaymentIdType::PaymentTxnId(ref id) => { - db.find_payment_attempt_by_merchant_id_txn_id(merchant_id, id, storage_scheme) + api::PaymentIdType::PaymentAttemptId(ref id) => { + db.find_payment_attempt_by_merchant_id_attempt_id(merchant_id, id, storage_scheme) } } .await @@ -233,7 +233,7 @@ async fn get_tracker_for_sync< .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_intent.payment_id, &payment_intent.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index 4340dee17c..e828b275e7 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -116,7 +116,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .find_connector_response_by_payment_id_merchant_id_txn_id( &payment_intent.payment_id, &payment_intent.merchant_id, - &payment_attempt.txn_id, + &payment_attempt.attempt_id, storage_scheme, ) .await diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index bef2c60964..1d7eb7c941 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -490,6 +490,7 @@ fn mk_new_refund( refund_status: enums::RefundStatus::Pending, metadata: request.metadata, description: request.reason, + attempt_id: payment_attempt.attempt_id.clone(), ..storage::RefundNew::default() } } diff --git a/crates/router/src/core/utils.rs b/crates/router/src/core/utils.rs index 9534cc54d6..31b559ba13 100644 --- a/crates/router/src/core/utils.rs +++ b/crates/router/src/core/utils.rs @@ -51,9 +51,11 @@ pub async fn construct_refund_router_data<'a, F>( .get_required_value("payment_method_type")?; let payment_method_data = match payment_method_data.cloned() { Some(v) => v, - None => helpers::Vault::get_payment_method_data_from_locker(state, &payment_attempt.txn_id) - .await? - .get_required_value("payment_method_data")?, + None => { + helpers::Vault::get_payment_method_data_from_locker(state, &payment_attempt.attempt_id) + .await? + .get_required_value("payment_method_data")? + } }; let router_data = types::RouterData { diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 1ca80680b9..767fee28bc 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -14,6 +14,7 @@ pub mod payment_method; pub mod process_tracker; pub mod queue; pub mod refund; +pub mod reverse_lookup; pub mod temp_card; use std::sync::Arc; @@ -51,6 +52,7 @@ pub trait StorageInterface: + queue::QueueInterface + ephemeral_key::EphemeralKeyInterface + connector_response::ConnectorResponseInterface + + reverse_lookup::ReverseLookupInterface + 'static { async fn close(&mut self) {} diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index e6cb06fc70..8afe83519f 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -48,10 +48,10 @@ pub trait PaymentAttemptInterface { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; - async fn find_payment_attempt_by_merchant_id_txn_id( + async fn find_payment_attempt_by_merchant_id_attempt_id( &self, merchant_id: &str, - txn_id: &str, + attempt_id: &str, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; } @@ -164,15 +164,15 @@ mod storage { .into_report() } - async fn find_payment_attempt_by_merchant_id_txn_id( + async fn find_payment_attempt_by_merchant_id_attempt_id( &self, merchant_id: &str, - txn_id: &str, + attempt_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; - PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id) + PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id) .await .map_err(Into::into) .into_report() @@ -182,10 +182,10 @@ mod storage { #[async_trait::async_trait] impl PaymentAttemptInterface for MockDb { - async fn find_payment_attempt_by_merchant_id_txn_id( + async fn find_payment_attempt_by_merchant_id_attempt_id( &self, _merchant_id: &str, - _txn_id: &str, + _attempt_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() @@ -214,7 +214,7 @@ impl PaymentAttemptInterface for MockDb { id, payment_id: payment_attempt.payment_id, merchant_id: payment_attempt.merchant_id, - txn_id: payment_attempt.txn_id, + attempt_id: payment_attempt.attempt_id, status: payment_attempt.status, amount: payment_attempt.amount, currency: payment_attempt.currency, @@ -312,9 +312,10 @@ mod storage { use super::PaymentAttemptInterface; use crate::{ connection::pg_connection, - core::errors::{self, CustomResult}, + core::errors::{self, utils::RedisErrorExt, CustomResult}, + db::reverse_lookup::ReverseLookupInterface, services::Store, - types::storage::{enums, kv, payment_attempt::*}, + types::storage::{enums, kv, payment_attempt::*, ReverseLookupNew}, utils::storage_partitioning::KvStorePartition, }; @@ -338,7 +339,7 @@ mod storage { enums::MerchantStorageScheme::RedisKv => { let key = format!( "{}_{}", - payment_attempt.payment_id, payment_attempt.merchant_id + payment_attempt.merchant_id, payment_attempt.payment_id ); // TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id // Check for database presence as well Maybe use a read replica here ? @@ -346,7 +347,7 @@ mod storage { id: 0i32, payment_id: payment_attempt.payment_id.clone(), merchant_id: payment_attempt.merchant_id.clone(), - txn_id: payment_attempt.txn_id.clone(), + attempt_id: payment_attempt.attempt_id.clone(), status: payment_attempt.status, amount: payment_attempt.amount, currency: payment_attempt.currency, @@ -376,9 +377,10 @@ mod storage { error_code: payment_attempt.error_code.clone(), }; + let field = format!("pa_{}", created_attempt.attempt_id); match self .redis_conn - .serialize_and_set_hash_field_if_not_exist(&key, "pa", &created_attempt) + .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_attempt) .await { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue( @@ -386,12 +388,29 @@ mod storage { )) .into_report(), Ok(HsetnxReply::KeySet) => { + let conn = pg_connection(&self.master_pool).await; + + //Reverse lookup for attempt_id + ReverseLookupNew { + lookup_id: format!( + "{}_{}", + &created_attempt.merchant_id, &created_attempt.attempt_id, + ), + pk_id: key, + sk_id: field, + source: "payment_attempt".to_string(), + } + .insert(&conn) + .await + .map_err(Into::::into) + .into_report()?; + let redis_entry = kv::TypedSql { op: kv::DBOperation::Insert { insertable: kv::Insertable::PaymentAttempt(payment_attempt), }, }; - let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( + let stream_name = self.get_drainer_stream_name(&PaymentAttempt::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &created_attempt.merchant_id, payment_id: &created_attempt.payment_id, @@ -432,20 +451,42 @@ mod storage { } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.payment_id, this.merchant_id); + let key = format!("{}_{}", this.merchant_id, this.payment_id); let updated_attempt = payment_attempt.clone().apply_changeset(this.clone()); // Check for database presence as well Maybe use a read replica here ? let redis_value = serde_json::to_string(&updated_attempt) .into_report() .change_context(errors::StorageError::KVError)?; + let field = format!("pa_{}", updated_attempt.attempt_id); let updated_attempt = self .redis_conn - .set_hash_fields(&key, ("pa", &redis_value)) + .set_hash_fields(&key, (&field, &redis_value)) .await .map(|_| updated_attempt) .change_context(errors::StorageError::KVError)?; + let conn = pg_connection(&self.master_pool).await; + // Reverse lookup for connector_transaction_id + if let Some(ref connector_transaction_id) = + updated_attempt.connector_transaction_id + { + let field = format!("pa_{}", updated_attempt.attempt_id); + ReverseLookupNew { + lookup_id: format!( + "{}_{}", + &updated_attempt.merchant_id, connector_transaction_id + ), + pk_id: key.clone(), + sk_id: field.clone(), + source: "payment_attempt".to_string(), + } + .insert(&conn) + .await + .map_err(Into::::into) + .into_report()?; + } + let redis_entry = kv::TypedSql { op: kv::DBOperation::Update { updatable: kv::Updateable::PaymentAttemptUpdate( @@ -457,7 +498,7 @@ mod storage { }, }; - let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( + let stream_name = self.get_drainer_stream_name(&PaymentAttempt::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &updated_attempt.merchant_id, payment_id: &updated_attempt.payment_id, @@ -495,21 +536,20 @@ mod storage { } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", payment_id, merchant_id); + let key = format!("{}_{}", merchant_id, payment_id); + let lookup = self + .get_lookup_by_lookup_id(&key) + .await + .map_err(Into::::into) + .into_report()?; self.redis_conn .get_hash_field_and_deserialize::( &key, - "pa", + &lookup.sk_id, "PaymentAttempt", ) .await - .map_err(|error| match error.current_context() { - errors::RedisError::NotFound => errors::StorageError::ValueNotFound( - format!("Payment Attempt does not exist for {}", key), - ) - .into(), - _ => error.change_context(errors::StorageError::KVError), - }) + .map_err(|error| error.to_redis_failed_response(&key)) // Check for database presence as well Maybe use a read replica here ? } } @@ -518,28 +558,26 @@ mod storage { async fn find_payment_attempt_by_transaction_id_payment_id_merchant_id( &self, transaction_id: &str, - payment_id: &str, + _payment_id: &str, merchant_id: &str, - storage_scheme: enums::MerchantStorageScheme, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { // We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now - self.find_payment_attempt_by_payment_id_merchant_id( - payment_id, - merchant_id, - storage_scheme, - ) - .await - .and_then(|attempt| { - if attempt.connector_transaction_id.as_deref() == Some(transaction_id) { - Ok(attempt) - } else { - Err(errors::StorageError::ValueNotFound(format!( - "Successful payment attempt does not exist for {}_{}", - payment_id, merchant_id - ))) - .into_report() - } - }) + let lookup_id = format!("{merchant_id}_{transaction_id}"); + let lookup = self + .get_lookup_by_lookup_id(&lookup_id) + .await + .map_err(Into::::into) + .into_report()?; + let key = &lookup.pk_id; + self.redis_conn + .get_hash_field_and_deserialize::( + key, + &lookup.sk_id, + "PaymentAttempt", + ) + .await + .map_err(|error| error.to_redis_failed_response(key)) } async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id( @@ -586,28 +624,57 @@ mod storage { } enums::MerchantStorageScheme::RedisKv => { - Err(errors::StorageError::KVError).into_report() + let lookup_id = format!("{merchant_id}_{connector_txn_id}"); + let lookup = self + .get_lookup_by_lookup_id(&lookup_id) + .await + .map_err(Into::::into) + .into_report()?; + + let key = &lookup.pk_id; + self.redis_conn + .get_hash_field_and_deserialize::( + key, + &lookup.sk_id, + "PaymentAttempt", + ) + .await + .map_err(|error| error.to_redis_failed_response(key)) } } } - async fn find_payment_attempt_by_merchant_id_txn_id( + async fn find_payment_attempt_by_merchant_id_attempt_id( &self, merchant_id: &str, - txn_id: &str, + attempt_id: &str, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = pg_connection(&self.master_pool).await; - PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id) + PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id) .await .map_err(Into::into) .into_report() } enums::MerchantStorageScheme::RedisKv => { - Err(errors::StorageError::KVError).into_report() + let lookup_id = format!("{merchant_id}_{attempt_id}"); + let lookup = self + .get_lookup_by_lookup_id(&lookup_id) + .await + .map_err(Into::::into) + .into_report()?; + let key = &lookup.pk_id; + self.redis_conn + .get_hash_field_and_deserialize::( + key, + &lookup.sk_id, + "PaymentAttempt", + ) + .await + .map_err(|error| error.to_redis_failed_response(key)) } } } diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index 494337714f..0dcad4d7e7 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -69,7 +69,7 @@ mod storage { } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", new.payment_id, new.merchant_id); + let key = format!("{}_{}", new.merchant_id, new.payment_id); let created_intent = PaymentIntent { id: 0i32, payment_id: new.payment_id.clone(), @@ -110,7 +110,7 @@ mod storage { insertable: kv::Insertable::PaymentIntent(new), }, }; - let stream_name = self.drainer_stream(&PaymentIntent::shard_key( + let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &created_intent.merchant_id, payment_id: &created_intent.payment_id, @@ -151,7 +151,7 @@ mod storage { } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", this.payment_id, this.merchant_id); + let key = format!("{}_{}", this.merchant_id, this.payment_id); let updated_intent = payment_intent.clone().apply_changeset(this.clone()); // Check for database presence as well Maybe use a read replica here ? @@ -176,7 +176,7 @@ mod storage { }, }; - let stream_name = self.drainer_stream(&PaymentIntent::shard_key( + let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { merchant_id: &updated_intent.merchant_id, payment_id: &updated_intent.payment_id, @@ -214,7 +214,7 @@ mod storage { } enums::MerchantStorageScheme::RedisKv => { - let key = format!("{}_{}", payment_id, merchant_id); + let key = format!("{}_{}", merchant_id, payment_id); self.redis_conn .get_hash_field_and_deserialize::( &key, diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index 967720c3e4..9215010455 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -1,11 +1,10 @@ -use error_stack::{IntoReport, Report}; +use error_stack::Report; use storage_models::errors::DatabaseError; use super::MockDb; use crate::{ - connection::pg_connection, core::errors::{self, CustomResult, StorageError}, - types::storage::{self, enums}, + types::storage::{self as storage_types, enums}, }; #[async_trait::async_trait] @@ -15,14 +14,14 @@ pub trait RefundInterface { internal_reference_id: &str, merchant_id: &str, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; async fn find_refund_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult, errors::StorageError>; // async fn find_refund_by_payment_id_merchant_id_refund_id( // &self, @@ -36,117 +35,513 @@ pub trait RefundInterface { merchant_id: &str, refund_id: &str, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; async fn update_refund( &self, - this: storage::Refund, - refund: storage::RefundUpdate, + this: storage_types::Refund, + refund: storage_types::RefundUpdate, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; async fn find_refund_by_merchant_id_transaction_id( &self, merchant_id: &str, txn_id: &str, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult, errors::StorageError>; async fn insert_refund( &self, - new: storage::RefundNew, + new: storage_types::RefundNew, storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult; + ) -> CustomResult; } -#[async_trait::async_trait] -impl RefundInterface for super::Store { - async fn find_refund_by_internal_reference_id_merchant_id( - &self, - internal_reference_id: &str, - merchant_id: &str, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await; - storage::Refund::find_by_internal_reference_id_merchant_id( - &conn, - internal_reference_id, - merchant_id, - ) - .await - .map_err(Into::into) - .into_report() - } +#[cfg(not(feature = "kv_store"))] +mod storage { + use error_stack::IntoReport; - async fn insert_refund( - &self, - new: storage::RefundNew, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await; - new.insert(&conn).await.map_err(Into::into).into_report() - } - async fn find_refund_by_merchant_id_transaction_id( - &self, - merchant_id: &str, - txn_id: &str, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await; - storage::Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id) + use super::RefundInterface; + use crate::{ + connection::pg_connection, + core::errors::{self, CustomResult}, + services::Store, + types::storage::{self as storage_types, enums}, + }; + + #[async_trait::async_trait] + impl RefundInterface for Store { + async fn find_refund_by_internal_reference_id_merchant_id( + &self, + internal_reference_id: &str, + merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_internal_reference_id_merchant_id( + &conn, + internal_reference_id, + merchant_id, + ) .await .map_err(Into::into) .into_report() + } + + async fn insert_refund( + &self, + new: storage_types::RefundNew, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection(&self.master_pool).await; + new.insert(&conn).await.map_err(Into::into).into_report() + } + + async fn find_refund_by_merchant_id_transaction_id( + &self, + merchant_id: &str, + txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id) + .await + .map_err(Into::into) + .into_report() + } + + async fn update_refund( + &self, + this: storage_types::Refund, + refund: storage_types::RefundUpdate, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection(&self.master_pool).await; + this.update(&conn, refund) + .await + .map_err(Into::into) + .into_report() + } + + async fn find_refund_by_merchant_id_refund_id( + &self, + merchant_id: &str, + refund_id: &str, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id) + .await + .map_err(Into::into) + .into_report() + } + + // async fn find_refund_by_payment_id_merchant_id_refund_id( + // &self, + // payment_id: &str, + // merchant_id: &str, + // refund_id: &str, + // ) -> CustomResult { + // let conn = pg_connection(&self.master_pool).await; + // Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id) + // .await + // } + + async fn find_refund_by_payment_id_merchant_id( + &self, + payment_id: &str, + merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) + .await + .map_err(Into::into) + .into_report() + } } +} - async fn update_refund( - &self, - this: storage::Refund, - refund: storage::RefundUpdate, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await; - this.update(&conn, refund) - .await - .map_err(Into::into) - .into_report() - } +#[cfg(feature = "kv_store")] +mod storage { + use common_utils::date_time; + use error_stack::{IntoReport, ResultExt}; + use redis_interface::{HsetnxReply, RedisEntryId}; - async fn find_refund_by_merchant_id_refund_id( - &self, - merchant_id: &str, - refund_id: &str, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await; - storage::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id) - .await - .map_err(Into::into) - .into_report() - } + use super::RefundInterface; + use crate::{ + connection::pg_connection, + core::errors::{self, utils::RedisErrorExt, CustomResult}, + db::reverse_lookup::ReverseLookupInterface, + logger, + services::Store, + types::storage::{self as storage_types, enums, kv}, + utils::{ + self, db_utils, + storage_partitioning::{KvStorePartition, PartitionKey}, + }, + }; + #[async_trait::async_trait] + impl RefundInterface for Store { + async fn find_refund_by_internal_reference_id_merchant_id( + &self, + internal_reference_id: &str, + merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_internal_reference_id_merchant_id( + &conn, + internal_reference_id, + merchant_id, + ) + .await + .map_err(Into::into) + .into_report() + } + enums::MerchantStorageScheme::RedisKv => { + let lookup_id = format!("{}_{}", merchant_id, internal_reference_id); + let lookup = self + .get_lookup_by_lookup_id(&lookup_id) + .await + .map_err(Into::::into) + .into_report()?; - // async fn find_refund_by_payment_id_merchant_id_refund_id( - // &self, - // payment_id: &str, - // merchant_id: &str, - // refund_id: &str, - // ) -> CustomResult { - // let conn = pg_connection(&self.master_pool).await; - // Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id) - // .await - // } + let key = &lookup.pk_id; + self.redis_conn + .get_hash_field_and_deserialize::( + key, + &lookup.sk_id, + "Refund", + ) + .await + .map_err(|error| error.to_redis_failed_response(key)) + } + } + } - async fn find_refund_by_payment_id_merchant_id( - &self, - payment_id: &str, - merchant_id: &str, - _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await; - storage::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) - .await - .map_err(Into::into) - .into_report() + async fn insert_refund( + &self, + new: storage_types::RefundNew, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + new.insert(&conn).await.map_err(Into::into).into_report() + } + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", new.merchant_id, new.payment_id); + // TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id + // Check for database presence as well Maybe use a read replica here ? + let created_refund = storage_types::Refund { + id: 0i32, + refund_id: new.refund_id.clone(), + merchant_id: new.merchant_id.clone(), + attempt_id: new.attempt_id.clone(), + internal_reference_id: new.internal_reference_id.clone(), + payment_id: new.payment_id.clone(), + transaction_id: new.transaction_id.clone(), + connector: new.connector.clone(), + pg_refund_id: new.pg_refund_id.clone(), + external_reference_id: new.external_reference_id.clone(), + refund_type: new.refund_type, + total_amount: new.total_amount, + currency: new.currency, + refund_amount: new.refund_amount, + refund_status: new.refund_status, + sent_to_gateway: new.sent_to_gateway, + refund_error_message: new.refund_error_message.clone(), + metadata: new.metadata.clone(), + refund_arn: new.refund_arn.clone(), + created_at: new.created_at.unwrap_or_else(date_time::now), + updated_at: new.created_at.unwrap_or_else(date_time::now), + description: new.description.clone(), + }; + + let field = format!( + "pa_{}_ref_{}", + &created_refund.attempt_id, &created_refund.refund_id + ); + match self + .redis_conn + .serialize_and_set_hash_field_if_not_exist(&key, &field, &created_refund) + .await + { + Ok(HsetnxReply::KeyNotSet) => { + Err(errors::StorageError::DuplicateValue(format!( + "Refund already exists refund_id: {}", + &created_refund.refund_id + ))) + .into_report() + } + Ok(HsetnxReply::KeySet) => { + let conn = pg_connection(&self.master_pool).await; + + let reverse_lookups = vec![ + storage_types::ReverseLookupNew { + sk_id: field.clone(), + lookup_id: format!( + "{}_{}", + created_refund.merchant_id, created_refund.refund_id + ), + pk_id: key.clone(), + source: "refund".to_string(), + }, + storage_types::ReverseLookupNew { + sk_id: field.clone(), + lookup_id: format!( + "{}_{}", + created_refund.merchant_id, created_refund.transaction_id + ), + pk_id: key.clone(), + source: "refund".to_string(), + }, + storage_types::ReverseLookupNew { + sk_id: field.clone(), + lookup_id: format!( + "{}_{}", + created_refund.merchant_id, + created_refund.internal_reference_id + ), + pk_id: key, + source: "refund".to_string(), + }, + ]; + storage_types::ReverseLookupNew::batch_insert(reverse_lookups, &conn) + .await + .change_context(errors::StorageError::KVError)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::Refund(new), + }, + }; + + let stream_name = + self.get_drainer_stream_name(&storage_types::Refund::shard_key( + PartitionKey::MerchantIdPaymentId { + merchant_id: &created_refund.merchant_id, + payment_id: &created_refund.payment_id, + }, + self.config.drainer_num_partitions, + )); + self.redis_conn + .stream_append_entry( + &stream_name, + &RedisEntryId::AutoGeneratedID, + redis_entry + .to_field_value_pairs() + .change_context(errors::StorageError::KVError)?, + ) + .await + .change_context(errors::StorageError::KVError)?; + Ok(created_refund) + } + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + } + } + + async fn find_refund_by_merchant_id_transaction_id( + &self, + merchant_id: &str, + txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_merchant_id_transaction_id( + &conn, + merchant_id, + txn_id, + ) + .await + .map_err(Into::into) + .into_report() + } + enums::MerchantStorageScheme::RedisKv => { + let lookup_id = format!("{merchant_id}_{txn_id}"); + let lookup = match self.get_lookup_by_lookup_id(&lookup_id).await { + Ok(l) => l, + Err(err) => { + logger::error!(?err); + return Ok(vec![]); + } + }; + let key = &lookup.pk_id; + + let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); + + self.redis_conn + .hscan_and_deserialize(key, &pattern, None) + .await + .change_context(errors::StorageError::KVError) + } + } + } + + async fn update_refund( + &self, + this: storage_types::Refund, + refund: storage_types::RefundUpdate, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + this.update(&conn, refund) + .await + .map_err(Into::into) + .into_report() + } + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", this.merchant_id, this.payment_id); + + let updated_refund = refund.clone().apply_changeset(this.clone()); + // Check for database presence as well Maybe use a read replica here ? + // TODO: Add a proper error for serialization failure + + let lookup = self + .get_lookup_by_lookup_id(&key) + .await + .map_err(Into::::into) + .into_report()?; + + let field = &lookup.sk_id; + + let redis_value = + utils::Encode::::encode_to_string_of_json( + &updated_refund, + ) + .change_context(errors::StorageError::KVError)?; + + self.redis_conn + .set_hash_fields(&key, (field, redis_value)) + .await + .change_context(errors::StorageError::KVError)?; + + let stream_name = + self.get_drainer_stream_name(&storage_types::Refund::shard_key( + PartitionKey::MerchantIdPaymentId { + merchant_id: &updated_refund.merchant_id, + payment_id: &updated_refund.payment_id, + }, + self.config.drainer_num_partitions, + )); + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::RefundUpdate(kv::RefundUpdateMems { + orig: this, + update_data: refund, + }), + }, + }; + self.redis_conn + .stream_append_entry( + &stream_name, + &RedisEntryId::AutoGeneratedID, + redis_entry + .to_field_value_pairs() + .change_context(errors::StorageError::KVError)?, + ) + .await + .change_context(errors::StorageError::KVError)?; + Ok(updated_refund) + } + } + } + + async fn find_refund_by_merchant_id_refund_id( + &self, + merchant_id: &str, + refund_id: &str, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_merchant_id_refund_id( + &conn, + merchant_id, + refund_id, + ) + .await + .map_err(Into::into) + .into_report() + } + enums::MerchantStorageScheme::RedisKv => { + let lookup_id = format!("{merchant_id}_{refund_id}"); + let lookup = self + .get_lookup_by_lookup_id(&lookup_id) + .await + .map_err(Into::::into) + .into_report()?; + + let key = &lookup.pk_id; + self.redis_conn + .get_hash_field_and_deserialize::( + key, + &lookup.sk_id, + "Refund", + ) + .await + .map_err(|error| error.to_redis_failed_response(key)) + } + } + } + + // async fn find_refund_by_payment_id_merchant_id_refund_id( + // &self, + // payment_id: &str, + // merchant_id: &str, + // refund_id: &str, + // ) -> CustomResult { + // let conn = pg_connection(&self.master_pool).await; + // Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id) + // .await + // } + + async fn find_refund_by_payment_id_merchant_id( + &self, + payment_id: &str, + merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + storage_types::Refund::find_by_payment_id_merchant_id( + &conn, + payment_id, + merchant_id, + ) + .await + .map_err(Into::into) + .into_report() + } + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", merchant_id, payment_id); + let lookup = self + .get_lookup_by_lookup_id(&key) + .await + .map_err(Into::::into) + .into_report()?; + + let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); + + self.redis_conn + .hscan_and_deserialize(&key, &pattern, None) + .await + .change_context(errors::StorageError::KVError) + } + } + } } } @@ -157,24 +552,25 @@ impl RefundInterface for MockDb { _internal_reference_id: &str, _merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { todo!() } async fn insert_refund( &self, - new: storage::RefundNew, + new: storage_types::RefundNew, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let mut refunds = self.refunds.lock().await; let current_time = common_utils::date_time::now(); - let refund = storage::Refund { + let refund = storage_types::Refund { id: refunds.len() as i32, internal_reference_id: new.internal_reference_id, refund_id: new.refund_id, payment_id: new.payment_id, merchant_id: new.merchant_id, + attempt_id: new.attempt_id, transaction_id: new.transaction_id, connector: new.connector, pg_refund_id: new.pg_refund_id, @@ -187,7 +583,7 @@ impl RefundInterface for MockDb { sent_to_gateway: new.sent_to_gateway, refund_error_message: new.refund_error_message, metadata: new.metadata, - refund_arn: new.refund_arn, + refund_arn: new.refund_arn.clone(), created_at: new.created_at.unwrap_or(current_time), updated_at: current_time, description: new.description, @@ -200,7 +596,7 @@ impl RefundInterface for MockDb { merchant_id: &str, txn_id: &str, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::StorageError> { let refunds = self.refunds.lock().await; Ok(refunds @@ -214,10 +610,10 @@ impl RefundInterface for MockDb { async fn update_refund( &self, - _this: storage::Refund, - _refund: storage::RefundUpdate, + _this: storage_types::Refund, + _refund: storage_types::RefundUpdate, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { todo!() } @@ -226,7 +622,7 @@ impl RefundInterface for MockDb { merchant_id: &str, refund_id: &str, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { let refunds = self.refunds.lock().await; refunds @@ -245,7 +641,7 @@ impl RefundInterface for MockDb { _payment_id: &str, _merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::StorageError> { todo!() } } diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs new file mode 100644 index 0000000000..3081f7b567 --- /dev/null +++ b/crates/router/src/db/reverse_lookup.rs @@ -0,0 +1,36 @@ +use storage_models::{errors, StorageResult}; + +use super::{MockDb, Store}; +use crate::{ + connection::pg_connection, + types::storage::reverse_lookup::{ReverseLookup, ReverseLookupNew}, +}; + +#[async_trait::async_trait] +pub trait ReverseLookupInterface { + async fn insert_reverse_lookup(&self, _new: ReverseLookupNew) -> StorageResult; + async fn get_lookup_by_lookup_id(&self, _id: &str) -> StorageResult; +} + +#[async_trait::async_trait] +impl ReverseLookupInterface for Store { + async fn insert_reverse_lookup(&self, new: ReverseLookupNew) -> StorageResult { + let conn = pg_connection(&self.master_pool).await; + new.insert(&conn).await + } + + async fn get_lookup_by_lookup_id(&self, id: &str) -> StorageResult { + let conn = pg_connection(&self.master_pool).await; + ReverseLookup::find_by_lookup_id(id, &conn).await + } +} + +#[async_trait::async_trait] +impl ReverseLookupInterface for MockDb { + async fn insert_reverse_lookup(&self, _new: ReverseLookupNew) -> StorageResult { + Err(errors::DatabaseError::NotFound.into()) + } + async fn get_lookup_by_lookup_id(&self, _id: &str) -> StorageResult { + Err(errors::DatabaseError::NotFound.into()) + } +} diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 97adc84e86..571e3d50a4 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -40,7 +40,7 @@ impl Store { } #[cfg(feature = "kv_store")] - pub fn drainer_stream(&self, shard_key: &str) -> String { + pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { // Example: {shard_5}_drainer_stream format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,) } diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs index e5196ca08a..d0f439069a 100644 --- a/crates/router/src/services/api.rs +++ b/crates/router/src/services/api.rs @@ -348,7 +348,9 @@ impl From<&storage::PaymentAttempt> for BachRedirectResponse { Self { url: format!( "/payments/start/{}/{}/{}", - &payment_attempt.payment_id, &payment_attempt.merchant_id, &payment_attempt.txn_id + &payment_attempt.payment_id, + &payment_attempt.merchant_id, + &payment_attempt.attempt_id ), } } diff --git a/crates/router/src/types/api/payments.rs b/crates/router/src/types/api/payments.rs index c9b898b18f..d68f2f3ce7 100644 --- a/crates/router/src/types/api/payments.rs +++ b/crates/router/src/types/api/payments.rs @@ -87,7 +87,7 @@ impl PaymentIdTypeExt for PaymentIdType { fn get_payment_intent_id(&self) -> errors::CustomResult { match self { Self::PaymentIntentId(id) => Ok(id.clone()), - Self::ConnectorTransactionId(_) | Self::PaymentTxnId(_) => { + Self::ConnectorTransactionId(_) | Self::PaymentAttemptId(_) => { Err(errors::ValidationError::IncorrectValueProvided { field_name: "payment_id", }) diff --git a/crates/router/src/types/storage.rs b/crates/router/src/types/storage.rs index 74f3c3493c..43e84cb87d 100644 --- a/crates/router/src/types/storage.rs +++ b/crates/router/src/types/storage.rs @@ -13,6 +13,7 @@ pub mod payment_attempt; pub mod payment_intent; pub mod payment_method; pub mod process_tracker; +pub mod reverse_lookup; mod query; pub mod refund; @@ -24,5 +25,6 @@ pub mod kv; pub use self::{ address::*, configs::*, connector_response::*, customers::*, events::*, locker_mock_up::*, mandate::*, merchant_account::*, merchant_connector_account::*, payment_attempt::*, - payment_intent::*, payment_method::*, process_tracker::*, refund::*, temp_card::*, + payment_intent::*, payment_method::*, process_tracker::*, refund::*, reverse_lookup::*, + temp_card::*, }; diff --git a/crates/router/src/types/storage/kv.rs b/crates/router/src/types/storage/kv.rs index cf30f4d2f5..67db8e6454 100644 --- a/crates/router/src/types/storage/kv.rs +++ b/crates/router/src/types/storage/kv.rs @@ -1,4 +1,4 @@ pub use storage_models::kv::{ - DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, TypedSql, - Updateable, + DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, RefundUpdateMems, + TypedSql, Updateable, }; diff --git a/crates/router/src/types/storage/refund.rs b/crates/router/src/types/storage/refund.rs index 26992d18e4..a278d2fe13 100644 --- a/crates/router/src/types/storage/refund.rs +++ b/crates/router/src/types/storage/refund.rs @@ -1,3 +1,6 @@ pub use storage_models::refund::{ Refund, RefundCoreWorkflow, RefundNew, RefundUpdate, RefundUpdateInternal, }; + +#[cfg(feature = "kv_store")] +impl crate::utils::storage_partitioning::KvStorePartition for Refund {} diff --git a/crates/router/src/types/storage/reverse_lookup.rs b/crates/router/src/types/storage/reverse_lookup.rs new file mode 100644 index 0000000000..6b3304e11e --- /dev/null +++ b/crates/router/src/types/storage/reverse_lookup.rs @@ -0,0 +1 @@ +pub use storage_models::reverse_lookup::{ReverseLookup, ReverseLookupNew}; diff --git a/crates/router/src/utils.rs b/crates/router/src/utils.rs index ef0b24a69e..c4aa0ff13f 100644 --- a/crates/router/src/utils.rs +++ b/crates/router/src/utils.rs @@ -1,7 +1,9 @@ pub(crate) mod crypto; pub(crate) mod custom_serde; +pub(crate) mod db_utils; mod ext_traits; mod fp_utils; + #[cfg(feature = "kv_store")] pub(crate) mod storage_partitioning; diff --git a/crates/router/src/utils/db_utils.rs b/crates/router/src/utils/db_utils.rs new file mode 100644 index 0000000000..a08f333b81 --- /dev/null +++ b/crates/router/src/utils/db_utils.rs @@ -0,0 +1,10 @@ +#[cfg(feature = "kv_store")] +/// Generates hscan field pattern. Suppose the field is pa_1234_ref_1211 it will generate +/// pa_1234_ref_* +pub fn generate_hscan_pattern_for_refund(sk: &str) -> String { + sk.split('_') + .take(3) + .chain(["*"]) + .collect::>() + .join("_") +} diff --git a/crates/storage_models/src/enums.rs b/crates/storage_models/src/enums.rs index b49055f268..5786b61fc7 100644 --- a/crates/storage_models/src/enums.rs +++ b/crates/storage_models/src/enums.rs @@ -562,6 +562,8 @@ pub enum ProcessTrackerStatus { Default, Eq, PartialEq, + serde::Serialize, + serde::Deserialize, strum::Display, strum::EnumString, router_derive::DieselEnum, @@ -585,6 +587,8 @@ pub enum RefundStatus { Default, Eq, PartialEq, + serde::Serialize, + serde::Deserialize, strum::Display, strum::EnumString, router_derive::DieselEnum, diff --git a/crates/storage_models/src/kv.rs b/crates/storage_models/src/kv.rs index 62862f1628..9a416d6eb0 100644 --- a/crates/storage_models/src/kv.rs +++ b/crates/storage_models/src/kv.rs @@ -5,6 +5,7 @@ use crate::{ errors, payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, payment_intent::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate}, + refund::{Refund, RefundNew, RefundUpdate}, }; #[derive(Debug, Serialize, Deserialize)] @@ -37,6 +38,7 @@ impl TypedSql { pub enum Insertable { PaymentIntent(PaymentIntentNew), PaymentAttempt(PaymentAttemptNew), + Refund(RefundNew), } #[derive(Debug, Serialize, Deserialize)] @@ -44,6 +46,7 @@ pub enum Insertable { pub enum Updateable { PaymentIntentUpdate(PaymentIntentUpdateMems), PaymentAttemptUpdate(PaymentAttemptUpdateMems), + RefundUpdate(RefundUpdateMems), } #[derive(Debug, Serialize, Deserialize)] @@ -57,3 +60,9 @@ pub struct PaymentAttemptUpdateMems { pub orig: PaymentAttempt, pub update_data: PaymentAttemptUpdate, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct RefundUpdateMems { + pub orig: Refund, + pub update_data: RefundUpdate, +} diff --git a/crates/storage_models/src/lib.rs b/crates/storage_models/src/lib.rs index 4e1e0a6c3f..98d4a68c49 100644 --- a/crates/storage_models/src/lib.rs +++ b/crates/storage_models/src/lib.rs @@ -19,6 +19,7 @@ pub mod payment_method; pub mod process_tracker; pub mod query; pub mod refund; +pub mod reverse_lookup; pub mod schema; pub mod temp_card; diff --git a/crates/storage_models/src/payment_attempt.rs b/crates/storage_models/src/payment_attempt.rs index b7bc28d849..e36f06c1a5 100644 --- a/crates/storage_models/src/payment_attempt.rs +++ b/crates/storage_models/src/payment_attempt.rs @@ -10,7 +10,7 @@ pub struct PaymentAttempt { pub id: i32, pub payment_id: String, pub merchant_id: String, - pub txn_id: String, + pub attempt_id: String, pub status: storage_enums::AttemptStatus, pub amount: i64, pub currency: Option, @@ -47,7 +47,7 @@ pub struct PaymentAttempt { pub struct PaymentAttemptNew { pub payment_id: String, pub merchant_id: String, - pub txn_id: String, + pub attempt_id: String, pub status: storage_enums::AttemptStatus, pub amount: i64, pub currency: Option, diff --git a/crates/storage_models/src/query.rs b/crates/storage_models/src/query.rs index 196cc16943..850f04ebf3 100644 --- a/crates/storage_models/src/query.rs +++ b/crates/storage_models/src/query.rs @@ -13,4 +13,5 @@ pub mod payment_intent; pub mod payment_method; pub mod process_tracker; pub mod refund; +pub mod reverse_lookup; pub mod temp_card; diff --git a/crates/storage_models/src/query/payment_attempt.rs b/crates/storage_models/src/query/payment_attempt.rs index dafe15ca91..3f37f42f2e 100644 --- a/crates/storage_models/src/query/payment_attempt.rs +++ b/crates/storage_models/src/query/payment_attempt.rs @@ -133,16 +133,16 @@ impl PaymentAttempt { } #[instrument(skip(conn))] - pub async fn find_by_merchant_id_transaction_id( + pub async fn find_by_merchant_id_attempt_id( conn: &PgPooledConn, merchant_id: &str, - txn_id: &str, + attempt_id: &str, ) -> StorageResult { generics::generic_find_one::<::Table, _, _>( conn, dsl::merchant_id .eq(merchant_id.to_owned()) - .and(dsl::txn_id.eq(txn_id.to_owned())), + .and(dsl::attempt_id.eq(attempt_id.to_owned())), ) .await } diff --git a/crates/storage_models/src/query/refund.rs b/crates/storage_models/src/query/refund.rs index bb50e9d1f7..8a30c9a152 100644 --- a/crates/storage_models/src/query/refund.rs +++ b/crates/storage_models/src/query/refund.rs @@ -9,8 +9,6 @@ use crate::{ PgPooledConn, StorageResult, }; -// FIXME: Find by partition key : Review - impl RefundNew { #[instrument(skip(conn))] pub async fn insert(self, conn: &PgPooledConn) -> StorageResult { diff --git a/crates/storage_models/src/query/reverse_lookup.rs b/crates/storage_models/src/query/reverse_lookup.rs new file mode 100644 index 0000000000..bee3641464 --- /dev/null +++ b/crates/storage_models/src/query/reverse_lookup.rs @@ -0,0 +1,33 @@ +use diesel::{associations::HasTable, ExpressionMethods}; +use router_env::{tracing, tracing::instrument}; + +use super::generics; +use crate::{ + reverse_lookup::{ReverseLookup, ReverseLookupNew}, + schema::reverse_lookup::dsl, + PgPooledConn, StorageResult, +}; + +impl ReverseLookupNew { + #[instrument(skip(conn))] + pub async fn insert(self, conn: &PgPooledConn) -> StorageResult { + generics::generic_insert(conn, self).await + } + #[instrument(skip(conn))] + pub async fn batch_insert( + reverse_lookups: Vec, + conn: &PgPooledConn, + ) -> StorageResult<()> { + generics::generic_insert::<_, _, ReverseLookup>(conn, reverse_lookups).await?; + Ok(()) + } +} +impl ReverseLookup { + pub async fn find_by_lookup_id(lookup_id: &str, conn: &PgPooledConn) -> StorageResult { + generics::generic_find_one::<::Table, _, _>( + conn, + dsl::lookup_id.eq(lookup_id.to_owned()), + ) + .await + } +} diff --git a/crates/storage_models/src/refund.rs b/crates/storage_models/src/refund.rs index e277f1575d..2cc322f37c 100644 --- a/crates/storage_models/src/refund.rs +++ b/crates/storage_models/src/refund.rs @@ -4,7 +4,9 @@ use time::PrimitiveDateTime; use crate::{enums as storage_enums, schema::refund}; -#[derive(Clone, Debug, Eq, Identifiable, Queryable, PartialEq)] +#[derive( + Clone, Debug, Eq, Identifiable, Queryable, PartialEq, serde::Serialize, serde::Deserialize, +)] #[diesel(table_name = refund)] pub struct Refund { pub id: i32, @@ -28,9 +30,20 @@ pub struct Refund { pub created_at: PrimitiveDateTime, pub updated_at: PrimitiveDateTime, pub description: Option, + pub attempt_id: String, } -#[derive(Clone, Debug, Default, Eq, PartialEq, Insertable, router_derive::DebugAsDisplay)] +#[derive( + Clone, + Debug, + Default, + Eq, + PartialEq, + Insertable, + router_derive::DebugAsDisplay, + serde::Serialize, + serde::Deserialize, +)] #[diesel(table_name = refund)] pub struct RefundNew { pub refund_id: String, @@ -53,9 +66,10 @@ pub struct RefundNew { pub created_at: Option, pub modified_at: Option, pub description: Option, + pub attempt_id: String, } -#[derive(Debug)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum RefundUpdate { Update { pg_refund_id: String, @@ -132,6 +146,21 @@ impl From for RefundUpdateInternal { } } +impl RefundUpdate { + pub fn apply_changeset(self, source: Refund) -> Refund { + let pa_update: RefundUpdateInternal = self.into(); + Refund { + pg_refund_id: pa_update.pg_refund_id, + refund_status: pa_update.refund_status.unwrap_or(source.refund_status), + sent_to_gateway: pa_update.sent_to_gateway.unwrap_or(source.sent_to_gateway), + refund_error_message: pa_update.refund_error_message, + refund_arn: pa_update.refund_arn, + metadata: pa_update.metadata, + ..source + } + } +} + #[derive(Debug, Eq, PartialEq, Deserialize, Serialize)] pub struct RefundCoreWorkflow { pub refund_internal_reference_id: String, diff --git a/crates/storage_models/src/reverse_lookup.rs b/crates/storage_models/src/reverse_lookup.rs new file mode 100644 index 0000000000..2fd2b9ad8e --- /dev/null +++ b/crates/storage_models/src/reverse_lookup.rs @@ -0,0 +1,33 @@ +use diesel::{Identifiable, Insertable, Queryable}; + +use crate::schema::reverse_lookup; + +/// +/// This reverse lookup table basically looks up id's and get result_id that you want. This is +/// useful for KV where you can't lookup without key +#[derive( + Clone, Debug, serde::Serialize, serde::Deserialize, Identifiable, Queryable, Eq, PartialEq, +)] +#[diesel(table_name = reverse_lookup)] +#[diesel(primary_key(lookup_id))] +pub struct ReverseLookup { + /// Primary key. The key id. + pub lookup_id: String, + /// the value id. i.e the id you want to access KV table. + pub pk_id: String, + /// the `field` in KV database. Which is used to differentiate between two same keys + pub sk_id: String, + /// the source of insertion for reference + pub source: String, +} + +#[derive( + Clone, Debug, Insertable, router_derive::DebugAsDisplay, Eq, PartialEq, serde::Serialize, +)] +#[diesel(table_name = reverse_lookup)] +pub struct ReverseLookupNew { + pub lookup_id: String, + pub pk_id: String, + pub sk_id: String, + pub source: String, +} diff --git a/crates/storage_models/src/schema.rs b/crates/storage_models/src/schema.rs index 0408af243f..6f6bba56b8 100644 --- a/crates/storage_models/src/schema.rs +++ b/crates/storage_models/src/schema.rs @@ -187,7 +187,7 @@ diesel::table! { id -> Int4, payment_id -> Varchar, merchant_id -> Varchar, - txn_id -> Varchar, + attempt_id -> Varchar, status -> AttemptStatus, amount -> Int8, currency -> Nullable, @@ -325,6 +325,19 @@ diesel::table! { created_at -> Timestamp, modified_at -> Timestamp, description -> Nullable, + attempt_id -> Varchar, + } +} + +diesel::table! { + use diesel::sql_types::*; + use crate::enums::diesel_exports::*; + + reverse_lookup (lookup_id) { + lookup_id -> Varchar, + sk_id -> Varchar, + pk_id -> Varchar, + source -> Varchar, } } @@ -355,5 +368,6 @@ diesel::allow_tables_to_appear_in_same_query!( payment_methods, process_tracker, refund, + reverse_lookup, temp_card, ); diff --git a/migrations/2022-12-12-132936_reverse_lookup/down.sql b/migrations/2022-12-12-132936_reverse_lookup/down.sql new file mode 100644 index 0000000000..5790004d20 --- /dev/null +++ b/migrations/2022-12-12-132936_reverse_lookup/down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS reverse_lookup; + diff --git a/migrations/2022-12-12-132936_reverse_lookup/up.sql b/migrations/2022-12-12-132936_reverse_lookup/up.sql new file mode 100644 index 0000000000..b19e34e236 --- /dev/null +++ b/migrations/2022-12-12-132936_reverse_lookup/up.sql @@ -0,0 +1,8 @@ +CREATE TABLE reverse_lookup ( + lookup_id VARCHAR(255) NOT NULL PRIMARY KEY, + sk_id VARCHAR(50) NOT NULL, + pk_id VARCHAR(255) NOT NULL, + source VARCHAR(30) NOT NULL +); + +CREATE INDEX lookup_id_index ON reverse_lookup (lookup_id); diff --git a/migrations/2022-12-19-085322_rename_txn_id_to_attempt_id/down.sql b/migrations/2022-12-19-085322_rename_txn_id_to_attempt_id/down.sql new file mode 100644 index 0000000000..49469c01d2 --- /dev/null +++ b/migrations/2022-12-19-085322_rename_txn_id_to_attempt_id/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE payment_id +RENAME attempt_id to txn_id; diff --git a/migrations/2022-12-19-085322_rename_txn_id_to_attempt_id/up.sql b/migrations/2022-12-19-085322_rename_txn_id_to_attempt_id/up.sql new file mode 100644 index 0000000000..f8b82d3f97 --- /dev/null +++ b/migrations/2022-12-19-085322_rename_txn_id_to_attempt_id/up.sql @@ -0,0 +1,2 @@ +ALTER TABLE payment_attempt +RENAME COLUMN txn_id to attempt_id; diff --git a/migrations/2022-12-19-085739_add_attempt_id_to_refund/down.sql b/migrations/2022-12-19-085739_add_attempt_id_to_refund/down.sql new file mode 100644 index 0000000000..e94a4b8493 --- /dev/null +++ b/migrations/2022-12-19-085739_add_attempt_id_to_refund/down.sql @@ -0,0 +1 @@ +ALTER TABLE refund DROP COLUMN attempt_id; diff --git a/migrations/2022-12-19-085739_add_attempt_id_to_refund/up.sql b/migrations/2022-12-19-085739_add_attempt_id_to_refund/up.sql new file mode 100644 index 0000000000..9fe648508d --- /dev/null +++ b/migrations/2022-12-19-085739_add_attempt_id_to_refund/up.sql @@ -0,0 +1 @@ +ALTER TABLE refund ADD COLUMN attempt_id VARCHAR(64) NOT NULL;