diff --git a/crates/analytics/docs/clickhouse/scripts/api_events_v2.sql b/crates/analytics/docs/clickhouse/scripts/api_events.sql
similarity index 83%
rename from crates/analytics/docs/clickhouse/scripts/api_events_v2.sql
rename to crates/analytics/docs/clickhouse/scripts/api_events.sql
index 33f158ce48..c3fc3d7b06 100644
--- a/crates/analytics/docs/clickhouse/scripts/api_events_v2.sql
+++ b/crates/analytics/docs/clickhouse/scripts/api_events.sql
@@ -1,4 +1,4 @@
-CREATE TABLE api_events_v2_queue (
+CREATE TABLE api_events_queue (
     `merchant_id` String,
     `payment_id` Nullable(String),
     `refund_id` Nullable(String),
@@ -14,12 +14,15 @@ CREATE TABLE api_events_v2_queue (
     `api_auth_type` LowCardinality(String),
     `request` String,
     `response` Nullable(String),
+    `error` Nullable(String),
     `authentication_data` Nullable(String),
     `status_code` UInt32,
-    `created_at` DateTime CODEC(T64, LZ4),
+    `created_at_timestamp` DateTime64(3),
     `latency` UInt128,
     `user_agent` String,
     `ip_addr` String,
+    `hs_latency` Nullable(UInt128),
+    `http_method` LowCardinality(String),
     `url_path` String
 ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
 kafka_topic_list = 'hyperswitch-api-log-events',
@@ -28,7 +31,7 @@ kafka_format = 'JSONEachRow',
 kafka_handle_error_mode = 'stream';
 
 
-CREATE TABLE api_events_v2_dist (
+CREATE TABLE api_events_dist (
     `merchant_id` String,
     `payment_id` Nullable(String),
     `refund_id` Nullable(String),
@@ -44,13 +47,15 @@ CREATE TABLE api_events_v2_dist (
     `api_auth_type` LowCardinality(String),
     `request` String,
     `response` Nullable(String),
+    `error` Nullable(String),
     `authentication_data` Nullable(String),
     `status_code` UInt32,
-    `created_at` DateTime CODEC(T64, LZ4),
-    `inserted_at` DateTime CODEC(T64, LZ4),
+    `created_at_timestamp` DateTime64(3),
     `latency` UInt128,
     `user_agent` String,
     `ip_addr` String,
+    `hs_latency` Nullable(UInt128),
+    `http_method` LowCardinality(String),
     `url_path` String,
     INDEX flowIndex flow_type TYPE bloom_filter GRANULARITY 1,
     INDEX apiIndex api_flow TYPE bloom_filter GRANULARITY 1,
@@ -62,7 +67,7 @@ ORDER BY
 TTL created_at + toIntervalMonth(6)
 ;
 
-CREATE MATERIALIZED VIEW api_events_v2_mv TO api_events_v2_dist (
+CREATE MATERIALIZED VIEW api_events_mv TO api_events_dist (
     `merchant_id` String,
     `payment_id` Nullable(String),
     `refund_id` Nullable(String),
@@ -78,13 +83,15 @@ CREATE MATERIALIZED VIEW api_events_v2_mv TO api_events_v2_dist (
     `api_auth_type` LowCardinality(String),
     `request` String,
     `response` Nullable(String),
+    `error` Nullable(String),
     `authentication_data` Nullable(String),
     `status_code` UInt32,
-    `created_at` DateTime CODEC(T64, LZ4),
-    `inserted_at` DateTime CODEC(T64, LZ4),
+    `created_at_timestamp` DateTime64(3),
     `latency` UInt128,
     `user_agent` String,
     `ip_addr` String,
+    `hs_latency` Nullable(UInt128),
+    `http_method` LowCardinality(String),
     `url_path` String
 ) AS
 SELECT
@@ -103,16 +110,19 @@ SELECT
     api_auth_type,
     request,
     response,
+    error,
     authentication_data,
     status_code,
-    created_at,
+    created_at_timestamp,
     now() as inserted_at,
     latency,
     user_agent,
     ip_addr,
+    hs_latency,
+    http_method,
     url_path
 FROM
-    api_events_v2_queue
+    api_events_queue
 where length(_error) = 0;
 
 
@@ -133,6 +143,6 @@ SELECT
     _offset AS offset,
     _raw_message AS raw,
     _error AS error
-FROM api_events_v2_queue
+FROM api_events_queue
 WHERE length(_error) > 0
 ;
diff --git a/crates/analytics/docs/clickhouse/scripts/connector_events.sql b/crates/analytics/docs/clickhouse/scripts/connector_events.sql
new file mode 100644
index 0000000000..5821cd0355
--- /dev/null
+++ b/crates/analytics/docs/clickhouse/scripts/connector_events.sql
@@ -0,0 +1,97 @@
+CREATE TABLE connector_events_queue (
+    `merchant_id` String,
+    `payment_id` Nullable(String),
+    `connector_name` LowCardinality(String),
+    `request_id` String,
+    `flow` LowCardinality(String),
+    `request` String,
+    `response` Nullable(String),
+    `error` Nullable(String),
+    `status_code` UInt32,
+    `created_at` DateTime64(3),
+    `latency` UInt128,
+    `method` LowCardinality(String)
+) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
+kafka_topic_list = 'hyperswitch-connector-api-events',
+kafka_group_name = 'hyper-c1',
+kafka_format = 'JSONEachRow',
+kafka_handle_error_mode = 'stream';
+
+
+CREATE TABLE connector_events_dist (
+    `merchant_id` String,
+    `payment_id` Nullable(String),
+    `connector_name` LowCardinality(String),
+    `request_id` String,
+    `flow` LowCardinality(String),
+    `request` String,
+    `response` Nullable(String),
+    `error` Nullable(String),
+    `status_code` UInt32,
+    `created_at` DateTime64(3),
+    `inserted_at` DateTime64(3),
+    `latency` UInt128,
+    `method` LowCardinality(String),
+    INDEX flowIndex flow TYPE bloom_filter GRANULARITY 1,
+    INDEX connectorIndex connector_name TYPE bloom_filter GRANULARITY 1,
+    INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1
+) ENGINE = MergeTree
+PARTITION BY toStartOfDay(created_at)
+ORDER BY
+    (created_at, merchant_id, flow, status_code)
+TTL created_at + toIntervalMonth(6)
+;
+
+CREATE MATERIALIZED VIEW connector_events_mv TO connector_events_dist (
+    `merchant_id` String,
+    `payment_id` Nullable(String),
+    `connector_name` LowCardinality(String),
+    `request_id` String,
+    `flow` LowCardinality(String),
+    `request` String,
+    `response` Nullable(String),
+    `error` Nullable(String),
+    `status_code` UInt32,
+    `created_at` DateTime64(3),
+    `latency` UInt128,
+    `method` LowCardinality(String)
+) AS
+SELECT
+    merchant_id,
+    payment_id,
+    connector_name,
+    request_id,
+    flow,
+    request,
+    response,
+    error,
+    status_code,
+    created_at,
+    now() as inserted_at,
+    latency,
+    method
+FROM
+    connector_events_queue
+where length(_error) = 0;
+
+
+CREATE MATERIALIZED VIEW connector_events_parse_errors
+(
+    `topic` String,
+    `partition` Int64,
+    `offset` Int64,
+    `raw` String,
+    `error` String
+)
+ENGINE = MergeTree
+ORDER BY (topic, partition, offset)
+SETTINGS index_granularity = 8192 AS
+SELECT
+    _topic AS topic,
+    _partition AS partition,
+    _offset AS offset,
+    _raw_message AS raw,
+    _error AS error
+FROM connector_events_queue
+WHERE length(_error) > 0
+;
diff --git a/crates/common_utils/src/events.rs b/crates/common_utils/src/events.rs
index c9efbb73c2..6bbf78afe4 100644
--- a/crates/common_utils/src/events.rs
+++ b/crates/common_utils/src/events.rs
@@ -40,7 +40,10 @@ pub enum ApiEventsType {
     },
     Routing,
     ResourceListAPI,
-    PaymentRedirectionResponse,
+    PaymentRedirectionResponse {
+        connector: Option<String>,
+        payment_id: Option<String>,
+    },
     Gsm,
     // TODO: This has to be removed once the corresponding apiEventTypes are created
     Miscellaneous,
diff --git a/crates/router/src/events/api_logs.rs b/crates/router/src/events/api_logs.rs
index 42017f4500..78a66d2f04 100644
--- a/crates/router/src/events/api_logs.rs
+++ b/crates/router/src/events/api_logs.rs
@@ -116,7 +116,6 @@ impl_misc_api_event_type!(
     AttachEvidenceRequest,
     DisputeId,
     PaymentLinkFormData,
-    PaymentsRedirectResponseData,
     ConfigUpdate
 );
 
@@ -131,3 +130,15 @@ impl_misc_api_event_type!(
     DummyConnectorRefundResponse,
     DummyConnectorRefundRequest
 );
+
+impl ApiEventMetric for PaymentsRedirectResponseData {
+    fn get_api_event_type(&self) -> Option<ApiEventsType> {
+        Some(ApiEventsType::PaymentRedirectionResponse {
+            connector: self.connector.clone(),
+            payment_id: match &self.resource_id {
+                api_models::payments::PaymentIdType::PaymentIntentId(id) => Some(id.clone()),
+                _ => None,
+            },
+        })
+    }
+}
diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs
index 8298d9a105..ba3607fb7d 100644
--- a/crates/router/src/services/api.rs
+++ b/crates/router/src/services/api.rs
@@ -377,7 +377,17 @@ where
             req.connector.clone(),
             std::any::type_name::<T>(),
             masked_request_body,
-            None,
+            response
+                .as_ref()
+                .map(|response| {
+                    response
+                        .as_ref()
+                        .map_or_else(|value| value, |value| value)
+                        .response
+                        .escape_ascii()
+                        .to_string()
+                })
+                .ok(),
             request_url,
             request_method,
             req.payment_id.clone(),
diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs
index 5a6d7043e6..2b29a61b4a 100644
--- a/crates/router/src/services/kafka.rs
+++ b/crates/router/src/services/kafka.rs
@@ -8,12 +8,9 @@ use rdkafka::{
 };
 
 use crate::events::EventType;
-mod api_event;
-pub mod outgoing_request;
 mod payment_attempt;
 mod payment_intent;
 mod refund;
-pub use api_event::{ApiCallEventType, ApiEvents, ApiEventsType};
 use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
 use diesel_models::refund::Refund;
 use serde::Serialize;
@@ -300,11 +297,6 @@ impl KafkaProducer {
         })
     }
 
-    pub async fn log_api_event(&self, event: &ApiEvents) -> MQResult<()> {
-        self.log_kafka_event(&self.api_logs_topic, event)
-            .attach_printable_lazy(|| format!("Failed to add api log event {event:?}"))
-    }
-
     pub fn get_topic(&self, event: EventType) -> &str {
         match event {
             EventType::ApiLogs => &self.api_logs_topic,
diff --git a/crates/router/src/services/kafka/api_event.rs b/crates/router/src/services/kafka/api_event.rs
deleted file mode 100644
index 7de2719159..0000000000
--- a/crates/router/src/services/kafka/api_event.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use api_models::enums as api_enums;
-use serde::{Deserialize, Serialize};
-use time::OffsetDateTime;
-
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-#[serde(tag = "flow_type")]
-pub enum ApiEventsType {
-    Payment {
-        payment_id: String,
-    },
-    Refund {
-        payment_id: String,
-        refund_id: String,
-    },
-    Default,
-    PaymentMethod {
-        payment_method_id: String,
-        payment_method: Option<api_enums::PaymentMethod>,
-        payment_method_type: Option<api_enums::PaymentMethodType>,
-    },
-    Customer {
-        customer_id: String,
-    },
-    User {
-        //specified merchant_id will overridden on global defined
-        merchant_id: String,
-        user_id: String,
-    },
-    Webhooks {
-        connector: String,
-        payment_id: Option<String>,
-    },
-    OutgoingEvent,
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-pub struct ApiEvents {
-    pub api_name: String,
-    pub request_id: Option<String>,
-    //It is require to solve ambiquity in case of event_type is User
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub merchant_id: Option<String>,
-    pub request: String,
-    pub response: String,
-    pub status_code: u16,
-    #[serde(with = "time::serde::timestamp")]
-    pub created_at: OffsetDateTime,
-    pub latency: u128,
-    //conflicting fields underlying enums will be used
-    #[serde(flatten)]
-    pub event_type: ApiEventsType,
-    pub user_agent: Option<String>,
-    pub ip_addr: Option<String>,
-    pub url_path: Option<String>,
-    pub api_event_type: Option<ApiCallEventType>,
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
-pub enum ApiCallEventType {
-    IncomingApiEvent,
-    OutgoingApiEvent,
-}
-
-impl super::KafkaMessage for ApiEvents {
-    fn key(&self) -> String {
-        match &self.event_type {
-            ApiEventsType::Payment { payment_id } => format!(
-                "{}_{}",
-                self.merchant_id
-                    .as_ref()
-                    .unwrap_or(&"default_merchant_id".to_string()),
-                payment_id
-            ),
-            ApiEventsType::Refund {
-                payment_id,
-                refund_id,
-            } => format!("{payment_id}_{refund_id}"),
-            ApiEventsType::Default => "key".to_string(),
-            ApiEventsType::PaymentMethod {
-                payment_method_id,
-                payment_method,
-                payment_method_type,
-            } => format!(
-                "{:?}_{:?}_{:?}",
-                payment_method_id.clone(),
-                payment_method.clone(),
-                payment_method_type.clone(),
-            ),
-            ApiEventsType::Customer { customer_id } => customer_id.to_string(),
-            ApiEventsType::User {
-                merchant_id,
-                user_id,
-            } => format!("{}_{}", merchant_id, user_id),
-            ApiEventsType::Webhooks {
-                connector,
-                payment_id,
-            } => format!(
-                "webhook_{}_{connector}",
-                payment_id.clone().unwrap_or_default()
-            ),
-            ApiEventsType::OutgoingEvent => "outgoing_event".to_string(),
-        }
-    }
-
-    fn creation_timestamp(&self) -> Option<i64> {
-        Some(self.created_at.unix_timestamp())
-    }
-}
diff --git a/crates/router/src/services/kafka/outgoing_request.rs b/crates/router/src/services/kafka/outgoing_request.rs
deleted file mode 100644
index bb09fe91fe..0000000000
--- a/crates/router/src/services/kafka/outgoing_request.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use reqwest::Url;
-
-pub struct OutgoingRequest {
-    pub url: Url,
-    pub latency: u128,
-}
-
-// impl super::KafkaMessage for OutgoingRequest {
-//     fn key(&self) -> String {
-//         format!(
-//             "{}_{}",
-
-//         )
-//     }
-
-//     fn creation_timestamp(&self) -> Option<i64> {
-//         Some(self.created_at.unix_timestamp())
-//     }
-// }
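
Reviewer note: a minimal smoke test for the new ClickHouse pipeline, written against only the DDL introduced above (connector_events_queue, connector_events_dist, connector_events_parse_errors). The time window, limit, and aggregates are illustrative choices, not part of this change.

-- 1. Messages that fail JSONEachRow parsing are surfaced through the Kafka
--    engine's _error virtual column and land in the parse-errors view.
SELECT topic, partition, offset, error, raw
FROM connector_events_parse_errors
ORDER BY offset DESC
LIMIT 10;

-- 2. Sanity-check ingested events: failure count and average latency per
--    connector and flow. The 24h window is arbitrary; toUInt64() narrows
--    the UInt128 latency column so avg() works on older ClickHouse servers.
SELECT
    connector_name,
    flow,
    count() AS total,
    countIf(status_code >= 400) AS failures,
    round(avg(toUInt64(latency)), 2) AS avg_latency
FROM connector_events_dist
WHERE created_at >= now() - INTERVAL 24 HOUR
GROUP BY connector_name, flow
ORDER BY total DESC;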