fix: revert payment method kv changes (#4351)

Co-authored-by: Akshay S <akshay.s@Akshay-Subramanian-D66TQ6D97K.local>
This commit is contained in:
akshay-97
2024-04-12 17:37:03 +05:30
committed by GitHub
parent 986ed2a923
commit bb202e39bf

View File

@ -63,330 +63,329 @@ pub trait PaymentMethodInterface {
) -> CustomResult<storage_types::PaymentMethod, errors::StorageError>; ) -> CustomResult<storage_types::PaymentMethod, errors::StorageError>;
} }
#[cfg(feature = "kv_store")] // #[cfg(feature = "kv_store")]
mod storage { // mod storage {
use common_utils::fallback_reverse_lookup_not_found; // use common_utils::fallback_reverse_lookup_not_found;
use diesel_models::{kv, PaymentMethodUpdateInternal}; // use diesel_models::{kv, PaymentMethodUpdateInternal};
use error_stack::{report, ResultExt}; // use error_stack::{report, ResultExt};
use redis_interface::HsetnxReply; // use redis_interface::HsetnxReply;
use router_env::{instrument, tracing}; // use router_env::{instrument, tracing};
use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; // use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey};
use super::PaymentMethodInterface; // use super::PaymentMethodInterface;
use crate::{ // use crate::{
connection, // connection,
core::errors::{self, utils::RedisErrorExt, CustomResult}, // core::errors::{self, utils::RedisErrorExt, CustomResult},
db::reverse_lookup::ReverseLookupInterface, // db::reverse_lookup::ReverseLookupInterface,
services::Store, // services::Store,
types::storage::{self as storage_types, enums::MerchantStorageScheme}, // types::storage::{self as storage_types, enums::MerchantStorageScheme},
utils::db_utils, // utils::db_utils,
}; // };
#[async_trait::async_trait] // #[async_trait::async_trait]
impl PaymentMethodInterface for Store { // impl PaymentMethodInterface for Store {
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn find_payment_method( // async fn find_payment_method(
&self, // &self,
payment_method_id: &str, // payment_method_id: &str,
storage_scheme: MerchantStorageScheme, // storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> { // ) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?; // let conn = connection::pg_connection_read(self).await?;
let database_call = || async { // let database_call = || async {
storage_types::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) // storage_types::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id)
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
}; // };
match storage_scheme { // match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await, // MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => { // MerchantStorageScheme::RedisKv => {
let lookup_id = format!("payment_method_{}", payment_method_id); // let lookup_id = format!("payment_method_{}", payment_method_id);
let lookup = fallback_reverse_lookup_not_found!( // let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) // self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await, // .await,
database_call().await // database_call().await
); // );
let key = PartitionKey::CombinationKey { // let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id, // combination: &lookup.pk_id,
}; // };
Box::pin(db_utils::try_redis_get_else_try_database_get( // Box::pin(db_utils::try_redis_get_else_try_database_get(
async { // async {
kv_wrapper( // kv_wrapper(
self, // self,
KvOperation::<storage_types::PaymentMethod>::HGet(&lookup.sk_id), // KvOperation::<storage_types::PaymentMethod>::HGet(&lookup.sk_id),
key, // key,
) // )
.await? // .await?
.try_into_hget() // .try_into_hget()
}, // },
database_call, // database_call,
)) // ))
.await // .await
} // }
} // }
} // }
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn find_payment_method_by_locker_id( // async fn find_payment_method_by_locker_id(
&self, // &self,
locker_id: &str, // locker_id: &str,
storage_scheme: MerchantStorageScheme, // storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> { // ) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?; // let conn = connection::pg_connection_read(self).await?;
let database_call = || async { // let database_call = || async {
storage_types::PaymentMethod::find_by_locker_id(&conn, locker_id) // storage_types::PaymentMethod::find_by_locker_id(&conn, locker_id)
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
}; // };
match storage_scheme { // match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await, // MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => { // MerchantStorageScheme::RedisKv => {
let lookup_id = format!("payment_method_locker_{}", locker_id); // let lookup_id = format!("payment_method_locker_{}", locker_id);
let lookup = fallback_reverse_lookup_not_found!( // let lookup = fallback_reverse_lookup_not_found!(
self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) // self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
.await, // .await,
database_call().await // database_call().await
); // );
let key = PartitionKey::CombinationKey { // let key = PartitionKey::CombinationKey {
combination: &lookup.pk_id, // combination: &lookup.pk_id,
}; // };
Box::pin(db_utils::try_redis_get_else_try_database_get( // Box::pin(db_utils::try_redis_get_else_try_database_get(
async { // async {
kv_wrapper( // kv_wrapper(
self, // self,
KvOperation::<storage_types::PaymentMethod>::HGet(&lookup.sk_id), // KvOperation::<storage_types::PaymentMethod>::HGet(&lookup.sk_id),
key, // key,
) // )
.await? // .await?
.try_into_hget() // .try_into_hget()
}, // },
database_call, // database_call,
)) // ))
.await // .await
} // }
} // }
} // }
// not supported in kv // // not supported in kv
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn get_payment_method_count_by_customer_id_merchant_id_status( // async fn get_payment_method_count_by_customer_id_merchant_id_status(
&self, // &self,
customer_id: &str, // customer_id: &str,
merchant_id: &str, // merchant_id: &str,
status: common_enums::PaymentMethodStatus, // status: common_enums::PaymentMethodStatus,
) -> CustomResult<i64, errors::StorageError> { // ) -> CustomResult<i64, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?; // let conn = connection::pg_connection_read(self).await?;
storage_types::PaymentMethod::get_count_by_customer_id_merchant_id_status( // storage_types::PaymentMethod::get_count_by_customer_id_merchant_id_status(
&conn, // &conn,
customer_id, // customer_id,
merchant_id, // merchant_id,
status, // status,
) // )
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
} // }
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn insert_payment_method( // async fn insert_payment_method(
&self, // &self,
payment_method_new: storage_types::PaymentMethodNew, // payment_method_new: storage_types::PaymentMethodNew,
storage_scheme: MerchantStorageScheme, // storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> { // ) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> {
match storage_scheme { // match storage_scheme {
MerchantStorageScheme::PostgresOnly => { // MerchantStorageScheme::PostgresOnly => {
let conn = connection::pg_connection_write(self).await?; // let conn = connection::pg_connection_write(self).await?;
payment_method_new // payment_method_new
.insert(&conn) // .insert(&conn)
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
} // }
MerchantStorageScheme::RedisKv => { // MerchantStorageScheme::RedisKv => {
let merchant_id = payment_method_new.merchant_id.clone(); // let merchant_id = payment_method_new.merchant_id.clone();
let customer_id = payment_method_new.customer_id.clone(); // let customer_id = payment_method_new.customer_id.clone();
let key = PartitionKey::MerchantIdCustomerId { // let key = PartitionKey::MerchantIdCustomerId {
merchant_id: &merchant_id, // merchant_id: &merchant_id,
customer_id: &customer_id, // customer_id: &customer_id,
}; // };
let key_str = key.to_string(); // let key_str = key.to_string();
let field = // let field =
format!("payment_method_id_{}", payment_method_new.payment_method_id); // format!("payment_method_id_{}", payment_method_new.payment_method_id);
let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew { // let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew {
sk_id: field.clone(), // sk_id: field.clone(),
pk_id: key_str.clone(), // pk_id: key_str.clone(),
lookup_id: v, // lookup_id: v,
source: "payment_method".to_string(), // source: "payment_method".to_string(),
updated_by: storage_scheme.to_string(), // updated_by: storage_scheme.to_string(),
}; // };
let lookup_id1 = // let lookup_id1 =
format!("payment_method_{}", &payment_method_new.payment_method_id); // format!("payment_method_{}", &payment_method_new.payment_method_id);
let mut reverse_lookups = vec![lookup_id1]; // let mut reverse_lookups = vec![lookup_id1];
if let Some(locker_id) = &payment_method_new.locker_id { // if let Some(locker_id) = &payment_method_new.locker_id {
reverse_lookups.push(format!("payment_method_locker_{}", locker_id)) // reverse_lookups.push(format!("payment_method_locker_{}", locker_id))
} // }
let results = reverse_lookups.into_iter().map(|v| { // let results = reverse_lookups.into_iter().map(|v| {
self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme) // self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme)
}); // });
futures::future::try_join_all(results).await?; // futures::future::try_join_all(results).await?;
let storage_payment_method = (&payment_method_new).into(); // let storage_payment_method = (&payment_method_new).into();
let redis_entry = kv::TypedSql { // let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert { // op: kv::DBOperation::Insert {
insertable: kv::Insertable::PaymentMethod(payment_method_new), // insertable: kv::Insertable::PaymentMethod(payment_method_new),
}, // },
}; // };
match kv_wrapper::<diesel_models::PaymentMethod, _, _>( // match kv_wrapper::<diesel_models::PaymentMethod, _, _>(
self, // self,
KvOperation::<diesel_models::PaymentMethod>::HSetNx( // KvOperation::<diesel_models::PaymentMethod>::HSetNx(
&field, // &field,
&storage_payment_method, // &storage_payment_method,
redis_entry, // redis_entry,
), // ),
key, // key,
) // )
.await // .await
.map_err(|err| err.to_redis_failed_response(&key_str))? // .map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hsetnx() // .try_into_hsetnx()
{ // {
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { // Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "payment_method", // entity: "payment_method",
key: Some(storage_payment_method.payment_method_id), // key: Some(storage_payment_method.payment_method_id),
} // }
.into()), // .into()),
Ok(HsetnxReply::KeySet) => Ok(storage_payment_method), // Ok(HsetnxReply::KeySet) => Ok(storage_payment_method),
Err(er) => Err(er).change_context(errors::StorageError::KVError), // Err(er) => Err(er).change_context(errors::StorageError::KVError),
} // }
} // }
} // }
} // }
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn update_payment_method( // async fn update_payment_method(
&self, // &self,
payment_method: storage_types::PaymentMethod, // payment_method: storage_types::PaymentMethod,
payment_method_update: storage_types::PaymentMethodUpdate, // payment_method_update: storage_types::PaymentMethodUpdate,
storage_scheme: MerchantStorageScheme, // storage_scheme: MerchantStorageScheme,
) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> { // ) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> {
match storage_scheme { // match storage_scheme {
MerchantStorageScheme::PostgresOnly => { // MerchantStorageScheme::PostgresOnly => {
let conn = connection::pg_connection_write(self).await?; // let conn = connection::pg_connection_write(self).await?;
payment_method // payment_method
.update_with_payment_method_id(&conn, payment_method_update.into()) // .update_with_payment_method_id(&conn, payment_method_update.into())
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
} // }
MerchantStorageScheme::RedisKv => { // MerchantStorageScheme::RedisKv => {
let merchant_id = payment_method.merchant_id.clone(); // let merchant_id = payment_method.merchant_id.clone();
let customer_id = payment_method.customer_id.clone(); // let customer_id = payment_method.customer_id.clone();
let key = PartitionKey::MerchantIdCustomerId { // let key = PartitionKey::MerchantIdCustomerId {
merchant_id: &merchant_id, // merchant_id: &merchant_id,
customer_id: &customer_id, // customer_id: &customer_id,
}; // };
let key_str = key.to_string(); // let key_str = key.to_string();
let field = format!("payment_method_id_{}", payment_method.payment_method_id); // let field = format!("payment_method_id_{}", payment_method.payment_method_id);
let p_update: PaymentMethodUpdateInternal = payment_method_update.into(); // let p_update: PaymentMethodUpdateInternal = payment_method_update.into();
let updated_payment_method = // let updated_payment_method =
p_update.clone().apply_changeset(payment_method.clone()); // p_update.clone().apply_changeset(payment_method.clone());
let redis_value = serde_json::to_string(&updated_payment_method) // let redis_value = serde_json::to_string(&updated_payment_method)
.change_context(errors::StorageError::SerializationFailed)?; // .change_context(errors::StorageError::SerializationFailed)?;
let redis_entry = kv::TypedSql { // let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update { // op: kv::DBOperation::Update {
updatable: kv::Updateable::PaymentMethodUpdate( // updatable: kv::Updateable::PaymentMethodUpdate(
kv::PaymentMethodUpdateMems { // kv::PaymentMethodUpdateMems {
orig: payment_method, // orig: payment_method,
update_data: p_update, // update_data: p_update,
}, // },
), // ),
}, // },
}; // };
kv_wrapper::<(), _, _>( // kv_wrapper::<(), _, _>(
self, // self,
KvOperation::<diesel_models::PaymentMethod>::Hset( // KvOperation::<diesel_models::PaymentMethod>::Hset(
(&field, redis_value), // (&field, redis_value),
redis_entry, // redis_entry,
), // ),
key, // key,
) // )
.await // .await
.map_err(|err| err.to_redis_failed_response(&key_str))? // .map_err(|err| err.to_redis_failed_response(&key_str))?
.try_into_hset() // .try_into_hset()
.change_context(errors::StorageError::KVError)?; // .change_context(errors::StorageError::KVError)?;
Ok(updated_payment_method) // Ok(updated_payment_method)
} // }
} // }
} // }
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn find_payment_method_by_customer_id_merchant_id_list( // async fn find_payment_method_by_customer_id_merchant_id_list(
&self, // &self,
customer_id: &str, // customer_id: &str,
merchant_id: &str, // merchant_id: &str,
limit: Option<i64>, // limit: Option<i64>,
) -> CustomResult<Vec<storage_types::PaymentMethod>, errors::StorageError> { // ) -> CustomResult<Vec<storage_types::PaymentMethod>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?; // let conn = connection::pg_connection_read(self).await?;
storage_types::PaymentMethod::find_by_customer_id_merchant_id( // storage_types::PaymentMethod::find_by_customer_id_merchant_id(
&conn, // &conn,
customer_id, // customer_id,
merchant_id, // merchant_id,
limit, // limit,
) // )
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
} // }
#[instrument(skip_all)] // #[instrument(skip_all)]
async fn find_payment_method_by_customer_id_merchant_id_status( // async fn find_payment_method_by_customer_id_merchant_id_status(
&self, // &self,
customer_id: &str, // customer_id: &str,
merchant_id: &str, // merchant_id: &str,
status: common_enums::PaymentMethodStatus, // status: common_enums::PaymentMethodStatus,
limit: Option<i64>, // limit: Option<i64>,
) -> CustomResult<Vec<storage_types::PaymentMethod>, errors::StorageError> { // ) -> CustomResult<Vec<storage_types::PaymentMethod>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?; // let conn = connection::pg_connection_read(self).await?;
storage_types::PaymentMethod::find_by_customer_id_merchant_id_status( // storage_types::PaymentMethod::find_by_customer_id_merchant_id_status(
&conn, // &conn,
customer_id, // customer_id,
merchant_id, // merchant_id,
status, // status,
limit, // limit,
) // )
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
} // }
async fn delete_payment_method_by_merchant_id_payment_method_id( // async fn delete_payment_method_by_merchant_id_payment_method_id(
&self, // &self,
merchant_id: &str, // merchant_id: &str,
payment_method_id: &str, // payment_method_id: &str,
) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> { // ) -> CustomResult<storage_types::PaymentMethod, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?; // let conn = connection::pg_connection_write(self).await?;
storage_types::PaymentMethod::delete_by_merchant_id_payment_method_id( // storage_types::PaymentMethod::delete_by_merchant_id_payment_method_id(
&conn, // &conn,
merchant_id, // merchant_id,
payment_method_id, // payment_method_id,
) // )
.await // .await
.map_err(|error| report!(errors::StorageError::from(error))) // .map_err(|error| report!(errors::StorageError::from(error)))
} // }
} // }
} // }
#[cfg(not(feature = "kv_store"))]
mod storage { mod storage {
use error_stack::report; use error_stack::report;
use router_env::{instrument, tracing}; use router_env::{instrument, tracing};