feat(clickhouse): Init Clickhouse container on startup (#4365)

Co-authored-by: Vrishab Srivatsa <136090360+vsrivatsa-juspay@users.noreply.github.com>
This commit is contained in:
Sampras Lopes
2024-05-06 16:34:04 +05:30
committed by GitHub
parent b878677f15
commit 89e5884f9e
10 changed files with 540 additions and 277 deletions

View File

@ -23,17 +23,16 @@ CREATE TABLE api_events_queue (
`ip_addr` String, `ip_addr` String,
`hs_latency` Nullable(UInt128), `hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String), `http_method` LowCardinality(String),
`url_path` String, `url_path` Nullable(String),
`dispute_id` Nullable(String) `dispute_id` Nullable(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-api-log-events', kafka_topic_list = 'hyperswitch-api-log-events',
kafka_group_name = 'hyper-c1', kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow', kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream'; kafka_handle_error_mode = 'stream';
CREATE TABLE api_events (
CREATE TABLE api_events_dist ( `merchant_id` LowCardinality(String),
`merchant_id` String,
`payment_id` Nullable(String), `payment_id` Nullable(String),
`refund_id` Nullable(String), `refund_id` Nullable(String),
`payment_method_id` Nullable(String), `payment_method_id` Nullable(String),
@ -51,27 +50,32 @@ CREATE TABLE api_events_dist (
`error` Nullable(String), `error` Nullable(String),
`authentication_data` Nullable(String), `authentication_data` Nullable(String),
`status_code` UInt32, `status_code` UInt32,
`created_at_timestamp` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128, `latency` UInt128,
`user_agent` String, `user_agent` String,
`ip_addr` String, `ip_addr` String,
`hs_latency` Nullable(UInt128), `hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String), `http_method` LowCardinality(String),
`url_path` String, `url_path` Nullable(String),
`dispute_id` Nullable(String), `dispute_id` Nullable(String),
`masked_response` Nullable(String),
INDEX flowIndex flow_type TYPE bloom_filter GRANULARITY 1, INDEX flowIndex flow_type TYPE bloom_filter GRANULARITY 1,
INDEX apiIndex api_flow TYPE bloom_filter GRANULARITY 1, INDEX apiIndex api_flow TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1 INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree ) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at)
PARTITION BY toStartOfDay(created_at)
ORDER BY ORDER BY
(created_at, merchant_id, flow_type, status_code, api_flow) (
TTL created_at + toIntervalMonth(6) created_at,
; merchant_id,
flow_type,
status_code,
api_flow
) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW api_events_mv TO api_events_dist ( CREATE TABLE api_events_audit (
`merchant_id` String, `merchant_id` LowCardinality(String),
`payment_id` Nullable(String), `payment_id` String,
`refund_id` Nullable(String), `refund_id` Nullable(String),
`payment_method_id` Nullable(String), `payment_method_id` Nullable(String),
`payment_method` Nullable(String), `payment_method` Nullable(String),
@ -88,14 +92,69 @@ CREATE MATERIALIZED VIEW api_events_mv TO api_events_dist (
`error` Nullable(String), `error` Nullable(String),
`authentication_data` Nullable(String), `authentication_data` Nullable(String),
`status_code` UInt32, `status_code` UInt32,
`created_at_timestamp` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128, `latency` UInt128,
`user_agent` String, `user_agent` String,
`ip_addr` String, `ip_addr` String,
`hs_latency` Nullable(UInt128), `hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String), `http_method` LowCardinality(Nullable(String)),
`url_path` String, `url_path` Nullable(String),
`dispute_id` Nullable(String) `dispute_id` Nullable(String),
`masked_response` Nullable(String)
) ENGINE = MergeTree PARTITION BY merchant_id
ORDER BY
(merchant_id, payment_id) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW api_events_parse_errors (
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
) ENGINE = MergeTree
ORDER BY
(topic, partition, offset) SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM
api_events_queue
WHERE
length(_error) > 0;
CREATE MATERIALIZED VIEW api_events_audit_mv TO api_events_audit (
`merchant_id` String,
`payment_id` String,
`refund_id` Nullable(String),
`payment_method_id` Nullable(String),
`payment_method` Nullable(String),
`payment_method_type` Nullable(String),
`customer_id` Nullable(String),
`user_id` Nullable(String),
`connector` Nullable(String),
`request_id` String,
`flow_type` LowCardinality(String),
`api_flow` LowCardinality(String),
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(Nullable(String)),
`url_path` Nullable(String),
`dispute_id` Nullable(String),
`masked_response` Nullable(String)
) AS ) AS
SELECT SELECT
merchant_id, merchant_id,
@ -116,37 +175,82 @@ SELECT
error, error,
authentication_data, authentication_data,
status_code, status_code,
created_at_timestamp, created_at_timestamp AS created_at,
now() as inserted_at, now() AS inserted_at,
latency, latency,
user_agent, user_agent,
ip_addr, ip_addr,
hs_latency, hs_latency,
http_method, http_method,
url_path, url_path,
dispute_id dispute_id,
response AS masked_response
FROM FROM
api_events_queue api_events_queue
where length(_error) = 0; WHERE
(length(_error) = 0)
AND (payment_id IS NOT NULL);
CREATE MATERIALIZED VIEW api_events_mv TO api_events (
CREATE MATERIALIZED VIEW api_events_parse_errors `merchant_id` String,
( `payment_id` Nullable(String),
`topic` String, `refund_id` Nullable(String),
`partition` Int64, `payment_method_id` Nullable(String),
`offset` Int64, `payment_method` Nullable(String),
`raw` String, `payment_method_type` Nullable(String),
`error` String `customer_id` Nullable(String),
) `user_id` Nullable(String),
ENGINE = MergeTree `connector` Nullable(String),
ORDER BY (topic, partition, offset) `request_id` String,
SETTINGS index_granularity = 8192 AS `flow_type` LowCardinality(String),
`api_flow` LowCardinality(String),
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(Nullable(String)),
`url_path` Nullable(String),
`dispute_id` Nullable(String),
`masked_response` Nullable(String)
) AS
SELECT SELECT
_topic AS topic, merchant_id,
_partition AS partition, payment_id,
_offset AS offset, refund_id,
_raw_message AS raw, payment_method_id,
_error AS error payment_method,
FROM api_events_queue payment_method_type,
WHERE length(_error) > 0 customer_id,
; user_id,
connector,
request_id,
flow_type,
api_flow,
api_auth_type,
request,
response,
error,
authentication_data,
status_code,
created_at_timestamp AS created_at,
now() AS inserted_at,
latency,
user_agent,
ip_addr,
hs_latency,
http_method,
url_path,
dispute_id,
response AS masked_response
FROM
api_events_queue
WHERE
length(_error) = 0;

View File

@ -5,23 +5,95 @@ CREATE TABLE connector_events_queue (
`request_id` String, `request_id` String,
`flow` LowCardinality(String), `flow` LowCardinality(String),
`request` String, `request` String,
`masked_response` Nullable(String),
`error` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`latency` UInt128,
`method` LowCardinality(String),
`dispute_id` Nullable(String),
`refund_id` Nullable(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-outgoing-connector-events',
kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE MATERIALIZED VIEW connector_events_parse_errors (
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
) ENGINE = MergeTree
ORDER BY
(topic, partition, offset) SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM
connector_events_queue
WHERE
length(_error) > 0;
CREATE TABLE connector_events (
`merchant_id` LowCardinality(String),
`payment_id` Nullable(String),
`connector_name` LowCardinality(String),
`request_id` String,
`flow` LowCardinality(String),
`request` String,
`response` Nullable(String), `response` Nullable(String),
`masked_response` Nullable(String), `masked_response` Nullable(String),
`error` Nullable(String), `error` Nullable(String),
`status_code` UInt32, `status_code` UInt32,
`created_at` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128, `latency` UInt128,
`method` LowCardinality(String), `method` LowCardinality(String),
`dispute_id` Nullable(String),
`refund_id` Nullable(String), `refund_id` Nullable(String),
`dispute_id` Nullable(String) INDEX flowIndex flow TYPE bloom_filter GRANULARITY 1,
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', INDEX connectorIndex connector_name TYPE bloom_filter GRANULARITY 1,
kafka_topic_list = 'hyperswitch-connector-api-events', INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1
kafka_group_name = 'hyper-c1', ) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at)
kafka_format = 'JSONEachRow', ORDER BY
kafka_handle_error_mode = 'stream'; (
created_at,
merchant_id,
connector_name,
flow,
status_code
) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
CREATE TABLE connector_events_audit (
`merchant_id` LowCardinality(String),
`payment_id` String,
`connector_name` LowCardinality(String),
`request_id` String,
`flow` LowCardinality(String),
`request` String,
`response` Nullable(String),
`masked_response` Nullable(String),
`error` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128,
`method` LowCardinality(String),
`dispute_id` Nullable(String),
`refund_id` Nullable(String),
INDEX flowIndex flow TYPE bloom_filter GRANULARITY 1,
INDEX connectorIndex connector_name TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree PARTITION BY merchant_id
ORDER BY
(merchant_id, payment_id) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
CREATE TABLE connector_events_dist ( CREATE MATERIALIZED VIEW connector_events_audit_mv TO connector_events_audit (
`merchant_id` String, `merchant_id` String,
`payment_id` Nullable(String), `payment_id` Nullable(String),
`connector_name` LowCardinality(String), `connector_name` LowCardinality(String),
@ -37,18 +109,32 @@ CREATE TABLE connector_events_dist (
`latency` UInt128, `latency` UInt128,
`method` LowCardinality(String), `method` LowCardinality(String),
`refund_id` Nullable(String), `refund_id` Nullable(String),
`dispute_id` Nullable(String), `dispute_id` Nullable(String)
INDEX flowIndex flow TYPE bloom_filter GRANULARITY 1, ) AS
INDEX connectorIndex connector_name TYPE bloom_filter GRANULARITY 1, SELECT
INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1 merchant_id,
) ENGINE = MergeTree payment_id,
PARTITION BY toStartOfDay(created_at) connector_name,
ORDER BY request_id,
(created_at, merchant_id, connector_name, flow) flow,
TTL inserted_at + toIntervalMonth(6) request,
; masked_response AS response,
masked_response,
error,
status_code,
created_at,
now64() AS inserted_at,
latency,
method,
refund_id,
dispute_id
FROM
connector_events_queue
WHERE
(length(_error) = 0)
AND (payment_id IS NOT NULL);
CREATE MATERIALIZED VIEW connector_events_mv TO connector_events_dist ( CREATE MATERIALIZED VIEW connector_events_mv TO connector_events (
`merchant_id` String, `merchant_id` String,
`payment_id` Nullable(String), `payment_id` Nullable(String),
`connector_name` LowCardinality(String), `connector_name` LowCardinality(String),
@ -60,6 +146,7 @@ CREATE MATERIALIZED VIEW connector_events_mv TO connector_events_dist (
`error` Nullable(String), `error` Nullable(String),
`status_code` UInt32, `status_code` UInt32,
`created_at` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`latency` UInt128, `latency` UInt128,
`method` LowCardinality(String), `method` LowCardinality(String),
`refund_id` Nullable(String), `refund_id` Nullable(String),
@ -72,38 +159,17 @@ SELECT
request_id, request_id,
flow, flow,
request, request,
response, masked_response AS response,
masked_response, masked_response,
error, error,
status_code, status_code,
created_at, created_at,
now() as inserted_at, now64() AS inserted_at,
latency, latency,
method, method,
refund_id, refund_id,
dispute_id dispute_id
FROM FROM
connector_events_queue connector_events_queue
where length(_error) = 0; WHERE
length(_error) = 0;
CREATE MATERIALIZED VIEW connector_events_parse_errors
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM connector_events_queue
WHERE length(_error) > 0
;

View File

@ -1,6 +1,6 @@
CREATE TABLE dispute_queue ( CREATE TABLE dispute_queue (
`dispute_id` String, `dispute_id` String,
`amount` String, `dispute_amount` UInt32,
`currency` String, `currency` String,
`dispute_stage` LowCardinality(String), `dispute_stage` LowCardinality(String),
`dispute_status` LowCardinality(String), `dispute_status` LowCardinality(String),
@ -23,20 +23,19 @@ CREATE TABLE dispute_queue (
`sign_flag` Int8 `sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-dispute-events', kafka_topic_list = 'hyperswitch-dispute-events',
kafka_group_name = 'hyper-c1', kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow', kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream'; kafka_handle_error_mode = 'stream';
CREATE TABLE dispute ( CREATE TABLE dispute (
`dispute_id` String, `dispute_id` String,
`amount` String, `dispute_amount` UInt32,
`currency` String, `currency` String,
`dispute_stage` LowCardinality(String), `dispute_stage` LowCardinality(String),
`dispute_status` LowCardinality(String), `dispute_status` LowCardinality(String),
`payment_id` String, `payment_id` String,
`attempt_id` String, `attempt_id` String,
`merchant_id` String, `merchant_id` LowCardinality(String),
`connector_status` String, `connector_status` String,
`connector_dispute_id` String, `connector_dispute_id` String,
`connector_reason` Nullable(String), `connector_reason` Nullable(String),
@ -47,26 +46,21 @@ CREATE TABLE dispute (
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4), `created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`modified_at` DateTime DEFAULT now() CODEC(T64, LZ4), `modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`connector` LowCardinality(String), `connector` LowCardinality(String),
`evidence` String DEFAULT '{}' CODEC(T64, LZ4), `evidence` String DEFAULT '{}',
`profile_id` Nullable(String), `profile_id` Nullable(String),
`merchant_connector_id` Nullable(String), `merchant_connector_id` Nullable(String),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`sign_flag` Int8 `sign_flag` Int8,
INDEX connectorIndex connector TYPE bloom_filter GRANULARITY 1, INDEX connectorIndex connector TYPE bloom_filter GRANULARITY 1,
INDEX disputeStatusIndex dispute_status TYPE bloom_filter GRANULARITY 1, INDEX disputeStatusIndex dispute_status TYPE bloom_filter GRANULARITY 1,
INDEX disputeStageIndex dispute_stage TYPE bloom_filter GRANULARITY 1 INDEX disputeStageIndex dispute_stage TYPE bloom_filter GRANULARITY 1
) ENGINE = CollapsingMergeTree( ) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
sign_flag
)
PARTITION BY toStartOfDay(created_at)
ORDER BY ORDER BY
(created_at, merchant_id, dispute_id) (created_at, merchant_id, dispute_id) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
TTL created_at + toIntervalMonth(6)
;
CREATE MATERIALIZED VIEW kafka_parse_dispute TO dispute ( CREATE MATERIALIZED VIEW dispute_mv TO dispute (
`dispute_id` String, `dispute_id` String,
`amount` String, `dispute_amount` UInt32,
`currency` String, `currency` String,
`dispute_stage` LowCardinality(String), `dispute_stage` LowCardinality(String),
`dispute_status` LowCardinality(String), `dispute_status` LowCardinality(String),
@ -91,7 +85,7 @@ CREATE MATERIALIZED VIEW kafka_parse_dispute TO dispute (
) AS ) AS
SELECT SELECT
dispute_id, dispute_id,
amount, dispute_amount,
currency, currency,
dispute_stage, dispute_stage,
dispute_status, dispute_status,
@ -111,7 +105,29 @@ SELECT
evidence, evidence,
profile_id, profile_id,
merchant_connector_id, merchant_connector_id,
now() as inserted_at, now() AS inserted_at,
sign_flag sign_flag
FROM FROM
dispute_queue; dispute_queue
WHERE
length(_error) = 0;
CREATE MATERIALIZED VIEW dispute_parse_errors (
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
) ENGINE = MergeTree
ORDER BY
(topic, partition, offset) SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM
dispute_queue
WHERE
length(_error) > 0;

View File

@ -1,28 +1,4 @@
CREATE TABLE CREATE TABLE outgoing_webhook_events_queue (
outgoing_webhook_events_queue (
`merchant_id` String,
`event_id` Nullable(String),
`event_type` LowCardinality(String),
`outgoing_webhook_event_type` LowCardinality(String),
`payment_id` Nullable(String),
`refund_id` Nullable(String),
`attempt_id` Nullable(String),
`dispute_id` Nullable(String),
`payment_method_id` Nullable(String),
`mandate_id` Nullable(String),
`content` Nullable(String),
`is_error` Bool,
`error` Nullable(String),
`created_at_timestamp` DateTime64(3),
`initial_attempt_id` Nullable(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-outgoing-webhook-events',
kafka_group_name = 'hyper-c1',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE
outgoing_webhook_events_cluster (
`merchant_id` String, `merchant_id` String,
`event_id` String, `event_id` String,
`event_type` LowCardinality(String), `event_type` LowCardinality(String),
@ -36,23 +12,16 @@ CREATE TABLE
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`created_at_timestamp` DateTime64(3), `created_at_timestamp` DateTime64(3)
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
`initial_attempt_id` Nullable(String), kafka_topic_list = 'hyperswitch-outgoing-webhook-events',
INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1, kafka_group_name = 'hyper',
INDEX webhookeventIndex outgoing_webhook_event_type TYPE bloom_filter GRANULARITY 1 kafka_format = 'JSONEachRow',
) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at_timestamp) kafka_handle_error_mode = 'stream';
ORDER BY (
created_at_timestamp,
merchant_id,
event_id,
event_type,
outgoing_webhook_event_type
) TTL inserted_at + toIntervalMonth(6);
CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events_cluster ( CREATE TABLE outgoing_webhook_events (
`merchant_id` String, `merchant_id` LowCardinality(String),
`event_id` Nullable(String), `event_id` String,
`event_type` LowCardinality(String), `event_type` LowCardinality(String),
`outgoing_webhook_event_type` LowCardinality(String), `outgoing_webhook_event_type` LowCardinality(String),
`payment_id` Nullable(String), `payment_id` Nullable(String),
@ -64,9 +33,56 @@ CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events_c
`content` Nullable(String), `content` Nullable(String),
`is_error` Bool, `is_error` Bool,
`error` Nullable(String), `error` Nullable(String),
`created_at_timestamp` DateTime64(3), `created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`initial_attempt_id` Nullable(String) INDEX eventIndex event_type TYPE bloom_filter GRANULARITY 1,
INDEX webhookeventIndex outgoing_webhook_event_type TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree PARTITION BY toStartOfDay(created_at)
ORDER BY
(
created_at,
merchant_id,
event_id,
event_type,
outgoing_webhook_event_type
) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
CREATE TABLE outgoing_webhook_events_audit (
`merchant_id` LowCardinality(String),
`event_id` String,
`event_type` LowCardinality(String),
`outgoing_webhook_event_type` LowCardinality(String),
`payment_id` String,
`refund_id` Nullable(String),
`attempt_id` Nullable(String),
`dispute_id` Nullable(String),
`payment_method_id` Nullable(String),
`mandate_id` Nullable(String),
`content` Nullable(String),
`is_error` Bool,
`error` Nullable(String),
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) ENGINE = MergeTree PARTITION BY merchant_id
ORDER BY
(merchant_id, payment_id) TTL inserted_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW outgoing_webhook_events_mv TO outgoing_webhook_events (
`merchant_id` String,
`event_id` String,
`event_type` LowCardinality(String),
`outgoing_webhook_event_type` LowCardinality(String),
`payment_id` Nullable(String),
`refund_id` Nullable(String),
`attempt_id` Nullable(String),
`dispute_id` Nullable(String),
`payment_method_id` Nullable(String),
`mandate_id` Nullable(String),
`content` Nullable(String),
`is_error` Bool,
`error` Nullable(String),
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) AS ) AS
SELECT SELECT
merchant_id, merchant_id,
@ -82,12 +98,51 @@ SELECT
content, content,
is_error, is_error,
error, error,
created_at_timestamp, created_at_timestamp AS created_at,
now() AS inserted_at, now() AS inserted_at
initial_attempt_id
FROM FROM
outgoing_webhook_events_queue outgoing_webhook_events_queue
where length(_error) = 0; WHERE
length(_error) = 0;
CREATE MATERIALIZED VIEW outgoing_webhook_events_audit_mv TO outgoing_webhook_events_audit (
`merchant_id` String,
`event_id` String,
`event_type` LowCardinality(String),
`outgoing_webhook_event_type` LowCardinality(String),
`payment_id` String,
`refund_id` Nullable(String),
`attempt_id` Nullable(String),
`dispute_id` Nullable(String),
`payment_method_id` Nullable(String),
`mandate_id` Nullable(String),
`content` Nullable(String),
`is_error` Bool,
`error` Nullable(String),
`created_at` DateTime64(3),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4)
) AS
SELECT
merchant_id,
event_id,
event_type,
outgoing_webhook_event_type,
payment_id,
refund_id,
attempt_id,
dispute_id,
payment_method_id,
mandate_id,
content,
is_error,
error,
created_at_timestamp AS created_at,
now() AS inserted_at
FROM
outgoing_webhook_events_queue
WHERE
(length(_error) = 0)
AND (payment_id IS NOT NULL);
CREATE MATERIALIZED VIEW outgoing_webhook_parse_errors ( CREATE MATERIALIZED VIEW outgoing_webhook_parse_errors (
`topic` String, `topic` String,
@ -96,18 +151,15 @@ CREATE MATERIALIZED VIEW outgoing_webhook_parse_errors (
`raw` String, `raw` String,
`error` String `error` String
) ENGINE = MergeTree ) ENGINE = MergeTree
ORDER BY ( ORDER BY
topic, partition, (topic, partition, offset) SETTINGS index_granularity = 8192 AS
offset
) SETTINGS index_granularity = 8192 AS
SELECT SELECT
_topic AS topic, _topic AS topic,
_partition AS partition, _partition AS partition,
_offset AS _offset AS offset,
offset
,
_raw_message AS raw, _raw_message AS raw,
_error AS error _error AS error
FROM FROM
outgoing_webhook_events_queue outgoing_webhook_events_queue
WHERE length(_error) > 0; WHERE
length(_error) > 0;

View File

@ -1,4 +1,4 @@
CREATE TABLE payment_attempts_queue ( CREATE TABLE payment_attempt_queue (
`payment_id` String, `payment_id` String,
`merchant_id` String, `merchant_id` String,
`attempt_id` String, `attempt_id` String,
@ -41,13 +41,13 @@ CREATE TABLE payment_attempts_queue (
`sign_flag` Int8 `sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payment-attempt-events', kafka_topic_list = 'hyperswitch-payment-attempt-events',
kafka_group_name = 'hyper-c1', kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow', kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream'; kafka_handle_error_mode = 'stream';
CREATE TABLE payment_attempt_dist ( CREATE TABLE payment_attempts (
`payment_id` String, `payment_id` String,
`merchant_id` String, `merchant_id` LowCardinality(String),
`attempt_id` String, `attempt_id` String,
`status` LowCardinality(String), `status` LowCardinality(String),
`amount` Nullable(UInt32), `amount` Nullable(UInt32),
@ -92,17 +92,11 @@ CREATE TABLE payment_attempt_dist (
INDEX authenticationTypeIndex authentication_type TYPE bloom_filter GRANULARITY 1, INDEX authenticationTypeIndex authentication_type TYPE bloom_filter GRANULARITY 1,
INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1, INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status TYPE bloom_filter GRANULARITY 1 INDEX statusIndex status TYPE bloom_filter GRANULARITY 1
) ENGINE = CollapsingMergeTree( ) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
sign_flag
)
PARTITION BY toStartOfDay(created_at)
ORDER BY ORDER BY
(created_at, merchant_id, attempt_id) (created_at, merchant_id, attempt_id) TTL created_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
TTL created_at + toIntervalMonth(6)
;
CREATE MATERIALIZED VIEW payment_attempt_mv TO payment_attempts (
CREATE MATERIALIZED VIEW kafka_parse_pa TO payment_attempt_dist (
`payment_id` String, `payment_id` String,
`merchant_id` String, `merchant_id` String,
`attempt_id` String, `attempt_id` String,
@ -185,8 +179,9 @@ SELECT
unified_code, unified_code,
unified_message, unified_message,
mandate_data, mandate_data,
now() as inserted_at, now() AS inserted_at,
sign_flag sign_flag
FROM FROM
payment_attempts_queue; payment_attempt_queue
WHERE
length(_error) = 0;

View File

@ -1,34 +1,5 @@
CREATE TABLE payment_intents_queue ( CREATE TABLE payment_intents_queue
`payment_id` String, (
`merchant_id` String,
`status` LowCardinality(String),
`amount` UInt32,
`currency` LowCardinality(Nullable(String)),
`amount_captured` Nullable(UInt32),
`customer_id` Nullable(String),
`description` Nullable(String),
`return_url` Nullable(String),
`connector_id` LowCardinality(Nullable(String)),
`statement_descriptor_name` Nullable(String),
`statement_descriptor_suffix` Nullable(String),
`setup_future_usage` LowCardinality(Nullable(String)),
`off_session` Nullable(Bool),
`client_secret` Nullable(String),
`active_attempt_id` String,
`business_country` String,
`business_label` String,
`modified_at` DateTime CODEC(T64, LZ4),
`created_at` DateTime CODEC(T64, LZ4),
`last_synced` Nullable(DateTime) CODEC(T64, LZ4),
`sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payment-intent-events',
kafka_group_name = 'hyper-c1',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE payment_intents_dist (
`payment_id` String, `payment_id` String,
`merchant_id` String, `merchant_id` String,
`status` LowCardinality(String), `status` LowCardinality(String),
@ -47,6 +18,38 @@ CREATE TABLE payment_intents_dist (
`active_attempt_id` String, `active_attempt_id` String,
`business_country` LowCardinality(String), `business_country` LowCardinality(String),
`business_label` String, `business_label` String,
`attempt_count` UInt8,
`modified_at` DateTime CODEC(T64, LZ4),
`created_at` DateTime CODEC(T64, LZ4),
`last_synced` Nullable(DateTime) CODEC(T64, LZ4),
`sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payment-intent-events',
kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE payment_intents
(
`payment_id` String,
`merchant_id` LowCardinality(String),
`status` LowCardinality(String),
`amount` UInt32,
`currency` LowCardinality(Nullable(String)),
`amount_captured` Nullable(UInt32),
`customer_id` Nullable(String),
`description` Nullable(String),
`return_url` Nullable(String),
`connector_id` LowCardinality(Nullable(String)),
`statement_descriptor_name` Nullable(String),
`statement_descriptor_suffix` Nullable(String),
`setup_future_usage` LowCardinality(Nullable(String)),
`off_session` Nullable(Bool),
`client_secret` Nullable(String),
`active_attempt_id` String,
`business_country` LowCardinality(String),
`business_label` String,
`attempt_count` UInt8,
`modified_at` DateTime DEFAULT now() CODEC(T64, LZ4), `modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`created_at` DateTime DEFAULT now() CODEC(T64, LZ4), `created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`last_synced` Nullable(DateTime) CODEC(T64, LZ4), `last_synced` Nullable(DateTime) CODEC(T64, LZ4),
@ -55,16 +58,15 @@ CREATE TABLE payment_intents_dist (
INDEX connectorIndex connector_id TYPE bloom_filter GRANULARITY 1, INDEX connectorIndex connector_id TYPE bloom_filter GRANULARITY 1,
INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1, INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status TYPE bloom_filter GRANULARITY 1 INDEX statusIndex status TYPE bloom_filter GRANULARITY 1
) ENGINE = CollapsingMergeTree(
sign_flag
) )
ENGINE = CollapsingMergeTree(sign_flag)
PARTITION BY toStartOfDay(created_at) PARTITION BY toStartOfDay(created_at)
ORDER BY ORDER BY (created_at, merchant_id, payment_id)
(created_at, merchant_id, payment_id) TTL created_at + toIntervalMonth(18)
TTL created_at + toIntervalMonth(6) SETTINGS index_granularity = 8192;
;
CREATE MATERIALIZED VIEW kafka_parse_payment_intent TO payment_intents_dist ( CREATE MATERIALIZED VIEW payment_intents_mv TO payment_intents
(
`payment_id` String, `payment_id` String,
`merchant_id` String, `merchant_id` String,
`status` LowCardinality(String), `status` LowCardinality(String),
@ -83,6 +85,7 @@ CREATE MATERIALIZED VIEW kafka_parse_payment_intent TO payment_intents_dist (
`active_attempt_id` String, `active_attempt_id` String,
`business_country` LowCardinality(String), `business_country` LowCardinality(String),
`business_label` String, `business_label` String,
`attempt_count` UInt8,
`modified_at` DateTime64(3), `modified_at` DateTime64(3),
`created_at` DateTime64(3), `created_at` DateTime64(3),
`last_synced` Nullable(DateTime64(3)), `last_synced` Nullable(DateTime64(3)),
@ -108,9 +111,10 @@ SELECT
active_attempt_id, active_attempt_id,
business_country, business_country,
business_label, business_label,
attempt_count,
modified_at, modified_at,
created_at, created_at,
last_synced, last_synced,
now() as inserted_at, now() AS inserted_at,
sign_flag sign_flag
FROM payment_intents_queue; FROM payment_intents_queue;

View File

@ -25,13 +25,13 @@ CREATE TABLE payout_queue (
`is_eligible` Nullable(Bool), `is_eligible` Nullable(Bool),
`error_message` Nullable(String), `error_message` Nullable(String),
`error_code` Nullable(String), `error_code` Nullable(String),
`business_country` Nullable(LowCardinality(String)), `business_country` LowCardinality(Nullable(String)),
`business_label` Nullable(String), `business_label` Nullable(String),
`merchant_connector_id` Nullable(String), `merchant_connector_id` Nullable(String),
`sign_flag` Int8 `sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payout-events', kafka_topic_list = 'hyperswitch-payout-events',
kafka_group_name = 'hyper-c1', kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow', kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream'; kafka_handle_error_mode = 'stream';
@ -42,7 +42,7 @@ CREATE TABLE payout (
`customer_id` String, `customer_id` String,
`address_id` String, `address_id` String,
`profile_id` String, `profile_id` String,
`payout_method_id` String, `payout_method_id` Nullable(String),
`payout_type` LowCardinality(String), `payout_type` LowCardinality(String),
`amount` UInt64, `amount` UInt64,
`destination_currency` LowCardinality(String), `destination_currency` LowCardinality(String),
@ -62,7 +62,7 @@ CREATE TABLE payout (
`is_eligible` Nullable(Bool), `is_eligible` Nullable(Bool),
`error_message` Nullable(String), `error_message` Nullable(String),
`error_code` Nullable(String), `error_code` Nullable(String),
`business_country` Nullable(LowCardinality(String)), `business_country` LowCardinality(Nullable(String)),
`business_label` Nullable(String), `business_label` Nullable(String),
`merchant_connector_id` Nullable(String), `merchant_connector_id` Nullable(String),
`inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
@ -75,16 +75,16 @@ CREATE TABLE payout (
INDEX businessCountryIndex business_country TYPE bloom_filter GRANULARITY 1 INDEX businessCountryIndex business_country TYPE bloom_filter GRANULARITY 1
) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at) ) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
ORDER BY ORDER BY
(created_at, merchant_id, payout_id) TTL created_at + toIntervalMonth(6); (created_at, merchant_id, payout_id) TTL created_at + toIntervalMonth(6) SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW kafka_parse_payout TO payout ( CREATE MATERIALIZED VIEW payout_mv TO payout (
`payout_id` String, `payout_id` String,
`payout_attempt_id` String, `payout_attempt_id` String,
`merchant_id` String, `merchant_id` String,
`customer_id` String, `customer_id` String,
`address_id` String, `address_id` String,
`profile_id` String, `profile_id` String,
`payout_method_id` String, `payout_method_id` Nullable(String),
`payout_type` LowCardinality(String), `payout_type` LowCardinality(String),
`amount` UInt64, `amount` UInt64,
`destination_currency` LowCardinality(String), `destination_currency` LowCardinality(String),
@ -95,8 +95,8 @@ CREATE MATERIALIZED VIEW kafka_parse_payout TO payout (
`return_url` Nullable(String), `return_url` Nullable(String),
`entity_type` LowCardinality(String), `entity_type` LowCardinality(String),
`metadata` Nullable(String), `metadata` Nullable(String),
`created_at` DateTime64(3), `created_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`last_modified_at` DateTime64(3), `last_modified_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`attempt_count` UInt16, `attempt_count` UInt16,
`status` LowCardinality(String), `status` LowCardinality(String),
`connector` Nullable(String), `connector` Nullable(String),
@ -104,11 +104,11 @@ CREATE MATERIALIZED VIEW kafka_parse_payout TO payout (
`is_eligible` Nullable(Bool), `is_eligible` Nullable(Bool),
`error_message` Nullable(String), `error_message` Nullable(String),
`error_code` Nullable(String), `error_code` Nullable(String),
`business_country` Nullable(LowCardinality(String)), `business_country` LowCardinality(Nullable(String)),
`business_label` Nullable(String), `business_label` Nullable(String),
`merchant_connector_id` Nullable(String), `merchant_connector_id` Nullable(String),
`inserted_at` DateTime64(3), `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4),
`sign_flag` Int8 `sign_flag` Int8,
) AS ) AS
SELECT SELECT
payout_id, payout_id,

View File

@ -19,21 +19,20 @@ CREATE TABLE refund_queue (
`description` Nullable(String), `description` Nullable(String),
`refund_reason` Nullable(String), `refund_reason` Nullable(String),
`refund_error_code` Nullable(String), `refund_error_code` Nullable(String),
`created_at` DateTime CODEC(T64, LZ4), `created_at` DateTime,
`modified_at` DateTime CODEC(T64, LZ4), `modified_at` DateTime,
`sign_flag` Int8 `sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-refund-events', kafka_topic_list = 'hyperswitch-refund-events',
kafka_group_name = 'hyper-c1', kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow', kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream'; kafka_handle_error_mode = 'stream';
CREATE TABLE refunds (
CREATE TABLE refund_dist (
`internal_reference_id` String, `internal_reference_id` String,
`refund_id` String, `refund_id` String,
`payment_id` String, `payment_id` String,
`merchant_id` String, `merchant_id` LowCardinality(String),
`connector_transaction_id` String, `connector_transaction_id` String,
`connector` LowCardinality(Nullable(String)), `connector` LowCardinality(Nullable(String)),
`connector_refund_id` Nullable(String), `connector_refund_id` Nullable(String),
@ -58,16 +57,11 @@ CREATE TABLE refund_dist (
INDEX refundTypeIndex refund_type TYPE bloom_filter GRANULARITY 1, INDEX refundTypeIndex refund_type TYPE bloom_filter GRANULARITY 1,
INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1, INDEX currencyIndex currency TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex refund_status TYPE bloom_filter GRANULARITY 1 INDEX statusIndex refund_status TYPE bloom_filter GRANULARITY 1
) ENGINE = CollapsingMergeTree( ) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
sign_flag
)
PARTITION BY toStartOfDay(created_at)
ORDER BY ORDER BY
(created_at, merchant_id, refund_id) (created_at, merchant_id, refund_id) TTL created_at + toIntervalMonth(18) SETTINGS index_granularity = 8192;
TTL created_at + toIntervalMonth(6)
;
CREATE MATERIALIZED VIEW kafka_parse_refund TO refund_dist ( CREATE MATERIALIZED VIEW refund_mv TO refunds (
`internal_reference_id` String, `internal_reference_id` String,
`refund_id` String, `refund_id` String,
`payment_id` String, `payment_id` String,
@ -116,6 +110,9 @@ SELECT
refund_error_code, refund_error_code,
created_at, created_at,
modified_at, modified_at,
now() as inserted_at, now() AS inserted_at,
sign_flag sign_flag
FROM refund_queue; FROM
refund_queue
WHERE
length(_error) = 0;

View File

@ -295,3 +295,29 @@ services:
- "8001:8001" - "8001:8001"
volumes: volumes:
- redisinsight_store:/db - redisinsight_store:/db
hyperswitch-control-center:
image: juspaydotin/hyperswitch-control-center:latest
networks:
- router_net
ports:
- "9000:9000"
environment:
apiBaseUrl: http://localhost:8080
sdkBaseUrl: http://localhost:9050/HyperLoader.js
hyperswitch-web-sdk:
build:
dockerfile_inline: |
FROM node:lts
RUN git clone https://github.com/juspay/hyperswitch-web.git --depth 1
WORKDIR hyperswitch-web
RUN npm i --force
command: bash -c 'npm run re:build && npx run webpack serve --config webpack.dev.js --host 0.0.0.0'
ports:
- "9050:9050"
environment:
sdkEnv: local
envSdkUrl: http://localhost:9050
envBackendUrl: http://localhost:8080
envLoggingUrl: http://localhost:8207

View File

@ -3,6 +3,7 @@ version: "3.8"
volumes: volumes:
pg_data: pg_data:
redisinsight_store: redisinsight_store:
ckh_data:
networks: networks:
router_net: router_net:
@ -321,12 +322,14 @@ services:
KAFKA_CLUSTERS_0_JMXPORT: 9997 KAFKA_CLUSTERS_0_JMXPORT: 9997
clickhouse-server: clickhouse-server:
image: clickhouse/clickhouse-server:23.5 image: clickhouse/clickhouse-server:24.3
networks: networks:
- router_net - router_net
ports: ports:
- "9000" - "9000"
- "8123:8123" - "8123:8123"
volumes:
- ./crates/analytics/docs/clickhouse/scripts:/docker-entrypoint-initdb.d
profiles: profiles:
- analytics - analytics
ulimits: ulimits: