feat(sample_data): extend the batch sample data interface trait for disputes (#6293)

This commit is contained in:
Tommy shu
2024-10-16 02:43:33 -04:00
committed by GitHub
parent da194f34c6
commit 1b31c57fd9
4 changed files with 157 additions and 3 deletions

View File

@ -12,9 +12,11 @@ use crate::schema_v2::{
payment_attempt::dsl as payment_attempt_dsl, payment_intent::dsl as payment_intent_dsl, payment_attempt::dsl as payment_attempt_dsl, payment_intent::dsl as payment_intent_dsl,
}; };
use crate::{ use crate::{
errors, schema::refund::dsl as refund_dsl, user::sample_data::PaymentAttemptBatchNew, errors,
PaymentAttempt, PaymentIntent, PaymentIntentNew, PgPooledConn, Refund, RefundNew, schema::{dispute::dsl as dispute_dsl, refund::dsl as refund_dsl},
StorageResult, user::sample_data::PaymentAttemptBatchNew,
Dispute, DisputeNew, PaymentAttempt, PaymentIntent, PaymentIntentNew, PgPooledConn, Refund,
RefundNew, StorageResult,
}; };
pub async fn insert_payment_intents( pub async fn insert_payment_intents(
@ -61,6 +63,21 @@ pub async fn insert_refunds(
.attach_printable("Error while inserting refunds") .attach_printable("Error while inserting refunds")
} }
pub async fn insert_disputes(
conn: &PgPooledConn,
batch: Vec<DisputeNew>,
) -> StorageResult<Vec<Dispute>> {
let query = diesel::insert_into(<Dispute>::table()).values(batch);
logger::debug!(query = %debug_query::<diesel::pg::Pg,_>(&query).to_string());
query
.get_results_async(conn)
.await
.change_context(errors::DatabaseError::Others)
.attach_printable("Error while inserting disputes")
}
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
pub async fn delete_payment_intents( pub async fn delete_payment_intents(
conn: &PgPooledConn, conn: &PgPooledConn,
@ -165,3 +182,29 @@ pub async fn delete_refunds(
_ => Ok(result), _ => Ok(result),
}) })
} }
pub async fn delete_disputes(
conn: &PgPooledConn,
merchant_id: &common_utils::id_type::MerchantId,
) -> StorageResult<Vec<Dispute>> {
let query = diesel::delete(<Dispute>::table())
.filter(dispute_dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(dispute_dsl::dispute_id.like("test_%"));
logger::debug!(query = %debug_query::<diesel::pg::Pg,_>(&query).to_string());
query
.get_results_async(conn)
.await
.change_context(errors::DatabaseError::Others)
.attach_printable("Error while deleting disputes")
.and_then(|result| match result.len() {
n if n > 0 => {
logger::debug!("{n} records deleted");
Ok(result)
}
0 => Err(error_stack::report!(errors::DatabaseError::NotFound)
.attach_printable("No records deleted")),
_ => Ok(result),
})
}

View File

@ -3239,6 +3239,26 @@ impl BatchSampleDataInterface for KafkaStore {
Ok(refunds_list) Ok(refunds_list)
} }
#[cfg(feature = "v1")]
async fn insert_disputes_batch_for_sample_data(
&self,
batch: Vec<diesel_models::DisputeNew>,
) -> CustomResult<Vec<diesel_models::Dispute>, hyperswitch_domain_models::errors::StorageError>
{
let disputes_list = self
.diesel_store
.insert_disputes_batch_for_sample_data(batch)
.await?;
for dispute in disputes_list.iter() {
let _ = self
.kafka_producer
.log_dispute(dispute, None, self.tenant_id.clone())
.await;
}
Ok(disputes_list)
}
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
async fn delete_payment_intents_for_sample_data( async fn delete_payment_intents_for_sample_data(
&self, &self,
@ -3306,6 +3326,27 @@ impl BatchSampleDataInterface for KafkaStore {
Ok(refunds_list) Ok(refunds_list)
} }
#[cfg(feature = "v1")]
async fn delete_disputes_for_sample_data(
&self,
merchant_id: &id_type::MerchantId,
) -> CustomResult<Vec<diesel_models::Dispute>, hyperswitch_domain_models::errors::StorageError>
{
let disputes_list = self
.diesel_store
.delete_disputes_for_sample_data(merchant_id)
.await?;
for dispute in disputes_list.iter() {
let _ = self
.kafka_producer
.log_dispute_delete(dispute, self.tenant_id.clone())
.await;
}
Ok(disputes_list)
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@ -1,5 +1,6 @@
use common_utils::types::keymanager::KeyManagerState; use common_utils::types::keymanager::KeyManagerState;
use diesel_models::{ use diesel_models::{
dispute::{Dispute, DisputeNew},
errors::DatabaseError, errors::DatabaseError,
query::user::sample_data as sample_data_queries, query::user::sample_data as sample_data_queries,
refund::{Refund, RefundNew}, refund::{Refund, RefundNew},
@ -39,6 +40,12 @@ pub trait BatchSampleDataInterface {
batch: Vec<RefundNew>, batch: Vec<RefundNew>,
) -> CustomResult<Vec<Refund>, StorageError>; ) -> CustomResult<Vec<Refund>, StorageError>;
#[cfg(feature = "v1")]
async fn insert_disputes_batch_for_sample_data(
&self,
batch: Vec<DisputeNew>,
) -> CustomResult<Vec<Dispute>, StorageError>;
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
async fn delete_payment_intents_for_sample_data( async fn delete_payment_intents_for_sample_data(
&self, &self,
@ -58,6 +65,12 @@ pub trait BatchSampleDataInterface {
&self, &self,
merchant_id: &common_utils::id_type::MerchantId, merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Vec<Refund>, StorageError>; ) -> CustomResult<Vec<Refund>, StorageError>;
#[cfg(feature = "v1")]
async fn delete_disputes_for_sample_data(
&self,
merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Vec<Dispute>, StorageError>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -127,6 +140,19 @@ impl BatchSampleDataInterface for Store {
.map_err(diesel_error_to_data_error) .map_err(diesel_error_to_data_error)
} }
#[cfg(feature = "v1")]
async fn insert_disputes_batch_for_sample_data(
&self,
batch: Vec<DisputeNew>,
) -> CustomResult<Vec<Dispute>, StorageError> {
let conn = pg_connection_write(self)
.await
.change_context(StorageError::DatabaseConnectionError)?;
sample_data_queries::insert_disputes(&conn, batch)
.await
.map_err(diesel_error_to_data_error)
}
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
async fn delete_payment_intents_for_sample_data( async fn delete_payment_intents_for_sample_data(
&self, &self,
@ -184,6 +210,19 @@ impl BatchSampleDataInterface for Store {
.await .await
.map_err(diesel_error_to_data_error) .map_err(diesel_error_to_data_error)
} }
#[cfg(feature = "v1")]
async fn delete_disputes_for_sample_data(
&self,
merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Vec<Dispute>, StorageError> {
let conn = pg_connection_write(self)
.await
.change_context(StorageError::DatabaseConnectionError)?;
sample_data_queries::delete_disputes(&conn, merchant_id)
.await
.map_err(diesel_error_to_data_error)
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -214,6 +253,14 @@ impl BatchSampleDataInterface for storage_impl::MockDb {
Err(StorageError::MockDbError)? Err(StorageError::MockDbError)?
} }
#[cfg(feature = "v1")]
async fn insert_disputes_batch_for_sample_data(
&self,
_batch: Vec<DisputeNew>,
) -> CustomResult<Vec<Dispute>, StorageError> {
Err(StorageError::MockDbError)?
}
#[cfg(feature = "v1")] #[cfg(feature = "v1")]
async fn delete_payment_intents_for_sample_data( async fn delete_payment_intents_for_sample_data(
&self, &self,
@ -239,6 +286,14 @@ impl BatchSampleDataInterface for storage_impl::MockDb {
) -> CustomResult<Vec<Refund>, StorageError> { ) -> CustomResult<Vec<Refund>, StorageError> {
Err(StorageError::MockDbError)? Err(StorageError::MockDbError)?
} }
#[cfg(feature = "v1")]
async fn delete_disputes_for_sample_data(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Vec<Dispute>, StorageError> {
Err(StorageError::MockDbError)?
}
} }
// TODO: This error conversion is re-used from storage_impl and is not DRY when it should be // TODO: This error conversion is re-used from storage_impl and is not DRY when it should be

View File

@ -582,6 +582,21 @@ impl KafkaProducer {
.attach_printable_lazy(|| format!("Failed to add consolidated dispute event {dispute:?}")) .attach_printable_lazy(|| format!("Failed to add consolidated dispute event {dispute:?}"))
} }
pub async fn log_dispute_delete(
&self,
delete_old_dispute: &Dispute,
tenant_id: TenantID,
) -> MQResult<()> {
self.log_event(&KafkaEvent::old(
&KafkaDispute::from_storage(delete_old_dispute),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative dispute event {delete_old_dispute:?}")
})
}
#[cfg(feature = "payouts")] #[cfg(feature = "payouts")]
pub async fn log_payout( pub async fn log_payout(
&self, &self,