mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
feat(payouts): add kafka events (#4264)
Co-authored-by: Ivor Dsouza <ivor.dsouza@juspay.in> Co-authored-by: ivor-juspay <138492857+ivor-juspay@users.noreply.github.com> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -569,7 +569,7 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector api
|
||||
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
|
||||
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
|
||||
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
|
||||
payout_analytics_topic = "topic" # Kafka topic to be used for Payout events
|
||||
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
|
||||
|
||||
# File storage configuration
|
||||
[file_storage]
|
||||
|
||||
@ -80,7 +80,7 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector api
|
||||
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
|
||||
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
|
||||
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
|
||||
payout_analytics_topic = "topic" # Kafka topic to be used for Payout events
|
||||
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
|
||||
|
||||
# File storage configuration
|
||||
[file_storage]
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
CREATE TABLE payout_queue (
|
||||
`payout_id` String,
|
||||
`payout_attempt_id` String,
|
||||
`merchant_id` String,
|
||||
`customer_id` String,
|
||||
`address_id` String,
|
||||
`payout_type` LowCardinality(String),
|
||||
`profile_id` String,
|
||||
`payout_method_id` Nullable(String),
|
||||
`payout_type` LowCardinality(String),
|
||||
`amount` UInt64,
|
||||
`destination_currency` LowCardinality(String),
|
||||
`source_currency` LowCardinality(String),
|
||||
@ -17,8 +19,15 @@ CREATE TABLE payout_queue (
|
||||
`created_at` DateTime CODEC(T64, LZ4),
|
||||
`last_modified_at` DateTime CODEC(T64, LZ4),
|
||||
`attempt_count` UInt16,
|
||||
`profile_id` String,
|
||||
`status` LowCardinality(String),
|
||||
`connector` Nullable(String),
|
||||
`connector_payout_id` String,
|
||||
`is_eligible` Nullable(Bool),
|
||||
`error_message` Nullable(String),
|
||||
`error_code` Nullable(String),
|
||||
`business_country` Nullable(LowCardinality(String)),
|
||||
`business_label` Nullable(String),
|
||||
`merchant_connector_id` Nullable(String),
|
||||
`sign_flag` Int8
|
||||
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
|
||||
kafka_topic_list = 'hyperswitch-payout-events',
|
||||
@ -28,11 +37,13 @@ kafka_handle_error_mode = 'stream';
|
||||
|
||||
CREATE TABLE payout (
|
||||
`payout_id` String,
|
||||
`payout_attempt_id` String,
|
||||
`merchant_id` String,
|
||||
`customer_id` String,
|
||||
`address_id` String,
|
||||
`profile_id` String,
|
||||
`payout_method_id` String,
|
||||
`payout_type` LowCardinality(String),
|
||||
`payout_method_id` Nullable(String),
|
||||
`amount` UInt64,
|
||||
`destination_currency` LowCardinality(String),
|
||||
`source_currency` LowCardinality(String),
|
||||
@ -45,26 +56,36 @@ CREATE TABLE payout (
|
||||
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
|
||||
`last_modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
|
||||
`attempt_count` UInt16,
|
||||
`profile_id` String,
|
||||
`status` LowCardinality(String),
|
||||
`connector` Nullable(String),
|
||||
`connector_payout_id` String,
|
||||
`is_eligible` Nullable(Bool),
|
||||
`error_message` Nullable(String),
|
||||
`error_code` Nullable(String),
|
||||
`business_country` Nullable(LowCardinality(String)),
|
||||
`business_label` Nullable(String),
|
||||
`merchant_connector_id` Nullable(String),
|
||||
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
|
||||
`sign_flag` Int8,
|
||||
INDEX payoutTypeIndex payout_type TYPE bloom_filter GRANULARITY 1,
|
||||
INDEX destinationCurrencyIndex destination_currency TYPE bloom_filter GRANULARITY 1,
|
||||
INDEX sourceCurrencyIndex source_currency TYPE bloom_filter GRANULARITY 1,
|
||||
INDEX entityTypeIndex entity_type TYPE bloom_filter GRANULARITY 1,
|
||||
INDEX statusIndex status TYPE bloom_filter GRANULARITY 1
|
||||
INDEX statusIndex status TYPE bloom_filter GRANULARITY 1,
|
||||
INDEX businessCountryIndex business_country TYPE bloom_filter GRANULARITY 1
|
||||
) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
|
||||
ORDER BY
|
||||
(created_at, merchant_id, payout_id) TTL created_at + toIntervalMonth(6);
|
||||
|
||||
CREATE MATERIALIZED VIEW kafka_parse_payout TO payout (
|
||||
`payout_id` String,
|
||||
`payout_attempt_id` String,
|
||||
`merchant_id` String,
|
||||
`customer_id` String,
|
||||
`address_id` String,
|
||||
`profile_id` String,
|
||||
`payout_method_id` String,
|
||||
`payout_type` LowCardinality(String),
|
||||
`payout_method_id` Nullable(String),
|
||||
`amount` UInt64,
|
||||
`destination_currency` LowCardinality(String),
|
||||
`source_currency` LowCardinality(String),
|
||||
@ -77,18 +98,27 @@ CREATE MATERIALIZED VIEW kafka_parse_payout TO payout (
|
||||
`created_at` DateTime64(3),
|
||||
`last_modified_at` DateTime64(3),
|
||||
`attempt_count` UInt16,
|
||||
`profile_id` String,
|
||||
`status` LowCardinality(String),
|
||||
`connector` Nullable(String),
|
||||
`connector_payout_id` String,
|
||||
`is_eligible` Nullable(Bool),
|
||||
`error_message` Nullable(String),
|
||||
`error_code` Nullable(String),
|
||||
`business_country` Nullable(LowCardinality(String)),
|
||||
`business_label` Nullable(String),
|
||||
`merchant_connector_id` Nullable(String),
|
||||
`inserted_at` DateTime64(3),
|
||||
`sign_flag` Int8
|
||||
) AS
|
||||
SELECT
|
||||
payout_id,
|
||||
payout_attempt_id,
|
||||
merchant_id,
|
||||
customer_id,
|
||||
address_id,
|
||||
payout_type,
|
||||
profile_id,
|
||||
payout_method_id,
|
||||
payout_type,
|
||||
amount,
|
||||
destination_currency,
|
||||
source_currency,
|
||||
@ -101,8 +131,15 @@ SELECT
|
||||
created_at,
|
||||
last_modified_at,
|
||||
attempt_count,
|
||||
profile_id,
|
||||
status,
|
||||
connector,
|
||||
connector_payout_id,
|
||||
is_eligible,
|
||||
error_message,
|
||||
error_code,
|
||||
business_country,
|
||||
business_label,
|
||||
merchant_connector_id,
|
||||
now() as inserted_at,
|
||||
sign_flag
|
||||
FROM
|
||||
|
||||
@ -11,7 +11,8 @@ use crate::errors;
|
||||
pub trait PayoutAttemptInterface {
|
||||
async fn insert_payout_attempt(
|
||||
&self,
|
||||
_payout: PayoutAttemptNew,
|
||||
_payout_attempt: PayoutAttemptNew,
|
||||
_payouts: &Payouts,
|
||||
_storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<PayoutAttempt, errors::StorageError>;
|
||||
|
||||
@ -19,6 +20,7 @@ pub trait PayoutAttemptInterface {
|
||||
&self,
|
||||
_this: &PayoutAttempt,
|
||||
_payout_attempt_update: PayoutAttemptUpdate,
|
||||
_payouts: &Payouts,
|
||||
_storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<PayoutAttempt, errors::StorageError>;
|
||||
|
||||
|
||||
@ -4,8 +4,9 @@ use serde::{Deserialize, Serialize};
|
||||
use storage_enums::MerchantStorageScheme;
|
||||
use time::PrimitiveDateTime;
|
||||
|
||||
use super::payout_attempt::PayoutAttempt;
|
||||
#[cfg(feature = "olap")]
|
||||
use super::{payout_attempt::PayoutAttempt, PayoutFetchConstraints};
|
||||
use super::PayoutFetchConstraints;
|
||||
use crate::errors;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@ -27,6 +28,7 @@ pub trait PayoutsInterface {
|
||||
&self,
|
||||
_this: &Payouts,
|
||||
_payout: PayoutsUpdate,
|
||||
_payout_attempt: &PayoutAttempt,
|
||||
_storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<Payouts, errors::StorageError>;
|
||||
|
||||
|
||||
@ -347,12 +347,17 @@ pub async fn payouts_update_core(
|
||||
entity_type: req.entity_type.unwrap_or(payouts.entity_type),
|
||||
metadata: req.metadata.clone().or(payouts.metadata.clone()),
|
||||
status: Some(status),
|
||||
profile_id: Some(payout_attempt.profile_id),
|
||||
profile_id: Some(payout_attempt.profile_id.clone()),
|
||||
};
|
||||
|
||||
let db = &*state.store;
|
||||
payout_data.payouts = db
|
||||
.update_payout(&payouts, updated_payouts, merchant_account.storage_scheme)
|
||||
.update_payout(
|
||||
&payouts,
|
||||
updated_payouts,
|
||||
&payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Error updating payouts")?;
|
||||
@ -385,6 +390,7 @@ pub async fn payouts_update_core(
|
||||
.update_payout_attempt(
|
||||
&payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -517,6 +523,7 @@ pub async fn payouts_cancel_core(
|
||||
.update_payout_attempt(
|
||||
&payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -527,6 +534,7 @@ pub async fn payouts_cancel_core(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -867,6 +875,7 @@ pub async fn call_connector_payout(
|
||||
db.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1128,6 +1137,7 @@ pub async fn check_payout_eligibility(
|
||||
.update_payout_attempt(
|
||||
payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1137,6 +1147,7 @@ pub async fn check_payout_eligibility(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1163,6 +1174,7 @@ pub async fn check_payout_eligibility(
|
||||
.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1172,6 +1184,7 @@ pub async fn check_payout_eligibility(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1246,6 +1259,7 @@ pub async fn create_payout(
|
||||
.update_payout_attempt(
|
||||
payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1255,6 +1269,7 @@ pub async fn create_payout(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1281,6 +1296,7 @@ pub async fn create_payout(
|
||||
.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1290,6 +1306,7 @@ pub async fn create_payout(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1357,6 +1374,7 @@ pub async fn cancel_payout(
|
||||
.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1366,6 +1384,7 @@ pub async fn cancel_payout(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1385,6 +1404,7 @@ pub async fn cancel_payout(
|
||||
.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1394,6 +1414,7 @@ pub async fn cancel_payout(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1479,6 +1500,7 @@ pub async fn fulfill_payout(
|
||||
.update_payout_attempt(
|
||||
payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1488,6 +1510,7 @@ pub async fn fulfill_payout(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1514,6 +1537,7 @@ pub async fn fulfill_payout(
|
||||
.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1523,6 +1547,7 @@ pub async fn fulfill_payout(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
storage::PayoutsUpdate::StatusUpdate { status },
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -1730,7 +1755,11 @@ pub async fn payout_create_db_entries(
|
||||
..Default::default()
|
||||
};
|
||||
let payout_attempt = db
|
||||
.insert_payout_attempt(payout_attempt_req, merchant_account.storage_scheme)
|
||||
.insert_payout_attempt(
|
||||
payout_attempt_req,
|
||||
&payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_duplicate_response(errors::ApiErrorResponse::DuplicatePayout {
|
||||
payout_id: payout_id.to_owned(),
|
||||
|
||||
@ -171,6 +171,7 @@ pub async fn make_payout_method_data<'a>(
|
||||
db.update_payout_attempt(
|
||||
&payout_data.payout_attempt,
|
||||
updated_payout_attempt,
|
||||
&payout_data.payouts,
|
||||
storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -549,6 +550,7 @@ pub async fn save_payout_data_to_locker(
|
||||
db.update_payout(
|
||||
&payout_data.payouts,
|
||||
updated_payout,
|
||||
payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
|
||||
@ -285,6 +285,7 @@ pub async fn modify_trackers(
|
||||
.update_payout(
|
||||
&payout_data.payouts,
|
||||
updated_payouts,
|
||||
&payout_data.payout_attempt,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
@ -308,7 +309,11 @@ pub async fn modify_trackers(
|
||||
..Default::default()
|
||||
};
|
||||
payout_data.payout_attempt = db
|
||||
.insert_payout_attempt(payout_attempt_req, merchant_account.storage_scheme)
|
||||
.insert_payout_attempt(
|
||||
payout_attempt_req,
|
||||
&payouts,
|
||||
merchant_account.storage_scheme,
|
||||
)
|
||||
.await
|
||||
.to_duplicate_response(errors::ApiErrorResponse::DuplicatePayout { payout_id })
|
||||
.attach_printable("Error inserting payouts in db")?;
|
||||
|
||||
@ -32,6 +32,8 @@ use super::{
|
||||
user::{sample_data::BatchSampleDataInterface, UserInterface},
|
||||
user_role::UserRoleInterface,
|
||||
};
|
||||
#[cfg(feature = "payouts")]
|
||||
use crate::services::kafka::payout::KafkaPayout;
|
||||
use crate::{
|
||||
core::errors::{self, ProcessTrackerError},
|
||||
db::{
|
||||
@ -1491,21 +1493,49 @@ impl PayoutAttemptInterface for KafkaStore {
|
||||
&self,
|
||||
this: &storage::PayoutAttempt,
|
||||
payout_attempt_update: storage::PayoutAttemptUpdate,
|
||||
payouts: &storage::Payouts,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> CustomResult<storage::PayoutAttempt, errors::DataStorageError> {
|
||||
self.diesel_store
|
||||
.update_payout_attempt(this, payout_attempt_update, storage_scheme)
|
||||
let updated_payout_attempt = self
|
||||
.diesel_store
|
||||
.update_payout_attempt(this, payout_attempt_update, payouts, storage_scheme)
|
||||
.await?;
|
||||
if let Err(err) = self
|
||||
.kafka_producer
|
||||
.log_payout(
|
||||
&KafkaPayout::from_storage(payouts, &updated_payout_attempt),
|
||||
Some(KafkaPayout::from_storage(payouts, this)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
logger::error!(message="Failed to update analytics entry for Payouts {payouts:?}\n{updated_payout_attempt:?}", error_message=?err);
|
||||
};
|
||||
|
||||
Ok(updated_payout_attempt)
|
||||
}
|
||||
|
||||
async fn insert_payout_attempt(
|
||||
&self,
|
||||
payout_attempt: storage::PayoutAttemptNew,
|
||||
payouts: &storage::Payouts,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> CustomResult<storage::PayoutAttempt, errors::DataStorageError> {
|
||||
self.diesel_store
|
||||
.insert_payout_attempt(payout_attempt, storage_scheme)
|
||||
let payout_attempt_new = self
|
||||
.diesel_store
|
||||
.insert_payout_attempt(payout_attempt, payouts, storage_scheme)
|
||||
.await?;
|
||||
if let Err(err) = self
|
||||
.kafka_producer
|
||||
.log_payout(
|
||||
&KafkaPayout::from_storage(payouts, &payout_attempt_new),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
logger::error!(message="Failed to add analytics entry for Payouts {payouts:?}\n{payout_attempt_new:?}", error_message=?err);
|
||||
};
|
||||
|
||||
Ok(payout_attempt_new)
|
||||
}
|
||||
|
||||
async fn get_filters_for_payouts(
|
||||
@ -1544,21 +1574,23 @@ impl PayoutsInterface for KafkaStore {
|
||||
&self,
|
||||
this: &storage::Payouts,
|
||||
payout_update: storage::PayoutsUpdate,
|
||||
payout_attempt: &storage::PayoutAttempt,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> CustomResult<storage::Payouts, errors::DataStorageError> {
|
||||
let payout = self
|
||||
.diesel_store
|
||||
.update_payout(this, payout_update, storage_scheme)
|
||||
.update_payout(this, payout_update, payout_attempt, storage_scheme)
|
||||
.await?;
|
||||
|
||||
if let Err(er) = self
|
||||
if let Err(err) = self
|
||||
.kafka_producer
|
||||
.log_payout(&payout, Some(this.clone()))
|
||||
.log_payout(
|
||||
&KafkaPayout::from_storage(&payout, payout_attempt),
|
||||
Some(KafkaPayout::from_storage(this, payout_attempt)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er);
|
||||
logger::error!(message="Failed to update analytics entry for Payouts {payout:?}\n{payout_attempt:?}", error_message=?err);
|
||||
};
|
||||
|
||||
Ok(payout)
|
||||
}
|
||||
|
||||
@ -1567,16 +1599,9 @@ impl PayoutsInterface for KafkaStore {
|
||||
payout: storage::PayoutsNew,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> CustomResult<storage::Payouts, errors::DataStorageError> {
|
||||
let payout = self
|
||||
.diesel_store
|
||||
self.diesel_store
|
||||
.insert_payout(payout, storage_scheme)
|
||||
.await?;
|
||||
|
||||
if let Err(er) = self.kafka_producer.log_payout(&payout, None).await {
|
||||
logger::error!(message="Failed to add analytics entry for Payout {payout:?}", error_message=?er);
|
||||
};
|
||||
|
||||
Ok(payout)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn find_optional_payout_by_merchant_id_payout_id(
|
||||
|
||||
@ -26,6 +26,7 @@ pub enum EventType {
|
||||
OutgoingWebhookLogs,
|
||||
Dispute,
|
||||
AuditEvent,
|
||||
#[cfg(feature = "payouts")]
|
||||
Payout,
|
||||
}
|
||||
|
||||
|
||||
@ -6,12 +6,12 @@ use rdkafka::{
|
||||
config::FromClientConfig,
|
||||
producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer},
|
||||
};
|
||||
|
||||
#[cfg(feature = "payouts")]
|
||||
pub mod payout;
|
||||
use crate::events::EventType;
|
||||
mod dispute;
|
||||
mod payment_attempt;
|
||||
mod payment_intent;
|
||||
mod payout;
|
||||
mod refund;
|
||||
use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
|
||||
use diesel_models::refund::Refund;
|
||||
@ -25,8 +25,6 @@ use self::{
|
||||
payment_intent::KafkaPaymentIntent, refund::KafkaRefund,
|
||||
};
|
||||
use crate::types::storage::Dispute;
|
||||
#[cfg(feature = "payouts")]
|
||||
use crate::types::storage::Payouts;
|
||||
|
||||
// Using message queue result here to avoid confusion with Kafka result provided by library
|
||||
pub type MQResult<T> = CustomResult<T, KafkaError>;
|
||||
@ -97,6 +95,7 @@ pub struct KafkaSettings {
|
||||
outgoing_webhook_logs_topic: String,
|
||||
dispute_analytics_topic: String,
|
||||
audit_events_topic: String,
|
||||
#[cfg(feature = "payouts")]
|
||||
payout_analytics_topic: String,
|
||||
}
|
||||
|
||||
@ -163,6 +162,7 @@ impl KafkaSettings {
|
||||
))
|
||||
})?;
|
||||
|
||||
#[cfg(feature = "payouts")]
|
||||
common_utils::fp_utils::when(self.payout_analytics_topic.is_default_or_empty(), || {
|
||||
Err(ApplicationError::InvalidConfigurationValueError(
|
||||
"Kafka Payout Analytics topic must not be empty".into(),
|
||||
@ -184,6 +184,7 @@ pub struct KafkaProducer {
|
||||
outgoing_webhook_logs_topic: String,
|
||||
dispute_analytics_topic: String,
|
||||
audit_events_topic: String,
|
||||
#[cfg(feature = "payouts")]
|
||||
payout_analytics_topic: String,
|
||||
}
|
||||
|
||||
@ -224,6 +225,7 @@ impl KafkaProducer {
|
||||
outgoing_webhook_logs_topic: conf.outgoing_webhook_logs_topic.clone(),
|
||||
dispute_analytics_topic: conf.dispute_analytics_topic.clone(),
|
||||
audit_events_topic: conf.audit_events_topic.clone(),
|
||||
#[cfg(feature = "payouts")]
|
||||
payout_analytics_topic: conf.payout_analytics_topic.clone(),
|
||||
})
|
||||
}
|
||||
@ -239,6 +241,7 @@ impl KafkaProducer {
|
||||
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
|
||||
EventType::Dispute => &self.dispute_analytics_topic,
|
||||
EventType::AuditEvent => &self.audit_events_topic,
|
||||
#[cfg(feature = "payouts")]
|
||||
EventType::Payout => &self.payout_analytics_topic,
|
||||
};
|
||||
self.producer
|
||||
@ -357,27 +360,27 @@ impl KafkaProducer {
|
||||
}
|
||||
|
||||
#[cfg(feature = "payouts")]
|
||||
pub async fn log_payout(&self, payout: &Payouts, old_payout: Option<Payouts>) -> MQResult<()> {
|
||||
pub async fn log_payout(
|
||||
&self,
|
||||
payout: &KafkaPayout<'_>,
|
||||
old_payout: Option<KafkaPayout<'_>>,
|
||||
) -> MQResult<()> {
|
||||
if let Some(negative_event) = old_payout {
|
||||
self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage(
|
||||
&negative_event,
|
||||
)))
|
||||
.attach_printable_lazy(|| {
|
||||
format!("Failed to add negative payout event {negative_event:?}")
|
||||
})?;
|
||||
self.log_event(&KafkaEvent::old(&negative_event))
|
||||
.attach_printable_lazy(|| {
|
||||
format!("Failed to add negative payout event {negative_event:?}")
|
||||
})?;
|
||||
};
|
||||
self.log_event(&KafkaEvent::new(&KafkaPayout::from_storage(payout)))
|
||||
self.log_event(&KafkaEvent::new(payout))
|
||||
.attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}"))
|
||||
}
|
||||
|
||||
#[cfg(feature = "payouts")]
|
||||
pub async fn log_payout_delete(&self, delete_old_payout: &Payouts) -> MQResult<()> {
|
||||
self.log_event(&KafkaEvent::old(&KafkaPayout::from_storage(
|
||||
delete_old_payout,
|
||||
)))
|
||||
.attach_printable_lazy(|| {
|
||||
format!("Failed to add negative payout event {delete_old_payout:?}")
|
||||
})
|
||||
pub async fn log_payout_delete(&self, delete_old_payout: &KafkaPayout<'_>) -> MQResult<()> {
|
||||
self.log_event(&KafkaEvent::old(delete_old_payout))
|
||||
.attach_printable_lazy(|| {
|
||||
format!("Failed to add negative payout event {delete_old_payout:?}")
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_topic(&self, event: EventType) -> &str {
|
||||
@ -390,6 +393,7 @@ impl KafkaProducer {
|
||||
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
|
||||
EventType::Dispute => &self.dispute_analytics_topic,
|
||||
EventType::AuditEvent => &self.audit_events_topic,
|
||||
#[cfg(feature = "payouts")]
|
||||
EventType::Payout => &self.payout_analytics_topic,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,67 +1,83 @@
|
||||
use common_utils::pii;
|
||||
use data_models::payouts::{payout_attempt::PayoutAttempt, payouts::Payouts};
|
||||
use diesel_models::enums as storage_enums;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[cfg(feature = "payouts")]
|
||||
use crate::types::storage::Payouts;
|
||||
|
||||
#[derive(serde::Serialize, Debug)]
|
||||
pub struct KafkaPayout<'a> {
|
||||
pub payout_id: &'a String,
|
||||
pub payout_attempt_id: &'a String,
|
||||
pub merchant_id: &'a String,
|
||||
pub customer_id: &'a String,
|
||||
pub address_id: &'a String,
|
||||
pub payout_type: &'a storage_enums::PayoutType,
|
||||
pub profile_id: &'a String,
|
||||
pub payout_method_id: Option<&'a String>,
|
||||
pub payout_type: storage_enums::PayoutType,
|
||||
pub amount: i64,
|
||||
pub destination_currency: &'a storage_enums::Currency,
|
||||
pub source_currency: &'a storage_enums::Currency,
|
||||
pub destination_currency: storage_enums::Currency,
|
||||
pub source_currency: storage_enums::Currency,
|
||||
pub description: Option<&'a String>,
|
||||
pub recurring: bool,
|
||||
pub auto_fulfill: bool,
|
||||
pub return_url: Option<&'a String>,
|
||||
pub entity_type: &'a storage_enums::PayoutEntityType,
|
||||
pub metadata: Option<&'a pii::SecretSerdeValue>,
|
||||
#[serde(default, with = "time::serde::timestamp")]
|
||||
pub entity_type: storage_enums::PayoutEntityType,
|
||||
pub metadata: Option<pii::SecretSerdeValue>,
|
||||
#[serde(with = "time::serde::timestamp")]
|
||||
pub created_at: OffsetDateTime,
|
||||
#[serde(default, with = "time::serde::timestamp")]
|
||||
#[serde(with = "time::serde::timestamp")]
|
||||
pub last_modified_at: OffsetDateTime,
|
||||
pub attempt_count: i16,
|
||||
pub profile_id: &'a String,
|
||||
pub status: &'a storage_enums::PayoutStatus,
|
||||
pub status: storage_enums::PayoutStatus,
|
||||
|
||||
pub connector: Option<&'a String>,
|
||||
pub connector_payout_id: &'a String,
|
||||
pub is_eligible: Option<bool>,
|
||||
pub error_message: Option<&'a String>,
|
||||
pub error_code: Option<&'a String>,
|
||||
pub business_country: Option<storage_enums::CountryAlpha2>,
|
||||
pub business_label: Option<&'a String>,
|
||||
pub merchant_connector_id: Option<&'a String>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "payouts")]
|
||||
impl<'a> KafkaPayout<'a> {
|
||||
pub fn from_storage(payout: &'a Payouts) -> Self {
|
||||
pub fn from_storage(payouts: &'a Payouts, payout_attempt: &'a PayoutAttempt) -> Self {
|
||||
Self {
|
||||
payout_id: &payout.payout_id,
|
||||
merchant_id: &payout.merchant_id,
|
||||
customer_id: &payout.customer_id,
|
||||
address_id: &payout.address_id,
|
||||
payout_type: &payout.payout_type,
|
||||
payout_method_id: payout.payout_method_id.as_ref(),
|
||||
amount: payout.amount,
|
||||
destination_currency: &payout.destination_currency,
|
||||
source_currency: &payout.source_currency,
|
||||
description: payout.description.as_ref(),
|
||||
recurring: payout.recurring,
|
||||
auto_fulfill: payout.auto_fulfill,
|
||||
return_url: payout.return_url.as_ref(),
|
||||
entity_type: &payout.entity_type,
|
||||
metadata: payout.metadata.as_ref(),
|
||||
created_at: payout.created_at.assume_utc(),
|
||||
last_modified_at: payout.last_modified_at.assume_utc(),
|
||||
attempt_count: payout.attempt_count,
|
||||
profile_id: &payout.profile_id,
|
||||
status: &payout.status,
|
||||
payout_id: &payouts.payout_id,
|
||||
payout_attempt_id: &payout_attempt.payout_attempt_id,
|
||||
merchant_id: &payouts.merchant_id,
|
||||
customer_id: &payouts.customer_id,
|
||||
address_id: &payouts.address_id,
|
||||
profile_id: &payouts.profile_id,
|
||||
payout_method_id: payouts.payout_method_id.as_ref(),
|
||||
payout_type: payouts.payout_type,
|
||||
amount: payouts.amount,
|
||||
destination_currency: payouts.destination_currency,
|
||||
source_currency: payouts.source_currency,
|
||||
description: payouts.description.as_ref(),
|
||||
recurring: payouts.recurring,
|
||||
auto_fulfill: payouts.auto_fulfill,
|
||||
return_url: payouts.return_url.as_ref(),
|
||||
entity_type: payouts.entity_type,
|
||||
metadata: payouts.metadata.clone(),
|
||||
created_at: payouts.created_at.assume_utc(),
|
||||
last_modified_at: payouts.last_modified_at.assume_utc(),
|
||||
attempt_count: payouts.attempt_count,
|
||||
status: payouts.status,
|
||||
connector: payout_attempt.connector.as_ref(),
|
||||
connector_payout_id: &payout_attempt.connector_payout_id,
|
||||
is_eligible: payout_attempt.is_eligible,
|
||||
error_message: payout_attempt.error_message.as_ref(),
|
||||
error_code: payout_attempt.error_code.as_ref(),
|
||||
business_country: payout_attempt.business_country,
|
||||
business_label: payout_attempt.business_label.as_ref(),
|
||||
merchant_connector_id: payout_attempt.merchant_connector_id.as_ref(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> super::KafkaMessage for KafkaPayout<'a> {
|
||||
fn key(&self) -> String {
|
||||
format!("{}_{}", self.merchant_id, self.payout_id)
|
||||
format!("{}_{}", self.merchant_id, self.payout_attempt_id)
|
||||
}
|
||||
|
||||
fn creation_timestamp(&self) -> Option<i64> {
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
use common_utils::errors::CustomResult;
|
||||
use data_models::{
|
||||
errors::StorageError,
|
||||
payouts::payout_attempt::{
|
||||
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
|
||||
payouts::{
|
||||
payout_attempt::{
|
||||
PayoutAttempt, PayoutAttemptInterface, PayoutAttemptNew, PayoutAttemptUpdate,
|
||||
},
|
||||
payouts::Payouts,
|
||||
},
|
||||
};
|
||||
use diesel_models::enums as storage_enums;
|
||||
@ -15,6 +18,7 @@ impl PayoutAttemptInterface for MockDb {
|
||||
&self,
|
||||
_this: &PayoutAttempt,
|
||||
_payout_attempt_update: PayoutAttemptUpdate,
|
||||
_payouts: &Payouts,
|
||||
_storage_scheme: storage_enums::MerchantStorageScheme,
|
||||
) -> CustomResult<PayoutAttempt, StorageError> {
|
||||
// TODO: Implement function for `MockDb`
|
||||
@ -23,7 +27,8 @@ impl PayoutAttemptInterface for MockDb {
|
||||
|
||||
async fn insert_payout_attempt(
|
||||
&self,
|
||||
_payout: PayoutAttemptNew,
|
||||
_payout_attempt: PayoutAttemptNew,
|
||||
_payouts: &Payouts,
|
||||
_storage_scheme: storage_enums::MerchantStorageScheme,
|
||||
) -> CustomResult<PayoutAttempt, StorageError> {
|
||||
// TODO: Implement function for `MockDb`
|
||||
@ -42,7 +47,7 @@ impl PayoutAttemptInterface for MockDb {
|
||||
|
||||
async fn get_filters_for_payouts(
|
||||
&self,
|
||||
_payouts: &[data_models::payouts::payouts::Payouts],
|
||||
_payouts: &[Payouts],
|
||||
_merchant_id: &str,
|
||||
_storage_scheme: storage_enums::MerchantStorageScheme,
|
||||
) -> CustomResult<data_models::payouts::payout_attempt::PayoutListFilters, StorageError> {
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
use common_utils::errors::CustomResult;
|
||||
#[cfg(all(feature = "olap", feature = "payouts"))]
|
||||
use data_models::payouts::payout_attempt::PayoutAttempt;
|
||||
use data_models::{
|
||||
errors::StorageError,
|
||||
payouts::payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
|
||||
payouts::{
|
||||
payout_attempt::PayoutAttempt,
|
||||
payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
|
||||
},
|
||||
};
|
||||
use diesel_models::enums as storage_enums;
|
||||
|
||||
@ -25,6 +26,7 @@ impl PayoutsInterface for MockDb {
|
||||
&self,
|
||||
_this: &Payouts,
|
||||
_payout_update: PayoutsUpdate,
|
||||
_payout_attempt: &PayoutAttempt,
|
||||
_storage_scheme: storage_enums::MerchantStorageScheme,
|
||||
) -> CustomResult<Payouts, StorageError> {
|
||||
// TODO: Implement function for `MockDb`
|
||||
|
||||
@ -40,12 +40,13 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
|
||||
async fn insert_payout_attempt(
|
||||
&self,
|
||||
new_payout_attempt: PayoutAttemptNew,
|
||||
payouts: &Payouts,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
|
||||
match storage_scheme {
|
||||
MerchantStorageScheme::PostgresOnly => {
|
||||
self.router_store
|
||||
.insert_payout_attempt(new_payout_attempt, storage_scheme)
|
||||
.insert_payout_attempt(new_payout_attempt, payouts, storage_scheme)
|
||||
.await
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
@ -129,12 +130,13 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
|
||||
&self,
|
||||
this: &PayoutAttempt,
|
||||
payout_update: PayoutAttemptUpdate,
|
||||
payouts: &Payouts,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
|
||||
match storage_scheme {
|
||||
MerchantStorageScheme::PostgresOnly => {
|
||||
self.router_store
|
||||
.update_payout_attempt(this, payout_update, storage_scheme)
|
||||
.update_payout_attempt(this, payout_update, payouts, storage_scheme)
|
||||
.await
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
@ -254,6 +256,7 @@ impl<T: DatabaseStore> PayoutAttemptInterface for crate::RouterStore<T> {
|
||||
async fn insert_payout_attempt(
|
||||
&self,
|
||||
new: PayoutAttemptNew,
|
||||
_payouts: &Payouts,
|
||||
_storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
|
||||
let conn = pg_connection_write(self).await?;
|
||||
@ -272,6 +275,7 @@ impl<T: DatabaseStore> PayoutAttemptInterface for crate::RouterStore<T> {
|
||||
&self,
|
||||
this: &PayoutAttempt,
|
||||
payout: PayoutAttemptUpdate,
|
||||
_payouts: &Payouts,
|
||||
_storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
|
||||
let conn = pg_connection_write(self).await?;
|
||||
|
||||
@ -2,10 +2,13 @@
|
||||
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
|
||||
use common_utils::ext_traits::Encode;
|
||||
#[cfg(feature = "olap")]
|
||||
use data_models::payouts::{payout_attempt::PayoutAttempt, PayoutFetchConstraints};
|
||||
use data_models::payouts::PayoutFetchConstraints;
|
||||
use data_models::{
|
||||
errors::StorageError,
|
||||
payouts::payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
|
||||
payouts::{
|
||||
payout_attempt::PayoutAttempt,
|
||||
payouts::{Payouts, PayoutsInterface, PayoutsNew, PayoutsUpdate},
|
||||
},
|
||||
};
|
||||
#[cfg(feature = "olap")]
|
||||
use diesel::{associations::HasTable, ExpressionMethods, JoinOnDsl, QueryDsl};
|
||||
@ -115,12 +118,13 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
|
||||
&self,
|
||||
this: &Payouts,
|
||||
payout_update: PayoutsUpdate,
|
||||
payout_attempt: &PayoutAttempt,
|
||||
storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<Payouts, StorageError> {
|
||||
match storage_scheme {
|
||||
MerchantStorageScheme::PostgresOnly => {
|
||||
self.router_store
|
||||
.update_payout(this, payout_update, storage_scheme)
|
||||
.update_payout(this, payout_update, payout_attempt, storage_scheme)
|
||||
.await
|
||||
}
|
||||
MerchantStorageScheme::RedisKv => {
|
||||
@ -316,6 +320,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
|
||||
&self,
|
||||
this: &Payouts,
|
||||
payout: PayoutsUpdate,
|
||||
_payout_attempt: &PayoutAttempt,
|
||||
_storage_scheme: MerchantStorageScheme,
|
||||
) -> error_stack::Result<Payouts, StorageError> {
|
||||
let conn = pg_connection_write(self).await?;
|
||||
|
||||
Reference in New Issue
Block a user