fix(analytics): added response to the connector outgoing event (#3129)

Co-authored-by: harsh-sharma-juspay <125131007+harsh-sharma-juspay@users.noreply.github.com>
Co-authored-by: Sampras lopes <lsampras@pm.me>
This commit is contained in:
Sagar naik
2024-01-05 18:58:55 +05:30
committed by GitHub
parent 00008c16c1
commit d152c3a1ca
8 changed files with 145 additions and 149 deletions

View File

@ -1,4 +1,4 @@
CREATE TABLE api_events_v2_queue (
CREATE TABLE api_events_queue (
`merchant_id` String,
`payment_id` Nullable(String),
`refund_id` Nullable(String),
@ -14,12 +14,15 @@ CREATE TABLE api_events_v2_queue (
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime CODEC(T64, LZ4),
`created_at_timestamp` DateTime64(3),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-api-log-events',
@ -28,7 +31,7 @@ kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE api_events_v2_dist (
CREATE TABLE api_events_dist (
`merchant_id` String,
`payment_id` Nullable(String),
`refund_id` Nullable(String),
@ -44,13 +47,15 @@ CREATE TABLE api_events_v2_dist (
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime CODEC(T64, LZ4),
`inserted_at` DateTime CODEC(T64, LZ4),
`created_at_timestamp` DateTime64(3),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String,
INDEX flowIndex flow_type TYPE bloom_filter GRANULARITY 1,
INDEX apiIndex api_flow TYPE bloom_filter GRANULARITY 1,
@ -62,7 +67,7 @@ ORDER BY
TTL created_at + toIntervalMonth(6)
;
CREATE MATERIALIZED VIEW api_events_v2_mv TO api_events_v2_dist (
CREATE MATERIALIZED VIEW api_events_mv TO api_events_dist (
`merchant_id` String,
`payment_id` Nullable(String),
`refund_id` Nullable(String),
@ -78,13 +83,15 @@ CREATE MATERIALIZED VIEW api_events_v2_mv TO api_events_v2_dist (
`api_auth_type` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`authentication_data` Nullable(String),
`status_code` UInt32,
`created_at` DateTime CODEC(T64, LZ4),
`inserted_at` DateTime CODEC(T64, LZ4),
`created_at_timestamp` DateTime64(3),
`latency` UInt128,
`user_agent` String,
`ip_addr` String,
`hs_latency` Nullable(UInt128),
`http_method` LowCardinality(String),
`url_path` String
) AS
SELECT
@ -103,16 +110,19 @@ SELECT
api_auth_type,
request,
response,
error,
authentication_data,
status_code,
created_at,
created_at_timestamp,
now() as inserted_at,
latency,
user_agent,
ip_addr,
hs_latency,
http_method,
url_path
FROM
api_events_v2_queue
api_events_queue
where length(_error) = 0;
@ -133,6 +143,6 @@ SELECT
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM api_events_v2_queue
FROM api_events_queue
WHERE length(_error) > 0
;

View File

@ -0,0 +1,97 @@
CREATE TABLE connector_events_queue (
`merchant_id` String,
`payment_id` Nullable(String),
`connector_name` LowCardinality(String),
`request_id` String,
`flow` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`latency` UInt128,
`method` LowCardinality(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-connector-api-events',
kafka_group_name = 'hyper-c1',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';
CREATE TABLE connector_events_dist (
`merchant_id` String,
`payment_id` Nullable(String),
`connector_name` LowCardinality(String),
`request_id` String,
`flow` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`inserted_at` DateTime64(3),
`latency` UInt128,
`method` LowCardinality(String),
INDEX flowIndex flowTYPE bloom_filter GRANULARITY 1,
INDEX connectorIndex connector_name TYPE bloom_filter GRANULARITY 1,
INDEX statusIndex status_code TYPE bloom_filter GRANULARITY 1
) ENGINE = MergeTree
PARTITION BY toStartOfDay(created_at)
ORDER BY
(created_at, merchant_id, flow_type, status_code, api_flow)
TTL created_at + toIntervalMonth(6)
;
CREATE MATERIALIZED VIEW connector_events_mv TO connector_events_dist (
`merchant_id` String,
`payment_id` Nullable(String),
`connector_name` LowCardinality(String),
`request_id` String,
`flow` LowCardinality(String),
`request` String,
`response` Nullable(String),
`error` Nullable(String),
`status_code` UInt32,
`created_at` DateTime64(3),
`latency` UInt128,
`method` LowCardinality(String)
) AS
SELECT
merchant_id,
payment_id,
connector_name,
request_id,
flow,
request,
response,
error,
status_code,
created_at,
now() as inserted_at,
latency,
method,
FROM
connector_events_queue
where length(_error) = 0;
CREATE MATERIALIZED VIEW connector_events_parse_errors
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM connector_events_queue
WHERE length(_error) > 0
;

View File

@ -40,7 +40,10 @@ pub enum ApiEventsType {
},
Routing,
ResourceListAPI,
PaymentRedirectionResponse,
PaymentRedirectionResponse {
connector: Option<String>,
payment_id: Option<String>,
},
Gsm,
// TODO: This has to be removed once the corresponding apiEventTypes are created
Miscellaneous,

View File

@ -116,7 +116,6 @@ impl_misc_api_event_type!(
AttachEvidenceRequest,
DisputeId,
PaymentLinkFormData,
PaymentsRedirectResponseData,
ConfigUpdate
);
@ -131,3 +130,15 @@ impl_misc_api_event_type!(
DummyConnectorRefundResponse,
DummyConnectorRefundRequest
);
impl ApiEventMetric for PaymentsRedirectResponseData {
fn get_api_event_type(&self) -> Option<ApiEventsType> {
Some(ApiEventsType::PaymentRedirectionResponse {
connector: self.connector.clone(),
payment_id: match &self.resource_id {
api_models::payments::PaymentIdType::PaymentIntentId(id) => Some(id.clone()),
_ => None,
},
})
}
}

View File

@ -377,7 +377,17 @@ where
req.connector.clone(),
std::any::type_name::<T>(),
masked_request_body,
None,
response
.as_ref()
.map(|response| {
response
.as_ref()
.map_or_else(|value| value, |value| value)
.response
.escape_ascii()
.to_string()
})
.ok(),
request_url,
request_method,
req.payment_id.clone(),

View File

@ -8,12 +8,9 @@ use rdkafka::{
};
use crate::events::EventType;
mod api_event;
pub mod outgoing_request;
mod payment_attempt;
mod payment_intent;
mod refund;
pub use api_event::{ApiCallEventType, ApiEvents, ApiEventsType};
use data_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
use diesel_models::refund::Refund;
use serde::Serialize;
@ -300,11 +297,6 @@ impl KafkaProducer {
})
}
pub async fn log_api_event(&self, event: &ApiEvents) -> MQResult<()> {
self.log_kafka_event(&self.api_logs_topic, event)
.attach_printable_lazy(|| format!("Failed to add api log event {event:?}"))
}
pub fn get_topic(&self, event: EventType) -> &str {
match event {
EventType::ApiLogs => &self.api_logs_topic,

View File

@ -1,108 +0,0 @@
use api_models::enums as api_enums;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(tag = "flow_type")]
pub enum ApiEventsType {
Payment {
payment_id: String,
},
Refund {
payment_id: String,
refund_id: String,
},
Default,
PaymentMethod {
payment_method_id: String,
payment_method: Option<api_enums::PaymentMethod>,
payment_method_type: Option<api_enums::PaymentMethodType>,
},
Customer {
customer_id: String,
},
User {
//specified merchant_id will overridden on global defined
merchant_id: String,
user_id: String,
},
Webhooks {
connector: String,
payment_id: Option<String>,
},
OutgoingEvent,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ApiEvents {
pub api_name: String,
pub request_id: Option<String>,
//It is require to solve ambiquity in case of event_type is User
#[serde(skip_serializing_if = "Option::is_none")]
pub merchant_id: Option<String>,
pub request: String,
pub response: String,
pub status_code: u16,
#[serde(with = "time::serde::timestamp")]
pub created_at: OffsetDateTime,
pub latency: u128,
//conflicting fields underlying enums will be used
#[serde(flatten)]
pub event_type: ApiEventsType,
pub user_agent: Option<String>,
pub ip_addr: Option<String>,
pub url_path: Option<String>,
pub api_event_type: Option<ApiCallEventType>,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum ApiCallEventType {
IncomingApiEvent,
OutgoingApiEvent,
}
impl super::KafkaMessage for ApiEvents {
fn key(&self) -> String {
match &self.event_type {
ApiEventsType::Payment { payment_id } => format!(
"{}_{}",
self.merchant_id
.as_ref()
.unwrap_or(&"default_merchant_id".to_string()),
payment_id
),
ApiEventsType::Refund {
payment_id,
refund_id,
} => format!("{payment_id}_{refund_id}"),
ApiEventsType::Default => "key".to_string(),
ApiEventsType::PaymentMethod {
payment_method_id,
payment_method,
payment_method_type,
} => format!(
"{:?}_{:?}_{:?}",
payment_method_id.clone(),
payment_method.clone(),
payment_method_type.clone(),
),
ApiEventsType::Customer { customer_id } => customer_id.to_string(),
ApiEventsType::User {
merchant_id,
user_id,
} => format!("{}_{}", merchant_id, user_id),
ApiEventsType::Webhooks {
connector,
payment_id,
} => format!(
"webhook_{}_{connector}",
payment_id.clone().unwrap_or_default()
),
ApiEventsType::OutgoingEvent => "outgoing_event".to_string(),
}
}
fn creation_timestamp(&self) -> Option<i64> {
Some(self.created_at.unix_timestamp())
}
}

View File

@ -1,19 +0,0 @@
use reqwest::Url;
pub struct OutgoingRequest {
pub url: Url,
pub latency: u128,
}
// impl super::KafkaMessage for OutgoingRequest {
// fn key(&self) -> String {
// format!(
// "{}_{}",
// )
// }
// fn creation_timestamp(&self) -> Option<i64> {
// Some(self.created_at.unix_timestamp())
// }
// }