chore: add additional fields from psql to kafka event for analytics use case (#3815)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Sampras Lopes <lsampras@pm.me>
ShivanshMathurJuspay
2024-03-01 12:22:41 +05:30
committed by GitHub
parent 49d2298102
commit cc0d006330
4 changed files with 57 additions and 144 deletions

View File

@@ -1,142 +0,0 @@
CREATE TABLE hyperswitch.dispute_queue on cluster '{cluster}' (
`dispute_id` String,
`amount` String,
`currency` String,
`dispute_stage` LowCardinality(String),
`dispute_status` LowCardinality(String),
`payment_id` String,
`attempt_id` String,
`merchant_id` String,
`connector_status` String,
`connector_dispute_id` String,
`connector_reason` Nullable(String),
`connector_reason_code` Nullable(String),
`challenge_required_by` Nullable(DateTime) CODEC(T64, LZ4),
`connector_created_at` Nullable(DateTime) CODEC(T64, LZ4),
`connector_updated_at` Nullable(DateTime) CODEC(T64, LZ4),
`created_at` DateTime CODEC(T64, LZ4),
`modified_at` DateTime CODEC(T64, LZ4),
`connector` LowCardinality(String),
`evidence` Nullable(String),
`profile_id` Nullable(String),
`merchant_connector_id` Nullable(String),
`sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-dispute-events',
kafka_group_name = 'hyper-c1',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
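With kafka_handle_error_mode = 'stream', malformed messages do not stall the consumer; they surface through the _error and _raw_message virtual columns, which the materialized views below split on. For debugging only, a direct read can peek at the stream — a sketch, assuming a ClickHouse version that supports the stream_like_engine_allow_direct_select setting, and noting that direct reads consume offsets for the kafka_group_name consumer group:

-- Peek at incoming dispute events; consumes offsets, so avoid in production.
SELECT dispute_id, dispute_status, _error
FROM hyperswitch.dispute_queue
LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;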
CREATE MATERIALIZED VIEW hyperswitch.dispute_mv on cluster '{cluster}' TO hyperswitch.dispute (
`dispute_id` String,
`amount` String,
`currency` String,
`dispute_stage` LowCardinality(String),
`dispute_status` LowCardinality(String),
`payment_id` String,
`attempt_id` String,
`merchant_id` String,
`connector_status` String,
`connector_dispute_id` String,
`connector_reason` Nullable(String),
`connector_reason_code` Nullable(String),
`challenge_required_by` Nullable(DateTime64(3)),
`connector_created_at` Nullable(DateTime64(3)),
`connector_updated_at` Nullable(DateTime64(3)),
`created_at` DateTime64(3),
`modified_at` DateTime64(3),
`connector` LowCardinality(String),
`evidence` Nullable(String),
`profile_id` Nullable(String),
`merchant_connector_id` Nullable(String),
`inserted_at` DateTime64(3),
`sign_flag` Int8
) AS
SELECT
dispute_id,
amount,
currency,
dispute_stage,
dispute_status,
payment_id,
attempt_id,
merchant_id,
connector_status,
connector_dispute_id,
connector_reason,
connector_reason_code,
challenge_required_by,
connector_created_at,
connector_updated_at,
created_at,
modified_at,
connector,
evidence,
profile_id,
merchant_connector_id,
now() as inserted_at,
sign_flag
FROM
hyperswitch.dispute_queue
WHERE length(_error) = 0;
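Once the view is attached, rows that parse cleanly flow into the target table named in the TO clause; a quick sanity check against it:

-- Confirm dispute events are landing in the destination table.
SELECT dispute_status, count() AS events
FROM hyperswitch.dispute
GROUP BY dispute_status
ORDER BY events DESC;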
CREATE TABLE hyperswitch.dispute_clustered on cluster '{cluster}' (
`dispute_id` String,
`amount` String,
`currency` String,
`dispute_stage` LowCardinality(String),
`dispute_status` LowCardinality(String),
`payment_id` String,
`attempt_id` String,
`merchant_id` String,
`connector_status` String,
`connector_dispute_id` String,
`connector_reason` Nullable(String),
`connector_reason_code` Nullable(String),
`challenge_required_by` Nullable(DateTime) CODEC(T64, LZ4),
`connector_created_at` Nullable(DateTime) CODEC(T64, LZ4),
`connector_updated_at` Nullable(DateTime) CODEC(T64, LZ4),
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`connector` LowCardinality(String),
`evidence` String DEFAULT '{}' CODEC(LZ4),
`profile_id` Nullable(String),
`merchant_connector_id` Nullable(String),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`sign_flag` Int8,
INDEX connectorIndex connector TYPE bloom_filter GRANULARITY 1,
INDEX disputeStatusIndex dispute_status TYPE bloom_filter GRANULARITY 1,
INDEX disputeStageIndex dispute_stage TYPE bloom_filter GRANULARITY 1
) ENGINE = ReplicatedCollapsingMergeTree(
'/clickhouse/{installation}/{cluster}/tables/{shard}/hyperswitch/dispute_clustered',
'{replica}',
sign_flag
)
PARTITION BY toStartOfDay(created_at)
ORDER BY
(created_at, merchant_id, dispute_id)
TTL created_at + toIntervalMonth(6);
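CollapsingMergeTree cancels +1/-1 row pairs only at merge time, so point-in-time reads should net out sign_flag themselves. A latest-state query sketch, assuming the usual +1/-1 convention for sign_flag:

-- Current status per dispute, ignoring rows not yet collapsed away.
SELECT
    dispute_id,
    argMax(dispute_status, modified_at) AS latest_status
FROM hyperswitch.dispute_clustered
GROUP BY dispute_id
HAVING sum(sign_flag) > 0;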
CREATE MATERIALIZED VIEW hyperswitch.dispute_parse_errors on cluster '{cluster}'
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM hyperswitch.dispute_queue
WHERE length(_error) > 0
;
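This view acts as a dead-letter table: any message that fails to parse against the queue schema lands here with its raw payload. Inspecting it is the first step when the Kafka payload drifts from the table definition, e.g.:

-- Most recent parse failures with the offending raw messages.
SELECT topic, `partition`, `offset`, error, raw
FROM hyperswitch.dispute_parse_errors
ORDER BY `offset` DESC
LIMIT 10;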

View File

@@ -58,7 +58,7 @@ CREATE TABLE api_events_dist (
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String,
- `dispute_id` Nullable(String)
+ `dispute_id` Nullable(String),
INDEX flowIndex flow_type TYPE bloom_filter GRANULARITY 1,
INDEX apiIndex api_flow TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1

View File

@@ -29,6 +29,15 @@ CREATE TABLE payment_attempts_queue (
`created_at` DateTime CODEC(T64, LZ4),
`last_synced` Nullable(DateTime) CODEC(T64, LZ4),
`modified_at` DateTime CODEC(T64, LZ4),
+ `payment_method_data` Nullable(String),
+ `error_reason` Nullable(String),
+ `multiple_capture_count` Nullable(Int16),
+ `amount_capturable` Nullable(UInt64),
+ `merchant_connector_id` Nullable(String),
+ `net_amount` Nullable(UInt64),
+ `unified_code` Nullable(String),
+ `unified_message` Nullable(String),
+ `mandate_data` Nullable(String),
`sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payment-attempt-events',
@@ -67,6 +76,15 @@ CREATE TABLE payment_attempt_dist (
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`last_synced` Nullable(DateTime) CODEC(T64, LZ4),
`modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
+ `payment_method_data` Nullable(String),
+ `error_reason` Nullable(String),
+ `multiple_capture_count` Nullable(Int16),
+ `amount_capturable` Nullable(UInt64),
+ `merchant_connector_id` Nullable(String),
+ `net_amount` Nullable(UInt64),
+ `unified_code` Nullable(String),
+ `unified_message` Nullable(String),
+ `mandate_data` Nullable(String),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`sign_flag` Int8,
INDEX connectorIndex connector TYPE bloom_filter GRANULARITY 1,
@@ -115,6 +133,15 @@ CREATE MATERIALIZED VIEW kafka_parse_pa TO payment_attempt_dist (
`capture_on` Nullable(DateTime64(3)),
`last_synced` Nullable(DateTime64(3)),
`modified_at` DateTime64(3),
+ `payment_method_data` Nullable(String),
+ `error_reason` Nullable(String),
+ `multiple_capture_count` Nullable(Int16),
+ `amount_capturable` Nullable(UInt64),
+ `merchant_connector_id` Nullable(String),
+ `net_amount` Nullable(UInt64),
+ `unified_code` Nullable(String),
+ `unified_message` Nullable(String),
+ `mandate_data` Nullable(String),
`inserted_at` DateTime64(3),
`sign_flag` Int8
) AS
@@ -149,6 +176,15 @@ SELECT
capture_on,
last_synced,
modified_at,
+ payment_method_data,
+ error_reason,
+ multiple_capture_count,
+ amount_capturable,
+ merchant_connector_id,
+ net_amount,
+ unified_code,
+ unified_message,
+ mandate_data,
now() as inserted_at,
sign_flag
FROM

View File

@@ -1,4 +1,5 @@
- use data_models::payments::payment_attempt::PaymentAttempt;
+ // use diesel_models::enums::MandateDetails;
+ use data_models::{mandates::MandateDetails, payments::payment_attempt::PaymentAttempt};
use diesel_models::enums as storage_enums;
use time::OffsetDateTime;
@@ -39,6 +40,15 @@ pub struct KafkaPaymentAttempt<'a> {
// TODO: These types should implement copy ideally
pub payment_experience: Option<&'a storage_enums::PaymentExperience>,
pub payment_method_type: Option<&'a storage_enums::PaymentMethodType>,
+ pub payment_method_data: Option<String>,
+ pub error_reason: Option<&'a String>,
+ pub multiple_capture_count: Option<i16>,
+ pub amount_capturable: i64,
+ pub merchant_connector_id: Option<&'a String>,
+ pub net_amount: i64,
+ pub unified_code: Option<&'a String>,
+ pub unified_message: Option<&'a String>,
+ pub mandate_data: Option<&'a MandateDetails>,
}
impl<'a> KafkaPaymentAttempt<'a> {
@@ -74,6 +84,15 @@ impl<'a> KafkaPaymentAttempt<'a> {
connector_metadata: attempt.connector_metadata.as_ref().map(|v| v.to_string()),
payment_experience: attempt.payment_experience.as_ref(),
payment_method_type: attempt.payment_method_type.as_ref(),
+ payment_method_data: attempt.payment_method_data.as_ref().map(|v| v.to_string()),
+ error_reason: attempt.error_reason.as_ref(),
+ multiple_capture_count: attempt.multiple_capture_count,
+ amount_capturable: attempt.amount_capturable,
+ merchant_connector_id: attempt.merchant_connector_id.as_ref(),
+ net_amount: attempt.net_amount,
+ unified_code: attempt.unified_code.as_ref(),
+ unified_message: attempt.unified_message.as_ref(),
+ mandate_data: attempt.mandate_data.as_ref(),
}
}
}
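With the Rust producer now emitting unified_code and unified_message, the analytics use case from the commit title reduces to a plain aggregation; a sketch, assuming events have flowed into payment_attempt_dist:

-- Failure breakdown by unified error code across payment attempts.
SELECT
    unified_code,
    any(unified_message) AS sample_message,
    count() AS failures
FROM payment_attempt_dist
WHERE unified_code IS NOT NULL
GROUP BY unified_code
ORDER BY failures DESC
LIMIT 20;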