db: Added Reverse lookup table (#147)

This commit is contained in:
Kartikeya Hegde
2022-12-19 18:50:05 +05:30
committed by GitHub
parent d6a3e508e2
commit 87fed68519
51 changed files with 937 additions and 201 deletions

1
Cargo.lock generated
View File

@ -2600,6 +2600,7 @@ dependencies = [
"common_utils",
"error-stack",
"fred",
"futures",
"router_env",
"serde",
"thiserror",

View File

@ -249,7 +249,7 @@ impl Default for PaymentMethod {
pub enum PaymentIdType {
PaymentIntentId(String),
ConnectorTransactionId(String),
PaymentTxnId(String),
PaymentAttemptId(String),
}
impl Default for PaymentIdType {

View File

@ -25,7 +25,7 @@ async fn drainer_handler(
stream_index: u8,
max_read_count: u64,
) -> errors::DrainerResult<()> {
let stream_name = utils::get_drainer_stream(store.clone(), stream_index);
let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index);
let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await;
if let Err(_e) = drainer_result {
@ -70,6 +70,9 @@ async fn drainer(
kv::Insertable::PaymentAttempt(a) => {
macro_util::handle_resp!(a.insert(&conn).await, "ins", "pa")
}
kv::Insertable::Refund(a) => {
macro_util::handle_resp!(a.insert(&conn).await, "ins", "ref")
}
},
kv::DBOperation::Update { updatable } => match updatable {
kv::Updateable::PaymentIntentUpdate(a) => {
@ -78,6 +81,9 @@ async fn drainer(
kv::Updateable::PaymentAttemptUpdate(a) => {
macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pa")
}
kv::Updateable::RefundUpdate(a) => {
macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "ref")
}
},
kv::DBOperation::Delete => todo!(),
};

View File

@ -113,9 +113,9 @@ pub fn increment_stream_index(index: u8, total_streams: u8) -> u8 {
}
pub(crate) fn get_stream_key_flag(store: Arc<router::services::Store>, stream_index: u8) -> String {
format!("{}_in_use", get_drainer_stream(store, stream_index))
format!("{}_in_use", get_drainer_stream_name(store, stream_index))
}
pub(crate) fn get_drainer_stream(store: Arc<Store>, stream_index: u8) -> String {
store.drainer_stream(format!("shard_{}", stream_index).as_str())
pub(crate) fn get_drainer_stream_name(store: Arc<Store>, stream_index: u8) -> String {
store.get_drainer_stream_name(format!("shard_{}", stream_index).as_str())
}

View File

@ -10,6 +10,7 @@ license = "Apache-2.0"
[dependencies]
error-stack = "0.2.4"
fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] }
futures = "0.3"
serde = { version = "1.0.149", features = ["derive"] }
thiserror = "1.0.37"
@ -18,4 +19,4 @@ common_utils = { version = "0.1.0", path = "../common_utils" }
router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
[dev-dependencies]
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }

View File

@ -10,7 +10,7 @@ use std::fmt::Debug;
use common_utils::{
errors::CustomResult,
ext_traits::{ByteSliceExt, Encode},
ext_traits::{ByteSliceExt, Encode, StringExt},
};
use error_stack::{IntoReport, ResultExt};
use fred::{
@ -20,7 +20,8 @@ use fred::{
RedisKey, RedisMap, RedisValue, SetOptions, XCap, XReadResponse,
},
};
use router_env::{tracing, tracing::instrument};
use futures::StreamExt;
use router_env::{logger, tracing, tracing::instrument};
use crate::{
errors,
@ -245,6 +246,56 @@ impl super::RedisConnectionPool {
.await
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn hscan(
&self,
key: &str,
pattern: &str,
count: Option<u32>,
) -> CustomResult<Vec<String>, errors::RedisError> {
Ok(self
.pool
.hscan::<&str, &str>(key, pattern, count)
.filter_map(|value| async move {
match value {
Ok(mut v) => {
let v = v.take_results()?;
let v: Vec<String> =
v.iter().filter_map(|(_, val)| val.as_string()).collect();
Some(futures::stream::iter(v))
}
Err(err) => {
logger::error!(?err);
None
}
}
})
.flatten()
.collect::<Vec<_>>()
.await)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn hscan_and_deserialize<T>(
&self,
key: &str,
pattern: &str,
count: Option<u32>,
) -> CustomResult<Vec<T>, errors::RedisError>
where
T: serde::de::DeserializeOwned,
{
let redis_results = self.hscan(key, pattern, count).await?;
Ok(redis_results
.iter()
.filter_map(|v| {
let r: T = v.parse_struct(std::any::type_name::<T>()).ok()?;
Some(r)
})
.collect())
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_hash_field<V>(
&self,

View File

@ -126,3 +126,18 @@ impl ConnectorErrorExt for error_stack::Report<errors::ConnectorError> {
self.change_context(errors::ApiErrorResponse::PaymentAuthorizationFailed { data })
}
}
pub(crate) trait RedisErrorExt {
fn to_redis_failed_response(self, key: &str) -> error_stack::Report<errors::StorageError>;
}
impl RedisErrorExt for error_stack::Report<errors::RedisError> {
fn to_redis_failed_response(self, key: &str) -> error_stack::Report<errors::StorageError> {
match self.current_context() {
errors::RedisError::NotFound => self.change_context(
errors::StorageError::ValueNotFound(format!("Data does not exist for key {key}",)),
),
_ => self.change_context(errors::StorageError::KVError),
}
}
}

View File

@ -98,7 +98,7 @@ where
.make_pm_data(
state,
payment_data.payment_attempt.payment_method,
&payment_data.payment_attempt.txn_id,
&payment_data.payment_attempt.attempt_id,
&payment_data.payment_attempt,
&payment_data.payment_method_data,
&payment_data.token,
@ -594,7 +594,7 @@ pub async fn add_process_sync_task(
force_sync: true,
merchant_id: Some(payment_attempt.merchant_id.clone()),
resource_id: api::PaymentIdType::PaymentTxnId(payment_attempt.txn_id.clone()),
resource_id: api::PaymentIdType::PaymentAttemptId(payment_attempt.attempt_id.clone()),
param: None,
connector: None,
};
@ -603,7 +603,7 @@ pub async fn add_process_sync_task(
let process_tracker_id = pt_utils::get_process_tracker_id(
runner,
task,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
&payment_attempt.merchant_id,
);
let process_tracker_entry =

View File

@ -323,7 +323,7 @@ pub fn create_startpay_url(
server.base_url,
payment_intent.payment_id,
payment_intent.merchant_id,
payment_attempt.txn_id
payment_attempt.attempt_id
)
}

View File

@ -67,7 +67,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsCancelRequest>
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -89,7 +89,7 @@ impl<F: Send + Clone> GetTracker<F, payments::PaymentData<F>, api::PaymentsCaptu
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -97,7 +97,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -434,7 +434,7 @@ impl PaymentCreate {
storage::PaymentAttemptNew {
payment_id: payment_id.to_string(),
merchant_id: merchant_id.to_string(),
txn_id: Uuid::new_v4().to_string(),
attempt_id: Uuid::new_v4().to_string(),
status,
amount: amount.into(),
currency,
@ -495,7 +495,7 @@ impl PaymentCreate {
storage::ConnectorResponseNew {
payment_id: payment_attempt.payment_id.clone(),
merchant_id: payment_attempt.merchant_id.clone(),
txn_id: payment_attempt.txn_id.clone(),
txn_id: payment_attempt.attempt_id.clone(),
created_at: payment_attempt.created_at,
modified_at: payment_attempt.modified_at,
connector_name: payment_attempt.connector.clone(),

View File

@ -285,7 +285,7 @@ impl PaymentMethodValidate {
storage::PaymentAttemptNew {
payment_id: payment_id.to_string(),
merchant_id: merchant_id.to_string(),
txn_id: Uuid::new_v4().to_string(),
attempt_id: Uuid::new_v4().to_string(),
status,
// Amount & Currency will be zero in this case
amount: 0,

View File

@ -106,7 +106,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsSessionRequest>
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -103,7 +103,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsStartRequest> f
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -215,8 +215,8 @@ async fn get_tracker_for_sync<
api::PaymentIdType::ConnectorTransactionId(ref id) => {
db.find_payment_attempt_by_merchant_id_connector_txn_id(merchant_id, id, storage_scheme)
}
api::PaymentIdType::PaymentTxnId(ref id) => {
db.find_payment_attempt_by_merchant_id_txn_id(merchant_id, id, storage_scheme)
api::PaymentIdType::PaymentAttemptId(ref id) => {
db.find_payment_attempt_by_merchant_id_attempt_id(merchant_id, id, storage_scheme)
}
}
.await
@ -233,7 +233,7 @@ async fn get_tracker_for_sync<
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -116,7 +116,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
.find_connector_response_by_payment_id_merchant_id_txn_id(
&payment_intent.payment_id,
&payment_intent.merchant_id,
&payment_attempt.txn_id,
&payment_attempt.attempt_id,
storage_scheme,
)
.await

View File

@ -490,6 +490,7 @@ fn mk_new_refund(
refund_status: enums::RefundStatus::Pending,
metadata: request.metadata,
description: request.reason,
attempt_id: payment_attempt.attempt_id.clone(),
..storage::RefundNew::default()
}
}

View File

@ -51,9 +51,11 @@ pub async fn construct_refund_router_data<'a, F>(
.get_required_value("payment_method_type")?;
let payment_method_data = match payment_method_data.cloned() {
Some(v) => v,
None => helpers::Vault::get_payment_method_data_from_locker(state, &payment_attempt.txn_id)
.await?
.get_required_value("payment_method_data")?,
None => {
helpers::Vault::get_payment_method_data_from_locker(state, &payment_attempt.attempt_id)
.await?
.get_required_value("payment_method_data")?
}
};
let router_data = types::RouterData {

View File

@ -14,6 +14,7 @@ pub mod payment_method;
pub mod process_tracker;
pub mod queue;
pub mod refund;
pub mod reverse_lookup;
pub mod temp_card;
use std::sync::Arc;
@ -51,6 +52,7 @@ pub trait StorageInterface:
+ queue::QueueInterface
+ ephemeral_key::EphemeralKeyInterface
+ connector_response::ConnectorResponseInterface
+ reverse_lookup::ReverseLookupInterface
+ 'static
{
async fn close(&mut self) {}

View File

@ -48,10 +48,10 @@ pub trait PaymentAttemptInterface {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<types::PaymentAttempt, errors::StorageError>;
async fn find_payment_attempt_by_merchant_id_txn_id(
async fn find_payment_attempt_by_merchant_id_attempt_id(
&self,
merchant_id: &str,
txn_id: &str,
attempt_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<types::PaymentAttempt, errors::StorageError>;
}
@ -164,15 +164,15 @@ mod storage {
.into_report()
}
async fn find_payment_attempt_by_merchant_id_txn_id(
async fn find_payment_attempt_by_merchant_id_attempt_id(
&self,
merchant_id: &str,
txn_id: &str,
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id)
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
.map_err(Into::into)
.into_report()
@ -182,10 +182,10 @@ mod storage {
#[async_trait::async_trait]
impl PaymentAttemptInterface for MockDb {
async fn find_payment_attempt_by_merchant_id_txn_id(
async fn find_payment_attempt_by_merchant_id_attempt_id(
&self,
_merchant_id: &str,
_txn_id: &str,
_attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<types::PaymentAttempt, errors::StorageError> {
todo!()
@ -214,7 +214,7 @@ impl PaymentAttemptInterface for MockDb {
id,
payment_id: payment_attempt.payment_id,
merchant_id: payment_attempt.merchant_id,
txn_id: payment_attempt.txn_id,
attempt_id: payment_attempt.attempt_id,
status: payment_attempt.status,
amount: payment_attempt.amount,
currency: payment_attempt.currency,
@ -312,9 +312,10 @@ mod storage {
use super::PaymentAttemptInterface;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult},
core::errors::{self, utils::RedisErrorExt, CustomResult},
db::reverse_lookup::ReverseLookupInterface,
services::Store,
types::storage::{enums, kv, payment_attempt::*},
types::storage::{enums, kv, payment_attempt::*, ReverseLookupNew},
utils::storage_partitioning::KvStorePartition,
};
@ -338,7 +339,7 @@ mod storage {
enums::MerchantStorageScheme::RedisKv => {
let key = format!(
"{}_{}",
payment_attempt.payment_id, payment_attempt.merchant_id
payment_attempt.merchant_id, payment_attempt.payment_id
);
// TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id
// Check for database presence as well Maybe use a read replica here ?
@ -346,7 +347,7 @@ mod storage {
id: 0i32,
payment_id: payment_attempt.payment_id.clone(),
merchant_id: payment_attempt.merchant_id.clone(),
txn_id: payment_attempt.txn_id.clone(),
attempt_id: payment_attempt.attempt_id.clone(),
status: payment_attempt.status,
amount: payment_attempt.amount,
currency: payment_attempt.currency,
@ -376,9 +377,10 @@ mod storage {
error_code: payment_attempt.error_code.clone(),
};
let field = format!("pa_{}", created_attempt.attempt_id);
match self
.redis_conn
.serialize_and_set_hash_field_if_not_exist(&key, "pa", &created_attempt)
.serialize_and_set_hash_field_if_not_exist(&key, &field, &created_attempt)
.await
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue(
@ -386,12 +388,29 @@ mod storage {
))
.into_report(),
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await;
//Reverse lookup for attempt_id
ReverseLookupNew {
lookup_id: format!(
"{}_{}",
&created_attempt.merchant_id, &created_attempt.attempt_id,
),
pk_id: key,
sk_id: field,
source: "payment_attempt".to_string(),
}
.insert(&conn)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::PaymentAttempt(payment_attempt),
},
};
let stream_name = self.drainer_stream(&PaymentAttempt::shard_key(
let stream_name = self.get_drainer_stream_name(&PaymentAttempt::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_attempt.merchant_id,
payment_id: &created_attempt.payment_id,
@ -432,20 +451,42 @@ mod storage {
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.payment_id, this.merchant_id);
let key = format!("{}_{}", this.merchant_id, this.payment_id);
let updated_attempt = payment_attempt.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
let redis_value = serde_json::to_string(&updated_attempt)
.into_report()
.change_context(errors::StorageError::KVError)?;
let field = format!("pa_{}", updated_attempt.attempt_id);
let updated_attempt = self
.redis_conn
.set_hash_fields(&key, ("pa", &redis_value))
.set_hash_fields(&key, (&field, &redis_value))
.await
.map(|_| updated_attempt)
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await;
// Reverse lookup for connector_transaction_id
if let Some(ref connector_transaction_id) =
updated_attempt.connector_transaction_id
{
let field = format!("pa_{}", updated_attempt.attempt_id);
ReverseLookupNew {
lookup_id: format!(
"{}_{}",
&updated_attempt.merchant_id, connector_transaction_id
),
pk_id: key.clone(),
sk_id: field.clone(),
source: "payment_attempt".to_string(),
}
.insert(&conn)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
}
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::PaymentAttemptUpdate(
@ -457,7 +498,7 @@ mod storage {
},
};
let stream_name = self.drainer_stream(&PaymentAttempt::shard_key(
let stream_name = self.get_drainer_stream_name(&PaymentAttempt::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_attempt.merchant_id,
payment_id: &updated_attempt.payment_id,
@ -495,21 +536,20 @@ mod storage {
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", payment_id, merchant_id);
let key = format!("{}_{}", merchant_id, payment_id);
let lookup = self
.get_lookup_by_lookup_id(&key)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
self.redis_conn
.get_hash_field_and_deserialize::<PaymentAttempt>(
&key,
"pa",
&lookup.sk_id,
"PaymentAttempt",
)
.await
.map_err(|error| match error.current_context() {
errors::RedisError::NotFound => errors::StorageError::ValueNotFound(
format!("Payment Attempt does not exist for {}", key),
)
.into(),
_ => error.change_context(errors::StorageError::KVError),
})
.map_err(|error| error.to_redis_failed_response(&key))
// Check for database presence as well Maybe use a read replica here ?
}
}
@ -518,28 +558,26 @@ mod storage {
async fn find_payment_attempt_by_transaction_id_payment_id_merchant_id(
&self,
transaction_id: &str,
payment_id: &str,
_payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
// We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now
self.find_payment_attempt_by_payment_id_merchant_id(
payment_id,
merchant_id,
storage_scheme,
)
.await
.and_then(|attempt| {
if attempt.connector_transaction_id.as_deref() == Some(transaction_id) {
Ok(attempt)
} else {
Err(errors::StorageError::ValueNotFound(format!(
"Successful payment attempt does not exist for {}_{}",
payment_id, merchant_id
)))
.into_report()
}
})
let lookup_id = format!("{merchant_id}_{transaction_id}");
let lookup = self
.get_lookup_by_lookup_id(&lookup_id)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let key = &lookup.pk_id;
self.redis_conn
.get_hash_field_and_deserialize::<PaymentAttempt>(
key,
&lookup.sk_id,
"PaymentAttempt",
)
.await
.map_err(|error| error.to_redis_failed_response(key))
}
async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
@ -586,28 +624,57 @@ mod storage {
}
enums::MerchantStorageScheme::RedisKv => {
Err(errors::StorageError::KVError).into_report()
let lookup_id = format!("{merchant_id}_{connector_txn_id}");
let lookup = self
.get_lookup_by_lookup_id(&lookup_id)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let key = &lookup.pk_id;
self.redis_conn
.get_hash_field_and_deserialize::<PaymentAttempt>(
key,
&lookup.sk_id,
"PaymentAttempt",
)
.await
.map_err(|error| error.to_redis_failed_response(key))
}
}
}
async fn find_payment_attempt_by_merchant_id_txn_id(
async fn find_payment_attempt_by_merchant_id_attempt_id(
&self,
merchant_id: &str,
txn_id: &str,
attempt_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id)
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
Err(errors::StorageError::KVError).into_report()
let lookup_id = format!("{merchant_id}_{attempt_id}");
let lookup = self
.get_lookup_by_lookup_id(&lookup_id)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let key = &lookup.pk_id;
self.redis_conn
.get_hash_field_and_deserialize::<PaymentAttempt>(
key,
&lookup.sk_id,
"PaymentAttempt",
)
.await
.map_err(|error| error.to_redis_failed_response(key))
}
}
}

View File

@ -69,7 +69,7 @@ mod storage {
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", new.payment_id, new.merchant_id);
let key = format!("{}_{}", new.merchant_id, new.payment_id);
let created_intent = PaymentIntent {
id: 0i32,
payment_id: new.payment_id.clone(),
@ -110,7 +110,7 @@ mod storage {
insertable: kv::Insertable::PaymentIntent(new),
},
};
let stream_name = self.drainer_stream(&PaymentIntent::shard_key(
let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
@ -151,7 +151,7 @@ mod storage {
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.payment_id, this.merchant_id);
let key = format!("{}_{}", this.merchant_id, this.payment_id);
let updated_intent = payment_intent.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
@ -176,7 +176,7 @@ mod storage {
},
};
let stream_name = self.drainer_stream(&PaymentIntent::shard_key(
let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_intent.merchant_id,
payment_id: &updated_intent.payment_id,
@ -214,7 +214,7 @@ mod storage {
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", payment_id, merchant_id);
let key = format!("{}_{}", merchant_id, payment_id);
self.redis_conn
.get_hash_field_and_deserialize::<PaymentIntent>(
&key,

View File

@ -1,11 +1,10 @@
use error_stack::{IntoReport, Report};
use error_stack::Report;
use storage_models::errors::DatabaseError;
use super::MockDb;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult, StorageError},
types::storage::{self, enums},
types::storage::{self as storage_types, enums},
};
#[async_trait::async_trait]
@ -15,14 +14,14 @@ pub trait RefundInterface {
internal_reference_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError>;
) -> CustomResult<storage_types::Refund, errors::StorageError>;
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::Refund>, errors::StorageError>;
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError>;
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
@ -36,117 +35,513 @@ pub trait RefundInterface {
merchant_id: &str,
refund_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError>;
) -> CustomResult<storage_types::Refund, errors::StorageError>;
async fn update_refund(
&self,
this: storage::Refund,
refund: storage::RefundUpdate,
this: storage_types::Refund,
refund: storage_types::RefundUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError>;
) -> CustomResult<storage_types::Refund, errors::StorageError>;
async fn find_refund_by_merchant_id_transaction_id(
&self,
merchant_id: &str,
txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::Refund>, errors::StorageError>;
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError>;
async fn insert_refund(
&self,
new: storage::RefundNew,
new: storage_types::RefundNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError>;
) -> CustomResult<storage_types::Refund, errors::StorageError>;
}
#[async_trait::async_trait]
impl RefundInterface for super::Store {
async fn find_refund_by_internal_reference_id_merchant_id(
&self,
internal_reference_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
}
#[cfg(not(feature = "kv_store"))]
mod storage {
use error_stack::IntoReport;
async fn insert_refund(
&self,
new: storage::RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
new.insert(&conn).await.map_err(Into::into).into_report()
}
async fn find_refund_by_merchant_id_transaction_id(
&self,
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage::Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id)
use super::RefundInterface;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::{self as storage_types, enums},
};
#[async_trait::async_trait]
impl RefundInterface for Store {
async fn find_refund_by_internal_reference_id_merchant_id(
&self,
internal_reference_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
}
async fn insert_refund(
&self,
new: storage_types::RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
new.insert(&conn).await.map_err(Into::into).into_report()
}
async fn find_refund_by_merchant_id_transaction_id(
&self,
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id)
.await
.map_err(Into::into)
.into_report()
}
async fn update_refund(
&self,
this: storage_types::Refund,
refund: storage_types::RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, refund)
.await
.map_err(Into::into)
.into_report()
}
async fn find_refund_by_merchant_id_refund_id(
&self,
merchant_id: &str,
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
.await
.map_err(Into::into)
.into_report()
}
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
// merchant_id: &str,
// refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError> {
// let conn = pg_connection(&self.master_pool).await;
// Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id)
// .await
// }
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
.into_report()
}
}
}
async fn update_refund(
&self,
this: storage::Refund,
refund: storage::RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, refund)
.await
.map_err(Into::into)
.into_report()
}
#[cfg(feature = "kv_store")]
mod storage {
use common_utils::date_time;
use error_stack::{IntoReport, ResultExt};
use redis_interface::{HsetnxReply, RedisEntryId};
async fn find_refund_by_merchant_id_refund_id(
&self,
merchant_id: &str,
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
.await
.map_err(Into::into)
.into_report()
}
use super::RefundInterface;
use crate::{
connection::pg_connection,
core::errors::{self, utils::RedisErrorExt, CustomResult},
db::reverse_lookup::ReverseLookupInterface,
logger,
services::Store,
types::storage::{self as storage_types, enums, kv},
utils::{
self, db_utils,
storage_partitioning::{KvStorePartition, PartitionKey},
},
};
#[async_trait::async_trait]
impl RefundInterface for Store {
async fn find_refund_by_internal_reference_id_merchant_id(
&self,
internal_reference_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{}_{}", merchant_id, internal_reference_id);
let lookup = self
.get_lookup_by_lookup_id(&lookup_id)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
// merchant_id: &str,
// refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError> {
// let conn = pg_connection(&self.master_pool).await;
// Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id)
// .await
// }
let key = &lookup.pk_id;
self.redis_conn
.get_hash_field_and_deserialize::<storage_types::Refund>(
key,
&lookup.sk_id,
"Refund",
)
.await
.map_err(|error| error.to_redis_failed_response(key))
}
}
}
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
storage::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
.into_report()
async fn insert_refund(
&self,
new: storage_types::RefundNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
new.insert(&conn).await.map_err(Into::into).into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", new.merchant_id, new.payment_id);
// TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id
// Check for database presence as well Maybe use a read replica here ?
let created_refund = storage_types::Refund {
id: 0i32,
refund_id: new.refund_id.clone(),
merchant_id: new.merchant_id.clone(),
attempt_id: new.attempt_id.clone(),
internal_reference_id: new.internal_reference_id.clone(),
payment_id: new.payment_id.clone(),
transaction_id: new.transaction_id.clone(),
connector: new.connector.clone(),
pg_refund_id: new.pg_refund_id.clone(),
external_reference_id: new.external_reference_id.clone(),
refund_type: new.refund_type,
total_amount: new.total_amount,
currency: new.currency,
refund_amount: new.refund_amount,
refund_status: new.refund_status,
sent_to_gateway: new.sent_to_gateway,
refund_error_message: new.refund_error_message.clone(),
metadata: new.metadata.clone(),
refund_arn: new.refund_arn.clone(),
created_at: new.created_at.unwrap_or_else(date_time::now),
updated_at: new.created_at.unwrap_or_else(date_time::now),
description: new.description.clone(),
};
let field = format!(
"pa_{}_ref_{}",
&created_refund.attempt_id, &created_refund.refund_id
);
match self
.redis_conn
.serialize_and_set_hash_field_if_not_exist(&key, &field, &created_refund)
.await
{
Ok(HsetnxReply::KeyNotSet) => {
Err(errors::StorageError::DuplicateValue(format!(
"Refund already exists refund_id: {}",
&created_refund.refund_id
)))
.into_report()
}
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await;
let reverse_lookups = vec![
storage_types::ReverseLookupNew {
sk_id: field.clone(),
lookup_id: format!(
"{}_{}",
created_refund.merchant_id, created_refund.refund_id
),
pk_id: key.clone(),
source: "refund".to_string(),
},
storage_types::ReverseLookupNew {
sk_id: field.clone(),
lookup_id: format!(
"{}_{}",
created_refund.merchant_id, created_refund.transaction_id
),
pk_id: key.clone(),
source: "refund".to_string(),
},
storage_types::ReverseLookupNew {
sk_id: field.clone(),
lookup_id: format!(
"{}_{}",
created_refund.merchant_id,
created_refund.internal_reference_id
),
pk_id: key,
source: "refund".to_string(),
},
];
storage_types::ReverseLookupNew::batch_insert(reverse_lookups, &conn)
.await
.change_context(errors::StorageError::KVError)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::Refund(new),
},
};
let stream_name =
self.get_drainer_stream_name(&storage_types::Refund::shard_key(
PartitionKey::MerchantIdPaymentId {
merchant_id: &created_refund.merchant_id,
payment_id: &created_refund.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_refund)
}
Err(er) => Err(er).change_context(errors::StorageError::KVError),
}
}
}
}
async fn find_refund_by_merchant_id_transaction_id(
&self,
merchant_id: &str,
txn_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_merchant_id_transaction_id(
&conn,
merchant_id,
txn_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{txn_id}");
let lookup = match self.get_lookup_by_lookup_id(&lookup_id).await {
Ok(l) => l,
Err(err) => {
logger::error!(?err);
return Ok(vec![]);
}
};
let key = &lookup.pk_id;
let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id);
self.redis_conn
.hscan_and_deserialize(key, &pattern, None)
.await
.change_context(errors::StorageError::KVError)
}
}
}
async fn update_refund(
&self,
this: storage_types::Refund,
refund: storage_types::RefundUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
this.update(&conn, refund)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.merchant_id, this.payment_id);
let updated_refund = refund.clone().apply_changeset(this.clone());
// Check for database presence as well Maybe use a read replica here ?
// TODO: Add a proper error for serialization failure
let lookup = self
.get_lookup_by_lookup_id(&key)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let field = &lookup.sk_id;
let redis_value =
utils::Encode::<storage_types::Refund>::encode_to_string_of_json(
&updated_refund,
)
.change_context(errors::StorageError::KVError)?;
self.redis_conn
.set_hash_fields(&key, (field, redis_value))
.await
.change_context(errors::StorageError::KVError)?;
let stream_name =
self.get_drainer_stream_name(&storage_types::Refund::shard_key(
PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_refund.merchant_id,
payment_id: &updated_refund.payment_id,
},
self.config.drainer_num_partitions,
));
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::RefundUpdate(kv::RefundUpdateMems {
orig: this,
update_data: refund,
}),
},
};
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(updated_refund)
}
}
}
async fn find_refund_by_merchant_id_refund_id(
&self,
merchant_id: &str,
refund_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_merchant_id_refund_id(
&conn,
merchant_id,
refund_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let lookup_id = format!("{merchant_id}_{refund_id}");
let lookup = self
.get_lookup_by_lookup_id(&lookup_id)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let key = &lookup.pk_id;
self.redis_conn
.get_hash_field_and_deserialize::<storage_types::Refund>(
key,
&lookup.sk_id,
"Refund",
)
.await
.map_err(|error| error.to_redis_failed_response(key))
}
}
}
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
// merchant_id: &str,
// refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError> {
// let conn = pg_connection(&self.master_pool).await;
// Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id)
// .await
// }
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
storage_types::Refund::find_by_payment_id_merchant_id(
&conn,
payment_id,
merchant_id,
)
.await
.map_err(Into::into)
.into_report()
}
enums::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", merchant_id, payment_id);
let lookup = self
.get_lookup_by_lookup_id(&key)
.await
.map_err(Into::<errors::StorageError>::into)
.into_report()?;
let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id);
self.redis_conn
.hscan_and_deserialize(&key, &pattern, None)
.await
.change_context(errors::StorageError::KVError)
}
}
}
}
}
@ -157,24 +552,25 @@ impl RefundInterface for MockDb {
_internal_reference_id: &str,
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
todo!()
}
async fn insert_refund(
&self,
new: storage::RefundNew,
new: storage_types::RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let mut refunds = self.refunds.lock().await;
let current_time = common_utils::date_time::now();
let refund = storage::Refund {
let refund = storage_types::Refund {
id: refunds.len() as i32,
internal_reference_id: new.internal_reference_id,
refund_id: new.refund_id,
payment_id: new.payment_id,
merchant_id: new.merchant_id,
attempt_id: new.attempt_id,
transaction_id: new.transaction_id,
connector: new.connector,
pg_refund_id: new.pg_refund_id,
@ -187,7 +583,7 @@ impl RefundInterface for MockDb {
sent_to_gateway: new.sent_to_gateway,
refund_error_message: new.refund_error_message,
metadata: new.metadata,
refund_arn: new.refund_arn,
refund_arn: new.refund_arn.clone(),
created_at: new.created_at.unwrap_or(current_time),
updated_at: current_time,
description: new.description,
@ -200,7 +596,7 @@ impl RefundInterface for MockDb {
merchant_id: &str,
txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::Refund>, errors::StorageError> {
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let refunds = self.refunds.lock().await;
Ok(refunds
@ -214,10 +610,10 @@ impl RefundInterface for MockDb {
async fn update_refund(
&self,
_this: storage::Refund,
_refund: storage::RefundUpdate,
_this: storage_types::Refund,
_refund: storage_types::RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
todo!()
}
@ -226,7 +622,7 @@ impl RefundInterface for MockDb {
merchant_id: &str,
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::Refund, errors::StorageError> {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let refunds = self.refunds.lock().await;
refunds
@ -245,7 +641,7 @@ impl RefundInterface for MockDb {
_payment_id: &str,
_merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage::Refund>, errors::StorageError> {
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
todo!()
}
}

View File

@ -0,0 +1,36 @@
use storage_models::{errors, StorageResult};
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
types::storage::reverse_lookup::{ReverseLookup, ReverseLookupNew},
};
#[async_trait::async_trait]
pub trait ReverseLookupInterface {
async fn insert_reverse_lookup(&self, _new: ReverseLookupNew) -> StorageResult<ReverseLookup>;
async fn get_lookup_by_lookup_id(&self, _id: &str) -> StorageResult<ReverseLookup>;
}
#[async_trait::async_trait]
impl ReverseLookupInterface for Store {
async fn insert_reverse_lookup(&self, new: ReverseLookupNew) -> StorageResult<ReverseLookup> {
let conn = pg_connection(&self.master_pool).await;
new.insert(&conn).await
}
async fn get_lookup_by_lookup_id(&self, id: &str) -> StorageResult<ReverseLookup> {
let conn = pg_connection(&self.master_pool).await;
ReverseLookup::find_by_lookup_id(id, &conn).await
}
}
#[async_trait::async_trait]
impl ReverseLookupInterface for MockDb {
async fn insert_reverse_lookup(&self, _new: ReverseLookupNew) -> StorageResult<ReverseLookup> {
Err(errors::DatabaseError::NotFound.into())
}
async fn get_lookup_by_lookup_id(&self, _id: &str) -> StorageResult<ReverseLookup> {
Err(errors::DatabaseError::NotFound.into())
}
}

View File

@ -40,7 +40,7 @@ impl Store {
}
#[cfg(feature = "kv_store")]
pub fn drainer_stream(&self, shard_key: &str) -> String {
pub fn get_drainer_stream_name(&self, shard_key: &str) -> String {
// Example: {shard_5}_drainer_stream
format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,)
}

View File

@ -348,7 +348,9 @@ impl From<&storage::PaymentAttempt> for BachRedirectResponse {
Self {
url: format!(
"/payments/start/{}/{}/{}",
&payment_attempt.payment_id, &payment_attempt.merchant_id, &payment_attempt.txn_id
&payment_attempt.payment_id,
&payment_attempt.merchant_id,
&payment_attempt.attempt_id
),
}
}

View File

@ -87,7 +87,7 @@ impl PaymentIdTypeExt for PaymentIdType {
fn get_payment_intent_id(&self) -> errors::CustomResult<String, errors::ValidationError> {
match self {
Self::PaymentIntentId(id) => Ok(id.clone()),
Self::ConnectorTransactionId(_) | Self::PaymentTxnId(_) => {
Self::ConnectorTransactionId(_) | Self::PaymentAttemptId(_) => {
Err(errors::ValidationError::IncorrectValueProvided {
field_name: "payment_id",
})

View File

@ -13,6 +13,7 @@ pub mod payment_attempt;
pub mod payment_intent;
pub mod payment_method;
pub mod process_tracker;
pub mod reverse_lookup;
mod query;
pub mod refund;
@ -24,5 +25,6 @@ pub mod kv;
pub use self::{
address::*, configs::*, connector_response::*, customers::*, events::*, locker_mock_up::*,
mandate::*, merchant_account::*, merchant_connector_account::*, payment_attempt::*,
payment_intent::*, payment_method::*, process_tracker::*, refund::*, temp_card::*,
payment_intent::*, payment_method::*, process_tracker::*, refund::*, reverse_lookup::*,
temp_card::*,
};

View File

@ -1,4 +1,4 @@
pub use storage_models::kv::{
DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, TypedSql,
Updateable,
DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, RefundUpdateMems,
TypedSql, Updateable,
};

View File

@ -1,3 +1,6 @@
pub use storage_models::refund::{
Refund, RefundCoreWorkflow, RefundNew, RefundUpdate, RefundUpdateInternal,
};
#[cfg(feature = "kv_store")]
impl crate::utils::storage_partitioning::KvStorePartition for Refund {}

View File

@ -0,0 +1 @@
pub use storage_models::reverse_lookup::{ReverseLookup, ReverseLookupNew};

View File

@ -1,7 +1,9 @@
pub(crate) mod crypto;
pub(crate) mod custom_serde;
pub(crate) mod db_utils;
mod ext_traits;
mod fp_utils;
#[cfg(feature = "kv_store")]
pub(crate) mod storage_partitioning;

View File

@ -0,0 +1,10 @@
#[cfg(feature = "kv_store")]
/// Generates hscan field pattern. Suppose the field is pa_1234_ref_1211 it will generate
/// pa_1234_ref_*
pub fn generate_hscan_pattern_for_refund(sk: &str) -> String {
sk.split('_')
.take(3)
.chain(["*"])
.collect::<Vec<&str>>()
.join("_")
}

View File

@ -562,6 +562,8 @@ pub enum ProcessTrackerStatus {
Default,
Eq,
PartialEq,
serde::Serialize,
serde::Deserialize,
strum::Display,
strum::EnumString,
router_derive::DieselEnum,
@ -585,6 +587,8 @@ pub enum RefundStatus {
Default,
Eq,
PartialEq,
serde::Serialize,
serde::Deserialize,
strum::Display,
strum::EnumString,
router_derive::DieselEnum,

View File

@ -5,6 +5,7 @@ use crate::{
errors,
payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate},
payment_intent::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate},
refund::{Refund, RefundNew, RefundUpdate},
};
#[derive(Debug, Serialize, Deserialize)]
@ -37,6 +38,7 @@ impl TypedSql {
pub enum Insertable {
PaymentIntent(PaymentIntentNew),
PaymentAttempt(PaymentAttemptNew),
Refund(RefundNew),
}
#[derive(Debug, Serialize, Deserialize)]
@ -44,6 +46,7 @@ pub enum Insertable {
pub enum Updateable {
PaymentIntentUpdate(PaymentIntentUpdateMems),
PaymentAttemptUpdate(PaymentAttemptUpdateMems),
RefundUpdate(RefundUpdateMems),
}
#[derive(Debug, Serialize, Deserialize)]
@ -57,3 +60,9 @@ pub struct PaymentAttemptUpdateMems {
pub orig: PaymentAttempt,
pub update_data: PaymentAttemptUpdate,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RefundUpdateMems {
pub orig: Refund,
pub update_data: RefundUpdate,
}

View File

@ -19,6 +19,7 @@ pub mod payment_method;
pub mod process_tracker;
pub mod query;
pub mod refund;
pub mod reverse_lookup;
pub mod schema;
pub mod temp_card;

View File

@ -10,7 +10,7 @@ pub struct PaymentAttempt {
pub id: i32,
pub payment_id: String,
pub merchant_id: String,
pub txn_id: String,
pub attempt_id: String,
pub status: storage_enums::AttemptStatus,
pub amount: i64,
pub currency: Option<storage_enums::Currency>,
@ -47,7 +47,7 @@ pub struct PaymentAttempt {
pub struct PaymentAttemptNew {
pub payment_id: String,
pub merchant_id: String,
pub txn_id: String,
pub attempt_id: String,
pub status: storage_enums::AttemptStatus,
pub amount: i64,
pub currency: Option<storage_enums::Currency>,

View File

@ -13,4 +13,5 @@ pub mod payment_intent;
pub mod payment_method;
pub mod process_tracker;
pub mod refund;
pub mod reverse_lookup;
pub mod temp_card;

View File

@ -133,16 +133,16 @@ impl PaymentAttempt {
}
#[instrument(skip(conn))]
pub async fn find_by_merchant_id_transaction_id(
pub async fn find_by_merchant_id_attempt_id(
conn: &PgPooledConn,
merchant_id: &str,
txn_id: &str,
attempt_id: &str,
) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::txn_id.eq(txn_id.to_owned())),
.and(dsl::attempt_id.eq(attempt_id.to_owned())),
)
.await
}

View File

@ -9,8 +9,6 @@ use crate::{
PgPooledConn, StorageResult,
};
// FIXME: Find by partition key : Review
impl RefundNew {
#[instrument(skip(conn))]
pub async fn insert(self, conn: &PgPooledConn) -> StorageResult<Refund> {

View File

@ -0,0 +1,33 @@
use diesel::{associations::HasTable, ExpressionMethods};
use router_env::{tracing, tracing::instrument};
use super::generics;
use crate::{
reverse_lookup::{ReverseLookup, ReverseLookupNew},
schema::reverse_lookup::dsl,
PgPooledConn, StorageResult,
};
impl ReverseLookupNew {
#[instrument(skip(conn))]
pub async fn insert(self, conn: &PgPooledConn) -> StorageResult<ReverseLookup> {
generics::generic_insert(conn, self).await
}
#[instrument(skip(conn))]
pub async fn batch_insert(
reverse_lookups: Vec<Self>,
conn: &PgPooledConn,
) -> StorageResult<()> {
generics::generic_insert::<_, _, ReverseLookup>(conn, reverse_lookups).await?;
Ok(())
}
}
impl ReverseLookup {
pub async fn find_by_lookup_id(lookup_id: &str, conn: &PgPooledConn) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::lookup_id.eq(lookup_id.to_owned()),
)
.await
}
}

View File

@ -4,7 +4,9 @@ use time::PrimitiveDateTime;
use crate::{enums as storage_enums, schema::refund};
#[derive(Clone, Debug, Eq, Identifiable, Queryable, PartialEq)]
#[derive(
Clone, Debug, Eq, Identifiable, Queryable, PartialEq, serde::Serialize, serde::Deserialize,
)]
#[diesel(table_name = refund)]
pub struct Refund {
pub id: i32,
@ -28,9 +30,20 @@ pub struct Refund {
pub created_at: PrimitiveDateTime,
pub updated_at: PrimitiveDateTime,
pub description: Option<String>,
pub attempt_id: String,
}
#[derive(Clone, Debug, Default, Eq, PartialEq, Insertable, router_derive::DebugAsDisplay)]
#[derive(
Clone,
Debug,
Default,
Eq,
PartialEq,
Insertable,
router_derive::DebugAsDisplay,
serde::Serialize,
serde::Deserialize,
)]
#[diesel(table_name = refund)]
pub struct RefundNew {
pub refund_id: String,
@ -53,9 +66,10 @@ pub struct RefundNew {
pub created_at: Option<PrimitiveDateTime>,
pub modified_at: Option<PrimitiveDateTime>,
pub description: Option<String>,
pub attempt_id: String,
}
#[derive(Debug)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum RefundUpdate {
Update {
pg_refund_id: String,
@ -132,6 +146,21 @@ impl From<RefundUpdate> for RefundUpdateInternal {
}
}
impl RefundUpdate {
pub fn apply_changeset(self, source: Refund) -> Refund {
let pa_update: RefundUpdateInternal = self.into();
Refund {
pg_refund_id: pa_update.pg_refund_id,
refund_status: pa_update.refund_status.unwrap_or(source.refund_status),
sent_to_gateway: pa_update.sent_to_gateway.unwrap_or(source.sent_to_gateway),
refund_error_message: pa_update.refund_error_message,
refund_arn: pa_update.refund_arn,
metadata: pa_update.metadata,
..source
}
}
}
#[derive(Debug, Eq, PartialEq, Deserialize, Serialize)]
pub struct RefundCoreWorkflow {
pub refund_internal_reference_id: String,

View File

@ -0,0 +1,33 @@
use diesel::{Identifiable, Insertable, Queryable};
use crate::schema::reverse_lookup;
///
/// This reverse lookup table basically looks up id's and get result_id that you want. This is
/// useful for KV where you can't lookup without key
#[derive(
Clone, Debug, serde::Serialize, serde::Deserialize, Identifiable, Queryable, Eq, PartialEq,
)]
#[diesel(table_name = reverse_lookup)]
#[diesel(primary_key(lookup_id))]
pub struct ReverseLookup {
/// Primary key. The key id.
pub lookup_id: String,
/// the value id. i.e the id you want to access KV table.
pub pk_id: String,
/// the `field` in KV database. Which is used to differentiate between two same keys
pub sk_id: String,
/// the source of insertion for reference
pub source: String,
}
#[derive(
Clone, Debug, Insertable, router_derive::DebugAsDisplay, Eq, PartialEq, serde::Serialize,
)]
#[diesel(table_name = reverse_lookup)]
pub struct ReverseLookupNew {
pub lookup_id: String,
pub pk_id: String,
pub sk_id: String,
pub source: String,
}

View File

@ -187,7 +187,7 @@ diesel::table! {
id -> Int4,
payment_id -> Varchar,
merchant_id -> Varchar,
txn_id -> Varchar,
attempt_id -> Varchar,
status -> AttemptStatus,
amount -> Int8,
currency -> Nullable<Currency>,
@ -325,6 +325,19 @@ diesel::table! {
created_at -> Timestamp,
modified_at -> Timestamp,
description -> Nullable<Varchar>,
attempt_id -> Varchar,
}
}
diesel::table! {
use diesel::sql_types::*;
use crate::enums::diesel_exports::*;
reverse_lookup (lookup_id) {
lookup_id -> Varchar,
sk_id -> Varchar,
pk_id -> Varchar,
source -> Varchar,
}
}
@ -355,5 +368,6 @@ diesel::allow_tables_to_appear_in_same_query!(
payment_methods,
process_tracker,
refund,
reverse_lookup,
temp_card,
);

View File

@ -0,0 +1,2 @@
DROP TABLE IF EXISTS reverse_lookup;

View File

@ -0,0 +1,8 @@
CREATE TABLE reverse_lookup (
lookup_id VARCHAR(255) NOT NULL PRIMARY KEY,
sk_id VARCHAR(50) NOT NULL,
pk_id VARCHAR(255) NOT NULL,
source VARCHAR(30) NOT NULL
);
CREATE INDEX lookup_id_index ON reverse_lookup (lookup_id);

View File

@ -0,0 +1,2 @@
ALTER TABLE payment_id
RENAME attempt_id to txn_id;

View File

@ -0,0 +1,2 @@
ALTER TABLE payment_attempt
RENAME COLUMN txn_id to attempt_id;

View File

@ -0,0 +1 @@
ALTER TABLE refund DROP COLUMN attempt_id;

View File

@ -0,0 +1 @@
ALTER TABLE refund ADD COLUMN attempt_id VARCHAR(64) NOT NULL;