feat(connector_response): kv for connector response table (#2207)

This commit is contained in:
Kartikeya Hegde
2023-09-25 20:31:13 +05:30
committed by GitHub
parent c8d35449cf
commit cefa291c00
8 changed files with 305 additions and 62 deletions

View File

@ -24,7 +24,6 @@ pub struct ConnectorResponseNew {
#[derive(Clone, Debug, Deserialize, Serialize, Identifiable, Queryable)]
#[diesel(table_name = connector_response)]
pub struct ConnectorResponse {
#[serde(skip_serializing)]
pub id: i32,
pub payment_id: String,
pub merchant_id: String,
@ -49,7 +48,7 @@ pub struct ConnectorResponseUpdateInternal {
pub connector_name: Option<String>,
}
#[derive(Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ConnectorResponseUpdate {
ResponseUpdate {
connector_transaction_id: Option<String>,

View File

@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::{
address::AddressNew,
connector_response::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate},
errors,
payment_attempt::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate},
payment_intent::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate},
@ -40,6 +41,7 @@ pub enum Insertable {
PaymentIntent(PaymentIntentNew),
PaymentAttempt(PaymentAttemptNew),
Refund(RefundNew),
ConnectorResponse(ConnectorResponseNew),
Address(Box<AddressNew>),
}
@ -49,6 +51,13 @@ pub enum Updateable {
PaymentIntentUpdate(PaymentIntentUpdateMems),
PaymentAttemptUpdate(PaymentAttemptUpdateMems),
RefundUpdate(RefundUpdateMems),
ConnectorResponseUpdate(ConnectorResponseUpdateMems),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ConnectorResponseUpdateMems {
pub orig: ConnectorResponse,
pub update_data: ConnectorResponseUpdate,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@ -26,9 +26,17 @@ impl ConnectorResponse {
conn: &PgPooledConn,
connector_response: ConnectorResponseUpdate,
) -> StorageResult<Self> {
match generics::generic_update_by_id::<<Self as HasTable>::Table, _, _, _>(
match generics::generic_update_with_unique_predicate_get_result::<
<Self as HasTable>::Table,
_,
_,
_,
>(
conn,
self.id,
dsl::merchant_id
.eq(self.merchant_id.clone())
.and(dsl::payment_id.eq(self.payment_id.clone()))
.and(dsl::attempt_id.eq(self.attempt_id.clone())),
ConnectorResponseUpdateInternal::from(connector_response),
)
.await

View File

@ -165,6 +165,7 @@ async fn drainer(
let payment_intent = "payment_intent";
let payment_attempt = "payment_attempt";
let refund = "refund";
let connector_response = "connector_response";
let address = "address";
match db_op {
// TODO: Handle errors
@ -188,6 +189,13 @@ async fn drainer(
kv::Insertable::Refund(a) => {
macro_util::handle_resp!(a.insert(&conn).await, insert_op, refund)
}
kv::Insertable::ConnectorResponse(a) => {
macro_util::handle_resp!(
a.insert(&conn).await,
insert_op,
connector_response
)
}
kv::Insertable::Address(addr) => {
macro_util::handle_resp!(addr.insert(&conn).await, insert_op, address)
}
@ -227,6 +235,11 @@ async fn drainer(
refund
)
}
kv::Updateable::ConnectorResponseUpdate(a) => macro_util::handle_resp!(
a.orig.update(&conn, a.update_data).await,
update_op,
connector_response
),
}
})
.await;

View File

@ -3,18 +3,17 @@ use router_env::{instrument, tracing};
use super::{MockDb, Store};
use crate::{
connection,
core::errors::{self, CustomResult},
types::storage::{self, enums},
types::storage::{self as storage_type, enums},
};
#[async_trait::async_trait]
pub trait ConnectorResponseInterface {
async fn insert_connector_response(
&self,
connector_response: storage::ConnectorResponseNew,
connector_response: storage_type::ConnectorResponseNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError>;
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError>;
async fn find_connector_response_by_payment_id_merchant_id_attempt_id(
&self,
@ -22,63 +21,272 @@ pub trait ConnectorResponseInterface {
merchant_id: &str,
attempt_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError>;
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError>;
async fn update_connector_response(
&self,
this: storage::ConnectorResponse,
payment_attempt: storage::ConnectorResponseUpdate,
this: storage_type::ConnectorResponse,
payment_attempt: storage_type::ConnectorResponseUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError>;
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError>;
}
#[async_trait::async_trait]
impl ConnectorResponseInterface for Store {
#[instrument(skip_all)]
async fn insert_connector_response(
&self,
connector_response: storage::ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
connector_response
.insert(&conn)
#[cfg(not(feature = "kv_store"))]
mod storage {
use error_stack::IntoReport;
use router_env::{instrument, tracing};
use super::Store;
use crate::{
connection,
core::errors::{self, CustomResult},
types::storage::{self as storage_type, enums},
};
#[async_trait::async_trait]
impl super::ConnectorResponseInterface for Store {
#[instrument(skip_all)]
async fn insert_connector_response(
&self,
connector_response: storage_type::ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
connector_response
.insert(&conn)
.await
.map_err(Into::into)
.into_report()
}
#[instrument(skip_all)]
async fn find_connector_response_by_payment_id_merchant_id_attempt_id(
&self,
payment_id: &str,
merchant_id: &str,
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage_type::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id(
&conn,
payment_id,
merchant_id,
attempt_id,
)
.await
.map_err(Into::into)
.into_report()
}
}
#[instrument(skip_all)]
async fn find_connector_response_by_payment_id_merchant_id_attempt_id(
&self,
payment_id: &str,
merchant_id: &str,
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id(
&conn,
payment_id,
merchant_id,
attempt_id,
)
.await
.map_err(Into::into)
.into_report()
async fn update_connector_response(
&self,
this: storage_type::ConnectorResponse,
connector_response_update: storage_type::ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, connector_response_update)
.await
.map_err(Into::into)
.into_report()
}
}
}
async fn update_connector_response(
&self,
this: storage::ConnectorResponse,
connector_response_update: storage::ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, connector_response_update)
.await
.map_err(Into::into)
.into_report()
#[cfg(feature = "kv_store")]
mod storage {
use error_stack::{IntoReport, ResultExt};
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};
use storage_impl::redis::kv_store::{PartitionKey, RedisConnInterface};
use super::Store;
use crate::{
connection,
core::errors::{self, CustomResult},
types::storage::{self as storage_type, enums, kv},
utils::db_utils,
};
#[async_trait::async_trait]
impl super::ConnectorResponseInterface for Store {
#[instrument(skip_all)]
async fn insert_connector_response(
&self,
connector_response: storage_type::ConnectorResponseNew,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
match storage_scheme {
data_models::MerchantStorageScheme::PostgresOnly => connector_response
.insert(&conn)
.await
.map_err(Into::into)
.into_report(),
data_models::MerchantStorageScheme::RedisKv => {
let merchant_id = &connector_response.merchant_id;
let payment_id = &connector_response.payment_id;
let attempt_id = &connector_response.attempt_id;
let key = format!("{merchant_id}_{payment_id}");
let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}");
let created_connector_resp = storage_type::ConnectorResponse {
id: Default::default(),
payment_id: connector_response.payment_id.clone(),
merchant_id: connector_response.merchant_id.clone(),
attempt_id: connector_response.attempt_id.clone(),
created_at: connector_response.created_at,
modified_at: connector_response.modified_at,
connector_name: connector_response.connector_name.clone(),
connector_transaction_id: connector_response
.connector_transaction_id
.clone(),
authentication_data: connector_response.authentication_data.clone(),
encoded_data: connector_response.encoded_data.clone(),
};
match self
.get_redis_conn()
.map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?
.serialize_and_set_hash_field_if_not_exist(
&key,
&field,
&created_connector_resp,
)
.await
{
Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "address",
key: Some(key),
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::ConnectorResponse(
connector_response.clone(),
),
},
};
self.push_to_drainer_stream::<diesel_models::ConnectorResponse>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id,
payment_id,
},
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_connector_resp)
}
Err(er) => Err(er).change_context(errors::StorageError::KVError),
}
}
}
}
#[instrument(skip_all)]
async fn find_connector_response_by_payment_id_merchant_id_attempt_id(
&self,
payment_id: &str,
merchant_id: &str,
attempt_id: &str,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
let database_call = || async {
storage_type::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id(
&conn,
payment_id,
merchant_id,
attempt_id,
)
.await
.map_err(Into::into)
.into_report()
};
match storage_scheme {
data_models::MerchantStorageScheme::PostgresOnly => database_call().await,
data_models::MerchantStorageScheme::RedisKv => {
let key = format!("{merchant_id}_{payment_id}");
let field = format!("connector_resp_{merchant_id}_{payment_id}_{attempt_id}");
let redis_conn = self
.get_redis_conn()
.map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?;
let redis_fut = redis_conn.get_hash_field_and_deserialize(
&key,
&field,
"ConnectorResponse",
);
db_utils::try_redis_get_else_try_database_get(redis_fut, database_call).await
}
}
}
async fn update_connector_response(
&self,
this: storage_type::ConnectorResponse,
connector_response_update: storage_type::ConnectorResponseUpdate,
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
match storage_scheme {
data_models::MerchantStorageScheme::PostgresOnly => this
.update(&conn, connector_response_update)
.await
.map_err(Into::into)
.into_report(),
data_models::MerchantStorageScheme::RedisKv => {
let key = format!("{}_{}", this.merchant_id, this.payment_id);
let updated_connector_response = connector_response_update
.clone()
.apply_changeset(this.clone());
let redis_value = serde_json::to_string(&updated_connector_response)
.into_report()
.change_context(errors::StorageError::KVError)?;
let field = format!(
"connector_resp_{}_{}_{}",
&updated_connector_response.merchant_id,
&updated_connector_response.payment_id,
&updated_connector_response.attempt_id
);
let updated_connector_response = self
.get_redis_conn()
.map_err(|er| error_stack::report!(errors::StorageError::RedisError(er)))?
.set_hash_fields(&key, (&field, &redis_value))
.await
.map(|_| updated_connector_response)
.change_context(errors::StorageError::KVError)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::ConnectorResponseUpdate(
kv::ConnectorResponseUpdateMems {
orig: this,
update_data: connector_response_update,
},
),
},
};
self.push_to_drainer_stream::<storage_type::ConnectorResponse>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_connector_response.merchant_id,
payment_id: &updated_connector_response.payment_id,
},
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(updated_connector_response)
}
}
}
}
}
@ -87,11 +295,11 @@ impl ConnectorResponseInterface for MockDb {
#[instrument(skip_all)]
async fn insert_connector_response(
&self,
new: storage::ConnectorResponseNew,
new: storage_type::ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let mut connector_response = self.connector_response.lock().await;
let response = storage::ConnectorResponse {
let response = storage_type::ConnectorResponse {
id: connector_response
.len()
.try_into()
@ -118,7 +326,7 @@ impl ConnectorResponseInterface for MockDb {
_merchant_id: &str,
_attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
// [#172]: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
}
@ -127,10 +335,10 @@ impl ConnectorResponseInterface for MockDb {
#[allow(clippy::unwrap_used)]
async fn update_connector_response(
&self,
this: storage::ConnectorResponse,
connector_response_update: storage::ConnectorResponseUpdate,
this: storage_type::ConnectorResponse,
connector_response_update: storage_type::ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
) -> CustomResult<storage_type::ConnectorResponse, errors::StorageError> {
let mut connector_response = self.connector_response.lock().await;
let response = connector_response
.iter_mut()

View File

@ -1,4 +1,4 @@
pub use diesel_models::kv::{
DBOperation, Insertable, PaymentAttemptUpdateMems, PaymentIntentUpdateMems, RefundUpdateMems,
TypedSql, Updateable,
ConnectorResponseUpdateMems, DBOperation, Insertable, PaymentAttemptUpdateMems,
PaymentIntentUpdateMems, RefundUpdateMems, TypedSql, Updateable,
};

View File

@ -0,0 +1,5 @@
use diesel_models::connector_response::ConnectorResponse;
use crate::redis::kv_store::KvStorePartition;
impl KvStorePartition for ConnectorResponse {}

View File

@ -8,6 +8,7 @@ use redis::{kv_store::RedisConnInterface, RedisStore};
mod address;
pub mod config;
pub mod connection;
mod connector_response;
pub mod database;
pub mod errors;
mod lookup;