diff --git a/config/config.example.toml b/config/config.example.toml index ebb58b4305..8f7ea8a992 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -569,7 +569,7 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector api outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events -payout_analytics_topic = "topic" # Kafka topic to be used for Payout events +payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events # File storage configuration [file_storage] diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index e0caafa7d7..c725a8bd69 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -80,7 +80,7 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector api outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events -payout_analytics_topic = "topic" # Kafka topic to be used for Payout events +payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events # File storage configuration [file_storage] diff --git a/crates/analytics/docs/clickhouse/scripts/payouts.sql b/crates/analytics/docs/clickhouse/scripts/payouts.sql index 0358c0e937..68d109f7b9 100644 --- a/crates/analytics/docs/clickhouse/scripts/payouts.sql +++ b/crates/analytics/docs/clickhouse/scripts/payouts.sql @@ -1,10 +1,12 @@ CREATE TABLE payout_queue ( `payout_id` String, + `payout_attempt_id` String, `merchant_id` String, `customer_id` String, `address_id` String, - `payout_type` LowCardinality(String), + `profile_id` String, `payout_method_id` Nullable(String), + `payout_type` LowCardinality(String), `amount` UInt64, `destination_currency` LowCardinality(String), `source_currency` LowCardinality(String), @@ -17,8 +19,15 @@ CREATE TABLE payout_queue ( `created_at` DateTime CODEC(T64, LZ4), `last_modified_at` DateTime CODEC(T64, LZ4), `attempt_count` UInt16, - `profile_id` String, `status` LowCardinality(String), + `connector` Nullable(String), + `connector_payout_id` String, + `is_eligible` Nullable(Bool), + `error_message` Nullable(String), + `error_code` Nullable(String), + `business_country` Nullable(LowCardinality(String)), + `business_label` Nullable(String), + `merchant_connector_id` Nullable(String), `sign_flag` Int8 ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', kafka_topic_list = 'hyperswitch-payout-events', @@ -28,11 +37,13 @@ kafka_handle_error_mode = 'stream'; CREATE TABLE payout ( `payout_id` String, + `payout_attempt_id` String, `merchant_id` String, `customer_id` String, `address_id` String, + `profile_id` String, + `payout_method_id` String, `payout_type` LowCardinality(String), - `payout_method_id` Nullable(String), `amount` UInt64, `destination_currency` LowCardinality(String), `source_currency` LowCardinality(String), @@ -45,26 +56,36 @@ CREATE TABLE payout ( `created_at` DateTime DEFAULT now() CODEC(T64, LZ4), `last_modified_at` DateTime DEFAULT now() CODEC(T64, LZ4), `attempt_count` UInt16, - `profile_id` String, `status` LowCardinality(String), + `connector` Nullable(String), + `connector_payout_id` String, + `is_eligible` Nullable(Bool), + `error_message` Nullable(String), + `error_code` Nullable(String), + `business_country` Nullable(LowCardinality(String)), + `business_label` Nullable(String), + `merchant_connector_id` Nullable(String), `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), `sign_flag` Int8, INDEX payoutTypeIndex payout_type TYPE bloom_filter GRANULARITY 1, INDEX destinationCurrencyIndex destination_currency TYPE bloom_filter GRANULARITY 1, INDEX sourceCurrencyIndex source_currency TYPE bloom_filter GRANULARITY 1, INDEX entityTypeIndex entity_type TYPE bloom_filter GRANULARITY 1, - INDEX statusIndex status TYPE bloom_filter GRANULARITY 1 + INDEX statusIndex status TYPE bloom_filter GRANULARITY 1, + INDEX businessCountryIndex business_country TYPE bloom_filter GRANULARITY 1 ) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at) ORDER BY (created_at, merchant_id, payout_id) TTL created_at + toIntervalMonth(6); CREATE MATERIALIZED VIEW kafka_parse_payout TO payout ( `payout_id` String, + `payout_attempt_id` String, `merchant_id` String, `customer_id` String, `address_id` String, + `profile_id` String, + `payout_method_id` String, `payout_type` LowCardinality(String), - `payout_method_id` Nullable(String), `amount` UInt64, `destination_currency` LowCardinality(String), `source_currency` LowCardinality(String), @@ -77,18 +98,27 @@ CREATE MATERIALIZED VIEW kafka_parse_payout TO payout ( `created_at` DateTime64(3), `last_modified_at` DateTime64(3), `attempt_count` UInt16, - `profile_id` String, `status` LowCardinality(String), + `connector` Nullable(String), + `connector_payout_id` String, + `is_eligible` Nullable(Bool), + `error_message` Nullable(String), + `error_code` Nullable(String), + `business_country` Nullable(LowCardinality(String)), + `business_label` Nullable(String), + `merchant_connector_id` Nullable(String), `inserted_at` DateTime64(3), `sign_flag` Int8 ) AS SELECT payout_id, + payout_attempt_id, merchant_id, customer_id, address_id, - payout_type, + profile_id, payout_method_id, + payout_type, amount, destination_currency, source_currency, @@ -101,8 +131,15 @@ SELECT created_at, last_modified_at, attempt_count, - profile_id, status, + connector, + connector_payout_id, + is_eligible, + error_message, + error_code, + business_country, + business_label, + merchant_connector_id, now() as inserted_at, sign_flag FROM diff --git a/crates/data_models/src/payouts/payout_attempt.rs b/crates/data_models/src/payouts/payout_attempt.rs index 5c4c98970f..afe45b0a6a 100644 --- a/crates/data_models/src/payouts/payout_attempt.rs +++ b/crates/data_models/src/payouts/payout_attempt.rs @@ -11,7 +11,8 @@ use crate::errors; pub trait PayoutAttemptInterface { async fn insert_payout_attempt( &self, - _payout: PayoutAttemptNew, + _payout_attempt: PayoutAttemptNew, + _payouts: &Payouts, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result; @@ -19,6 +20,7 @@ pub trait PayoutAttemptInterface { &self, _this: &PayoutAttempt, _payout_attempt_update: PayoutAttemptUpdate, + _payouts: &Payouts, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result; diff --git a/crates/data_models/src/payouts/payouts.rs b/crates/data_models/src/payouts/payouts.rs index 4d70b6aaf5..28a8e4d427 100644 --- a/crates/data_models/src/payouts/payouts.rs +++ b/crates/data_models/src/payouts/payouts.rs @@ -4,8 +4,9 @@ use serde::{Deserialize, Serialize}; use storage_enums::MerchantStorageScheme; use time::PrimitiveDateTime; +use super::payout_attempt::PayoutAttempt; #[cfg(feature = "olap")] -use super::{payout_attempt::PayoutAttempt, PayoutFetchConstraints}; +use super::PayoutFetchConstraints; use crate::errors; #[async_trait::async_trait] @@ -27,6 +28,7 @@ pub trait PayoutsInterface { &self, _this: &Payouts, _payout: PayoutsUpdate, + _payout_attempt: &PayoutAttempt, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result; diff --git a/crates/router/src/core/payouts.rs b/crates/router/src/core/payouts.rs index 0f435a2e26..be4376d4f0 100644 --- a/crates/router/src/core/payouts.rs +++ b/crates/router/src/core/payouts.rs @@ -347,12 +347,17 @@ pub async fn payouts_update_core( entity_type: req.entity_type.unwrap_or(payouts.entity_type), metadata: req.metadata.clone().or(payouts.metadata.clone()), status: Some(status), - profile_id: Some(payout_attempt.profile_id), + profile_id: Some(payout_attempt.profile_id.clone()), }; let db = &*state.store; payout_data.payouts = db - .update_payout(&payouts, updated_payouts, merchant_account.storage_scheme) + .update_payout( + &payouts, + updated_payouts, + &payout_attempt, + merchant_account.storage_scheme, + ) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Error updating payouts")?; @@ -385,6 +390,7 @@ pub async fn payouts_update_core( .update_payout_attempt( &payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -517,6 +523,7 @@ pub async fn payouts_cancel_core( .update_payout_attempt( &payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -527,6 +534,7 @@ pub async fn payouts_cancel_core( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -867,6 +875,7 @@ pub async fn call_connector_payout( db.update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + payouts, merchant_account.storage_scheme, ) .await @@ -1128,6 +1137,7 @@ pub async fn check_payout_eligibility( .update_payout_attempt( payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1137,6 +1147,7 @@ pub async fn check_payout_eligibility( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1163,6 +1174,7 @@ pub async fn check_payout_eligibility( .update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1172,6 +1184,7 @@ pub async fn check_payout_eligibility( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1246,6 +1259,7 @@ pub async fn create_payout( .update_payout_attempt( payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1255,6 +1269,7 @@ pub async fn create_payout( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1281,6 +1296,7 @@ pub async fn create_payout( .update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1290,6 +1306,7 @@ pub async fn create_payout( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1357,6 +1374,7 @@ pub async fn cancel_payout( .update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1366,6 +1384,7 @@ pub async fn cancel_payout( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1385,6 +1404,7 @@ pub async fn cancel_payout( .update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1394,6 +1414,7 @@ pub async fn cancel_payout( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1479,6 +1500,7 @@ pub async fn fulfill_payout( .update_payout_attempt( payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1488,6 +1510,7 @@ pub async fn fulfill_payout( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1514,6 +1537,7 @@ pub async fn fulfill_payout( .update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + &payout_data.payouts, merchant_account.storage_scheme, ) .await @@ -1523,6 +1547,7 @@ pub async fn fulfill_payout( .update_payout( &payout_data.payouts, storage::PayoutsUpdate::StatusUpdate { status }, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -1730,7 +1755,11 @@ pub async fn payout_create_db_entries( ..Default::default() }; let payout_attempt = db - .insert_payout_attempt(payout_attempt_req, merchant_account.storage_scheme) + .insert_payout_attempt( + payout_attempt_req, + &payouts, + merchant_account.storage_scheme, + ) .await .to_duplicate_response(errors::ApiErrorResponse::DuplicatePayout { payout_id: payout_id.to_owned(), diff --git a/crates/router/src/core/payouts/helpers.rs b/crates/router/src/core/payouts/helpers.rs index 9b54bb03d1..7de5a95bd8 100644 --- a/crates/router/src/core/payouts/helpers.rs +++ b/crates/router/src/core/payouts/helpers.rs @@ -171,6 +171,7 @@ pub async fn make_payout_method_data<'a>( db.update_payout_attempt( &payout_data.payout_attempt, updated_payout_attempt, + &payout_data.payouts, storage_scheme, ) .await @@ -549,6 +550,7 @@ pub async fn save_payout_data_to_locker( db.update_payout( &payout_data.payouts, updated_payout, + payout_attempt, merchant_account.storage_scheme, ) .await diff --git a/crates/router/src/core/payouts/retry.rs b/crates/router/src/core/payouts/retry.rs index fbb7ca0ba7..7c0b826560 100644 --- a/crates/router/src/core/payouts/retry.rs +++ b/crates/router/src/core/payouts/retry.rs @@ -285,6 +285,7 @@ pub async fn modify_trackers( .update_payout( &payout_data.payouts, updated_payouts, + &payout_data.payout_attempt, merchant_account.storage_scheme, ) .await @@ -308,7 +309,11 @@ pub async fn modify_trackers( ..Default::default() }; payout_data.payout_attempt = db - .insert_payout_attempt(payout_attempt_req, merchant_account.storage_scheme) + .insert_payout_attempt( + payout_attempt_req, + &payouts, + merchant_account.storage_scheme, + ) .await .to_duplicate_response(errors::ApiErrorResponse::DuplicatePayout { payout_id }) .attach_printable("Error inserting payouts in db")?; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index e24dde449d..f0effcbcac 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -32,6 +32,8 @@ use super::{ user::{sample_data::BatchSampleDataInterface, UserInterface}, user_role::UserRoleInterface, }; +#[cfg(feature = "payouts")] +use crate::services::kafka::payout::KafkaPayout; use crate::{ core::errors::{self, ProcessTrackerError}, db::{ @@ -1491,21 +1493,49 @@ impl PayoutAttemptInterface for KafkaStore { &self, this: &storage::PayoutAttempt, payout_attempt_update: storage::PayoutAttemptUpdate, + payouts: &storage::Payouts, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.diesel_store - .update_payout_attempt(this, payout_attempt_update, storage_scheme) + let updated_payout_attempt = self + .diesel_store + .update_payout_attempt(this, payout_attempt_update, payouts, storage_scheme) + .await?; + if let Err(err) = self + .kafka_producer + .log_payout( + &KafkaPayout::from_storage(payouts, &updated_payout_attempt), + Some(KafkaPayout::from_storage(payouts, this)), + ) .await + { + logger::error!(message="Failed to update analytics entry for Payouts {payouts:?}\n{updated_payout_attempt:?}", error_message=?err); + }; + + Ok(updated_payout_attempt) } async fn insert_payout_attempt( &self, payout_attempt: storage::PayoutAttemptNew, + payouts: &storage::Payouts, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.diesel_store - .insert_payout_attempt(payout_attempt, storage_scheme) + let payout_attempt_new = self + .diesel_store + .insert_payout_attempt(payout_attempt, payouts, storage_scheme) + .await?; + if let Err(err) = self + .kafka_producer + .log_payout( + &KafkaPayout::from_storage(payouts, &payout_attempt_new), + None, + ) .await + { + logger::error!(message="Failed to add analytics entry for Payouts {payouts:?}\n{payout_attempt_new:?}", error_message=?err); + }; + + Ok(payout_attempt_new) } async fn get_filters_for_payouts( @@ -1544,21 +1574,23 @@ impl PayoutsInterface for KafkaStore { &self, this: &storage::Payouts, payout_update: storage::PayoutsUpdate, + payout_attempt: &storage::PayoutAttempt, storage_scheme: MerchantStorageScheme, ) -> CustomResult { let payout = self .diesel_store - .update_payout(this, payout_update, storage_scheme) + .update_payout(this, payout_update, payout_attempt, storage_scheme) .await?; - - if let Err(er) = self + if let Err(err) = self .kafka_producer - .log_payout(&payout, Some(this.clone())) + .log_payout( + &KafkaPayout::from_storage(&payout, payout_attempt), + Some(KafkaPayout::from_storage(this, payout_attempt)), + ) .await { - logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er); + logger::error!(message="Failed to update analytics entry for Payouts {payout:?}\n{payout_attempt:?}", error_message=?err); }; - Ok(payout) } @@ -1567,16 +1599,9 @@ impl PayoutsInterface for KafkaStore { payout: storage::PayoutsNew, storage_scheme: MerchantStorageScheme, ) -> CustomResult { - let payout = self - .diesel_store + self.diesel_store .insert_payout(payout, storage_scheme) - .await?; - - if let Err(er) = self.kafka_producer.log_payout(&payout, None).await { - logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er); - }; - - Ok(payout) + .await } async fn find_optional_payout_by_merchant_id_payout_id( diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 2a0944b2f2..d02b42d417 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -26,6 +26,7 @@ pub enum EventType { OutgoingWebhookLogs, Dispute, AuditEvent, + #[cfg(feature = "payouts")] Payout, } diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index e4708f8668..166df034cc 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -6,12 +6,12 @@ use rdkafka::{ config::FromClientConfig, producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer}, }; - +#[cfg(feature = "payouts")] +pub mod payout; use crate::events::EventType; mod dispute; mod payment_attempt; mod payment_intent; -mod payout; mod refund; use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use diesel_models::refund::Refund; @@ -25,8 +25,6 @@ use self::{ payment_intent::KafkaPaymentIntent, refund::KafkaRefund, }; use crate::types::storage::Dispute; -#[cfg(feature = "payouts")] -use crate::types::storage::Payouts; // Using message queue result here to avoid confusion with Kafka result provided by library pub type MQResult = CustomResult; @@ -97,6 +95,7 @@ pub struct KafkaSettings { outgoing_webhook_logs_topic: String, dispute_analytics_topic: String, audit_events_topic: String, + #[cfg(feature = "payouts")] payout_analytics_topic: String, } @@ -163,6 +162,7 @@ impl KafkaSettings { )) })?; + #[cfg(feature = "payouts")] common_utils::fp_utils::when(self.payout_analytics_topic.is_default_or_empty(), || { Err(ApplicationError::InvalidConfigurationValueError( "Kafka Payout Analytics topic must not be empty".into(), @@ -184,6 +184,7 @@ pub struct KafkaProducer { outgoing_webhook_logs_topic: String, dispute_analytics_topic: String, audit_events_topic: String, + #[cfg(feature = "payouts")] payout_analytics_topic: String, } @@ -224,6 +225,7 @@ impl KafkaProducer { outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(), dispute_analytics_topic: conf.dispute_analytics_topic.clone(), audit_events_topic: conf.audit_events_topic.clone(), + #[cfg(feature = "payouts")] payout_analytics_topic: conf.payout_analytics_topic.clone(), }) } @@ -239,6 +241,7 @@ impl KafkaProducer { EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic, EventType::Dispute => &self.dispute_analytics_topic, EventType::AuditEvent => &self.audit_events_topic, + #[cfg(feature = "payouts")] EventType::Payout => &self.payout_analytics_topic, }; self.producer @@ -357,27 +360,27 @@ impl KafkaProducer { } #[cfg(feature = "payouts")] - pub async fn log_payout(&self, payout: &Payouts, old_payout: Option) -> MQResult<()> { + pub async fn log_payout( + &self, + payout: &KafkaPayout<'_>, + old_payout: Option>, + ) -> MQResult<()> { if let Some(negative_event) = old_payout { - self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage( - &negative_event, - ))) - .attach_printable_lazy(|| { - format!("Failed to add negative payout event {negative_event:?}") - })?; + self.log_event(&KafkaEvent::old(&negative_event)) + .attach_printable_lazy(|| { + format!("Failed to add negative payout event {negative_event:?}") + })?; }; - self.log_event(&KafkaEvent::new(&KafkaPayout::from_storage(payout))) + self.log_event(&KafkaEvent::new(payout)) .attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}")) } #[cfg(feature = "payouts")] - pub async fn log_payout_delete(&self, delete_old_payout: &Payouts) -> MQResult<()> { - self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage( - delete_old_payout, - ))) - .attach_printable_lazy(|| { - format!("Failed to add negative payout event {delete_old_payout:?}") - }) + pub async fn log_payout_delete(&self, delete_old_payout: &KafkaPayout<'_>) -> MQResult<()> { + self.log_event(&KafkaEvent::old(delete_old_payout)) + .attach_printable_lazy(|| { + format!("Failed to add negative payout event {delete_old_payout:?}") + }) } pub fn get_topic(&self, event: EventType) -> &str { @@ -390,6 +393,7 @@ impl KafkaProducer { EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic, EventType::Dispute => &self.dispute_analytics_topic, EventType::AuditEvent => &self.audit_events_topic, + #[cfg(feature = "payouts")] EventType::Payout => &self.payout_analytics_topic, } } diff --git a/crates/router/src/services/kafka/payout.rs b/crates/router/src/services/kafka/payout.rs index 7e181e3d1f..e2bd8ef83f 100644 --- a/crates/router/src/services/kafka/payout.rs +++ b/crates/router/src/services/kafka/payout.rs @@ -1,67 +1,83 @@ use common_utils::pii; +use data_models::payouts::{payout_attempt::PayoutAttempt, payouts::Payouts}; use diesel_models::enums as storage_enums; use time::OffsetDateTime; -#[cfg(feature = "payouts")] -use crate::types::storage::Payouts; - #[derive(serde::Serialize, Debug)] pub struct KafkaPayout<'a> { pub payout_id: &'a String, + pub payout_attempt_id: &'a String, pub merchant_id: &'a String, pub customer_id: &'a String, pub address_id: &'a String, - pub payout_type: &'a storage_enums::PayoutType, + pub profile_id: &'a String, pub payout_method_id: Option<&'a String>, + pub payout_type: storage_enums::PayoutType, pub amount: i64, - pub destination_currency: &'a storage_enums::Currency, - pub source_currency: &'a storage_enums::Currency, + pub destination_currency: storage_enums::Currency, + pub source_currency: storage_enums::Currency, pub description: Option<&'a String>, pub recurring: bool, pub auto_fulfill: bool, pub return_url: Option<&'a String>, - pub entity_type: &'a storage_enums::PayoutEntityType, - pub metadata: Option<&'a pii::SecretSerdeValue>, - #[serde(default, with = "time::serde::timestamp")] + pub entity_type: storage_enums::PayoutEntityType, + pub metadata: Option, + #[serde(with = "time::serde::timestamp")] pub created_at: OffsetDateTime, - #[serde(default, with = "time::serde::timestamp")] + #[serde(with = "time::serde::timestamp")] pub last_modified_at: OffsetDateTime, pub attempt_count: i16, - pub profile_id: &'a String, - pub status: &'a storage_enums::PayoutStatus, + pub status: storage_enums::PayoutStatus, + + pub connector: Option<&'a String>, + pub connector_payout_id: &'a String, + pub is_eligible: Option, + pub error_message: Option<&'a String>, + pub error_code: Option<&'a String>, + pub business_country: Option, + pub business_label: Option<&'a String>, + pub merchant_connector_id: Option<&'a String>, } -#[cfg(feature = "payouts")] impl<'a> KafkaPayout<'a> { - pub fn from_storage(payout: &'a Payouts) -> Self { + pub fn from_storage(payouts: &'a Payouts, payout_attempt: &'a PayoutAttempt) -> Self { Self { - payout_id: &payout.payout_id, - merchant_id: &payout.merchant_id, - customer_id: &payout.customer_id, - address_id: &payout.address_id, - payout_type: &payout.payout_type, - payout_method_id: payout.payout_method_id.as_ref(), - amount: payout.amount, - destination_currency: &payout.destination_currency, - source_currency: &payout.source_currency, - description: payout.description.as_ref(), - recurring: payout.recurring, - auto_fulfill: payout.auto_fulfill, - return_url: payout.return_url.as_ref(), - entity_type: &payout.entity_type, - metadata: payout.metadata.as_ref(), - created_at: payout.created_at.assume_utc(), - last_modified_at: payout.last_modified_at.assume_utc(), - attempt_count: payout.attempt_count, - profile_id: &payout.profile_id, - status: &payout.status, + payout_id: &payouts.payout_id, + payout_attempt_id: &payout_attempt.payout_attempt_id, + merchant_id: &payouts.merchant_id, + customer_id: &payouts.customer_id, + address_id: &payouts.address_id, + profile_id: &payouts.profile_id, + payout_method_id: payouts.payout_method_id.as_ref(), + payout_type: payouts.payout_type, + amount: payouts.amount, + destination_currency: payouts.destination_currency, + source_currency: payouts.source_currency, + description: payouts.description.as_ref(), + recurring: payouts.recurring, + auto_fulfill: payouts.auto_fulfill, + return_url: payouts.return_url.as_ref(), + entity_type: payouts.entity_type, + metadata: payouts.metadata.clone(), + created_at: payouts.created_at.assume_utc(), + last_modified_at: payouts.last_modified_at.assume_utc(), + attempt_count: payouts.attempt_count, + status: payouts.status, + connector: payout_attempt.connector.as_ref(), + connector_payout_id: &payout_attempt.connector_payout_id, + is_eligible: payout_attempt.is_eligible, + error_message: payout_attempt.error_message.as_ref(), + error_code: payout_attempt.error_code.as_ref(), + business_country: payout_attempt.business_country, + business_label: payout_attempt.business_label.as_ref(), + merchant_connector_id: payout_attempt.merchant_connector_id.as_ref(), } } } impl<'a> super::KafkaMessage for KafkaPayout<'a> { fn key(&self) -> String { - format!("{}_{}", self.merchant_id, self.payout_id) + format!("{}_{}", self.merchant_id, self.payout_attempt_id) } fn creation_timestamp(&self) -> Option { diff --git a/crates/storage_impl/src/mock_db/payout_attempt.rs b/crates/storage_impl/src/mock_db/payout_attempt.rs index 1590dea6e7..6b792e033d 100644 --- a/crates/storage_impl/src/mock_db/payout_attempt.rs +++ b/crates/storage_impl/src/mock_db/payout_attempt.rs @@ -1,8 +1,11 @@ use common_utils::errors::CustomResult; use data_models::{ errors::StorageError, - payouts::payout_attempt::{ - PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate, + payouts::{ + payout_attempt::{ + PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate, + }, + payouts::Payouts, }, }; use diesel_models::enums as storage_enums; @@ -15,6 +18,7 @@ impl PayoutAttemptInterface for MockDb { &self, _this: &PayoutAttempt, _payout_attempt_update: PayoutAttemptUpdate, + _payouts: &Payouts, _storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult { // TODO: Implement function for `MockDb` @@ -23,7 +27,8 @@ impl PayoutAttemptInterface for MockDb { async fn insert_payout_attempt( &self, - _payout: PayoutAttemptNew, + _payout_attempt: PayoutAttemptNew, + _payouts: &Payouts, _storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult { // TODO: Implement function for `MockDb` @@ -42,7 +47,7 @@ impl PayoutAttemptInterface for MockDb { async fn get_filters_for_payouts( &self, - _payouts: &[data_models::payouts::payouts::Payouts], + _payouts: &[Payouts], _merchant_id: &str, _storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult { diff --git a/crates/storage_impl/src/mock_db/payouts.rs b/crates/storage_impl/src/mock_db/payouts.rs index 323f3e11ac..1aee200eed 100644 --- a/crates/storage_impl/src/mock_db/payouts.rs +++ b/crates/storage_impl/src/mock_db/payouts.rs @@ -1,9 +1,10 @@ use common_utils::errors::CustomResult; -#[cfg(all(feature = "olap", feature = "payouts"))] -use data_models::payouts::payout_attempt::PayoutAttempt; use data_models::{ errors::StorageError, - payouts::payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate}, + payouts::{ + payout_attempt::PayoutAttempt, + payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate}, + }, }; use diesel_models::enums as storage_enums; @@ -25,6 +26,7 @@ impl PayoutsInterface for MockDb { &self, _this: &Payouts, _payout_update: PayoutsUpdate, + _payout_attempt: &PayoutAttempt, _storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult { // TODO: Implement function for `MockDb` diff --git a/crates/storage_impl/src/payouts/payout_attempt.rs b/crates/storage_impl/src/payouts/payout_attempt.rs index 1f6c02c1fb..991a11d86f 100644 --- a/crates/storage_impl/src/payouts/payout_attempt.rs +++ b/crates/storage_impl/src/payouts/payout_attempt.rs @@ -40,12 +40,13 @@ impl PayoutAttemptInterface for KVRouterStore { async fn insert_payout_attempt( &self, new_payout_attempt: PayoutAttemptNew, + payouts: &Payouts, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store - .insert_payout_attempt(new_payout_attempt, storage_scheme) + .insert_payout_attempt(new_payout_attempt, payouts, storage_scheme) .await } MerchantStorageScheme::RedisKv => { @@ -129,12 +130,13 @@ impl PayoutAttemptInterface for KVRouterStore { &self, this: &PayoutAttempt, payout_update: PayoutAttemptUpdate, + payouts: &Payouts, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store - .update_payout_attempt(this, payout_update, storage_scheme) + .update_payout_attempt(this, payout_update, payouts, storage_scheme) .await } MerchantStorageScheme::RedisKv => { @@ -254,6 +256,7 @@ impl PayoutAttemptInterface for crate::RouterStore { async fn insert_payout_attempt( &self, new: PayoutAttemptNew, + _payouts: &Payouts, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { let conn = pg_connection_write(self).await?; @@ -272,6 +275,7 @@ impl PayoutAttemptInterface for crate::RouterStore { &self, this: &PayoutAttempt, payout: PayoutAttemptUpdate, + _payouts: &Payouts, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { let conn = pg_connection_write(self).await?; diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index fd4dbc8e3a..437874729b 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -2,10 +2,13 @@ use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use common_utils::ext_traits::Encode; #[cfg(feature = "olap")] -use data_models::payouts::{payout_attempt::PayoutAttempt, PayoutFetchConstraints}; +use data_models::payouts::PayoutFetchConstraints; use data_models::{ errors::StorageError, - payouts::payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate}, + payouts::{ + payout_attempt::PayoutAttempt, + payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate}, + }, }; #[cfg(feature = "olap")] use diesel::{associations::HasTable, ExpressionMethods, JoinOnDsl, QueryDsl}; @@ -115,12 +118,13 @@ impl PayoutsInterface for KVRouterStore { &self, this: &Payouts, payout_update: PayoutsUpdate, + payout_attempt: &PayoutAttempt, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store - .update_payout(this, payout_update, storage_scheme) + .update_payout(this, payout_update, payout_attempt, storage_scheme) .await } MerchantStorageScheme::RedisKv => { @@ -316,6 +320,7 @@ impl PayoutsInterface for crate::RouterStore { &self, this: &Payouts, payout: PayoutsUpdate, + _payout_attempt: &PayoutAttempt, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { let conn = pg_connection_write(self).await?;