From ebe44b9d7d90c7ee15a36be96470e83c19681ce4 Mon Sep 17 00:00:00 2001 From: Sarthak Soni <76486416+Sarthak1799@users.noreply.github.com> Date: Tue, 3 Jun 2025 02:23:56 +0530 Subject: [PATCH] feat(routing): Add audit trail for routing (#8188) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- config/config.example.toml | 1 + config/deployments/env_specific.toml | 1 + config/development.toml | 1 + config/docker_compose.toml | 1 + .../clickhouse/scripts/connector_events.sql | 11 +- .../clickhouse/scripts/routing_events.sql | 191 +++++++++++ crates/analytics/src/clickhouse.rs | 14 + crates/analytics/src/lib.rs | 2 + crates/analytics/src/routing_events.rs | 5 + crates/analytics/src/routing_events/core.rs | 26 ++ crates/analytics/src/routing_events/events.rs | 73 +++++ crates/analytics/src/sqlx.rs | 2 + crates/analytics/src/types.rs | 1 + crates/api_models/src/analytics.rs | 1 + .../src/analytics/routing_events.rs | 6 + crates/api_models/src/events.rs | 6 +- crates/api_models/src/routing.rs | 246 +++++++++++++- crates/external_services/Cargo.toml | 19 +- crates/hyperswitch_interfaces/src/events.rs | 1 + .../src/events/routing_api_logs.rs | 186 +++++++++++ crates/router/src/analytics.rs | 50 ++- crates/router/src/core/payments.rs | 1 + crates/router/src/core/payments/routing.rs | 307 +++++++++++++++++- crates/router/src/core/routing/helpers.rs | 224 ++++++++++++- crates/router/src/events.rs | 2 + crates/router/src/events/routing_api_logs.rs | 19 ++ crates/router/src/services/kafka.rs | 10 + 27 files changed, 1373 insertions(+), 34 deletions(-) create mode 100644 crates/analytics/docs/clickhouse/scripts/routing_events.sql create mode 100644 crates/analytics/src/routing_events.rs create mode 100644 crates/analytics/src/routing_events/core.rs create mode 100644 crates/analytics/src/routing_events/events.rs create mode 100644 crates/api_models/src/analytics/routing_events.rs create mode 100644 crates/hyperswitch_interfaces/src/events/routing_api_logs.rs create mode 100644 crates/router/src/events/routing_api_logs.rs diff --git a/config/config.example.toml b/config/config.example.toml index 16846d0c40..12a905e1f5 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -945,6 +945,7 @@ audit_events_topic = "topic" # Kafka topic to be used for Payment Au payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events authentication_analytics_topic = "topic" # Kafka topic to be used for Authentication events +routing_logs_topic = "topic" # Kafka topic to be used for Routing events # File storage configuration [file_storage] diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 05c1745a6d..022ba701e8 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -93,6 +93,7 @@ payout_analytics_topic = "topic" # Kafka topic to be used for Payouts an consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events authentication_analytics_topic = "topic" # Kafka topic to be used for Authentication events fraud_check_analytics_topic = "topic" # Kafka topic to be used for Fraud Check events +routing_logs_topic = "topic" # Kafka topic to be used for Routing events # File storage configuration [file_storage] diff --git a/config/development.toml b/config/development.toml index 158f8b0116..d090c0aaaa 100644 --- a/config/development.toml +++ b/config/development.toml @@ -1035,6 +1035,7 @@ audit_events_topic = "hyperswitch-audit-events" payout_analytics_topic = "hyperswitch-payout-events" consolidated_events_topic = "hyperswitch-consolidated-events" authentication_analytics_topic = "hyperswitch-authentication-events" +routing_logs_topic = "hyperswitch-routing-api-events" [debit_routing_config] supported_currencies = "USD" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 5a425bd89f..c6d40c59e8 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -900,6 +900,7 @@ audit_events_topic = "hyperswitch-audit-events" payout_analytics_topic = "hyperswitch-payout-events" consolidated_events_topic = "hyperswitch-consolidated-events" authentication_analytics_topic = "hyperswitch-authentication-events" +routing_logs_topic = "hyperswitch-routing-api-events" [analytics] source = "sqlx" diff --git a/crates/analytics/docs/clickhouse/scripts/connector_events.sql b/crates/analytics/docs/clickhouse/scripts/connector_events.sql index 61a036bccd..33826fbaaf 100644 --- a/crates/analytics/docs/clickhouse/scripts/connector_events.sql +++ b/crates/analytics/docs/clickhouse/scripts/connector_events.sql @@ -1,4 +1,5 @@ -CREATE TABLE connector_events_queue ( +CREATE TABLE connector_events_queue +( `merchant_id` String, `payment_id` Nullable(String), `connector_name` LowCardinality(String), @@ -13,11 +14,9 @@ CREATE TABLE connector_events_queue ( `method` LowCardinality(String), `dispute_id` Nullable(String), `refund_id` Nullable(String) -) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092', -kafka_topic_list = 'hyperswitch-outgoing-connector-events', -kafka_group_name = 'hyper', -kafka_format = 'JSONEachRow', -kafka_handle_error_mode = 'stream'; +) +ENGINE = Kafka +SETTINGS kafka_broker_list = 'kafka0:29092', kafka_topic_list = 'hyperswitch-outgoing-connector-events', kafka_group_name = 'hyper', kafka_format = 'JSONEachRow', kafka_handle_error_mode = 'stream'; CREATE MATERIALIZED VIEW connector_events_parse_errors ( `topic` String, diff --git a/crates/analytics/docs/clickhouse/scripts/routing_events.sql b/crates/analytics/docs/clickhouse/scripts/routing_events.sql new file mode 100644 index 0000000000..d9e2ccd9e5 --- /dev/null +++ b/crates/analytics/docs/clickhouse/scripts/routing_events.sql @@ -0,0 +1,191 @@ +-- Need to discuss with the team about the structure of this table. +CREATE TABLE routing_events_queue +( + `merchant_id` String, + `profile_id` String, + `payment_id` String, + `refund_id` Nullable(String), + `dispute_id` Nullable(String), + `routable_connectors` String, + `payment_connector` Nullable(String), + `request_id` String, + `flow` LowCardinality(String), + `url` Nullable(String), + `request` String, + `response` Nullable(String), + `error` Nullable(String), + `status_code` Nullable(UInt32), + `created_at` DateTime64(9), + `method` LowCardinality(String), + `routing_engine` LowCardinality(String), + `routing_approach` Nullable(String) +) +ENGINE = Kafka +SETTINGS kafka_broker_list = 'kafka0:29092', kafka_topic_list = 'hyperswitch-routing-api-events', kafka_group_name = 'hyper', kafka_format = 'JSONEachRow', kafka_handle_error_mode = 'stream'; + +CREATE MATERIALIZED VIEW routing_events_parse_errors ( + `topic` String, + `partition` Int64, + `offset` Int64, + `raw` String, + `error` String +) ENGINE = MergeTree +ORDER BY + (topic, partition, offset) SETTINGS index_granularity = 8192 AS +SELECT + _topic AS topic, + _partition AS partition, + _offset AS offset, + _raw_message AS raw, + _error AS error +FROM + routing_events_queue +WHERE + length(_error) > 0; + +CREATE TABLE routing_events ( + `merchant_id` String, + `profile_id` String, + `payment_id` String, + `refund_id` Nullable(String), + `dispute_id` Nullable(String), + `routable_connectors` String, + `payment_connector` Nullable(String), + `request_id` String, + `flow` LowCardinality(String), + `url` Nullable(String), + `request` String, + `response` Nullable(String), + `error` Nullable(String), + `status_code` Nullable(UInt32), + `created_at` DateTime64(9), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `method` LowCardinality(String), + `routing_engine` LowCardinality(String), + `routing_approach` Nullable(String), + INDEX flowIndex flow TYPE bloom_filter GRANULARITY 1, + INDEX profileIndex profile_id TYPE bloom_filter GRANULARITY 1 +) ENGINE = MergeTree +PARTITION BY toStartOfDay(created_at) +ORDER BY ( created_at, merchant_id, profile_id, payment_id ) +SETTINGS index_granularity = 8192; + +CREATE TABLE routing_events_audit ( + `merchant_id` String, + `profile_id` String, + `payment_id` String, + `refund_id` Nullable(String), + `dispute_id` Nullable(String), + `routable_connectors` String, + `payment_connector` Nullable(String), + `request_id` String, + `flow` LowCardinality(String), + `url` Nullable(String), + `request` String, + `response` Nullable(String), + `error` Nullable(String), + `status_code` Nullable(UInt32), + `created_at` DateTime64(9), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `method` LowCardinality(String), + `routing_engine` LowCardinality(String), + `routing_approach` Nullable(String), + INDEX flowIndex flow TYPE bloom_filter GRANULARITY 1, + INDEX profileIndex profile_id TYPE bloom_filter GRANULARITY 1 +) ENGINE = MergeTree +PARTITION BY (merchant_id) +ORDER BY ( merchant_id, payment_id ) +SETTINGS index_granularity = 8192; + + +CREATE MATERIALIZED VIEW routing_events_mv TO routing_events ( + `merchant_id` String, + `profile_id` String, + `payment_id` String, + `refund_id` Nullable(String), + `dispute_id` Nullable(String), + `routable_connectors` String, + `payment_connector` Nullable(String), + `request_id` String, + `flow` LowCardinality(String), + `url` Nullable(String), + `request` String, + `response` Nullable(String), + `error` Nullable(String), + `status_code` Nullable(UInt32), + `created_at` DateTime64(9), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `method` LowCardinality(String), + `routing_engine` LowCardinality(String), + `routing_approach` Nullable(String) +) AS +SELECT + merchant_id, + profile_id, + payment_id, + refund_id, + dispute_id, + routable_connectors, + payment_connector, + request_id, + flow, + url, + request, + response, + error, + status_code, + created_at, + now() AS inserted_at, + method, + routing_engine, + routing_approach +FROM + routing_events_queue +WHERE + length(_error) = 0; + +CREATE MATERIALIZED VIEW routing_events_audit_mv TO routing_events_audit ( + `merchant_id` String, + `profile_id` String, + `payment_id` String, + `refund_id` Nullable(String), + `dispute_id` Nullable(String), + `routable_connectors` String, + `payment_connector` Nullable(String), + `request_id` String, + `flow` LowCardinality(String), + `url` Nullable(String), + `request` String, + `response` Nullable(String), + `error` Nullable(String), + `status_code` Nullable(UInt32), + `created_at` DateTime64(9), + `inserted_at` DateTime DEFAULT now() CODEC(T64, LZ4), + `method` LowCardinality(String), + `routing_engine` LowCardinality(String), + `routing_approach` Nullable(String) +) AS +SELECT + merchant_id, + profile_id, + payment_id, + refund_id, + dispute_id, + routable_connectors, + payment_connector, + request_id, + flow, + url, + request, + response, + error, + status_code, + created_at, + now() AS inserted_at, + method, + routing_engine, + routing_approach +FROM + routing_events_queue +WHERE + length(_error) = 0; \ No newline at end of file diff --git a/crates/analytics/src/clickhouse.rs b/crates/analytics/src/clickhouse.rs index 1d52cc9e76..f58df9bedf 100644 --- a/crates/analytics/src/clickhouse.rs +++ b/crates/analytics/src/clickhouse.rs @@ -32,6 +32,7 @@ use crate::{ connector_events::events::ConnectorEventsResult, disputes::{filters::DisputeFilterRow, metrics::DisputeMetricRow}, outgoing_webhook_event::events::OutgoingWebhookLogsResult, + routing_events::events::RoutingEventsResult, sdk_events::events::SdkEventsResult, types::TableEngine, }; @@ -150,6 +151,7 @@ impl AnalyticsDataSource for ClickhouseClient { | AnalyticsCollection::SdkEventsAnalytics | AnalyticsCollection::ApiEvents | AnalyticsCollection::ConnectorEvents + | AnalyticsCollection::RoutingEvents | AnalyticsCollection::ApiEventsAnalytics | AnalyticsCollection::OutgoingWebhookEvent | AnalyticsCollection::ActivePaymentsAnalytics => TableEngine::BasicTree, @@ -187,6 +189,7 @@ impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {} impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {} impl super::api_event::metrics::ApiEventMetricAnalytics for ClickhouseClient {} impl super::connector_events::events::ConnectorEventLogAnalytics for ClickhouseClient {} +impl super::routing_events::events::RoutingEventLogAnalytics for ClickhouseClient {} impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics for ClickhouseClient { @@ -236,6 +239,16 @@ impl TryInto for serde_json::Value { } } +impl TryInto for serde_json::Value { + type Error = Report; + + fn try_into(self) -> Result { + serde_json::from_value(self).change_context(ParsingError::StructParseFailure( + "Failed to parse RoutingEventsResult in clickhouse results", + )) + } +} + impl TryInto for serde_json::Value { type Error = Report; @@ -471,6 +484,7 @@ impl ToSql for AnalyticsCollection { Self::DisputeSessionized => Ok("sessionizer_dispute".to_string()), Self::ActivePaymentsAnalytics => Ok("active_payments".to_string()), Self::Authentications => Ok("authentications".to_string()), + Self::RoutingEvents => Ok("routing_events_audit".to_string()), } } } diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index 2cd2da30e4..1718c5044a 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -16,6 +16,7 @@ pub mod payment_intents; pub mod payments; mod query; pub mod refunds; +pub mod routing_events; pub mod sdk_events; pub mod search; mod sqlx; @@ -1165,6 +1166,7 @@ pub enum AnalyticsFlow { GetDisputeFilters, GetDisputeMetrics, GetSankey, + GetRoutingEvents, } impl FlowMetric for AnalyticsFlow {} diff --git a/crates/analytics/src/routing_events.rs b/crates/analytics/src/routing_events.rs new file mode 100644 index 0000000000..531af749c9 --- /dev/null +++ b/crates/analytics/src/routing_events.rs @@ -0,0 +1,5 @@ +mod core; +pub mod events; +pub trait RoutingEventAnalytics: events::RoutingEventLogAnalytics {} + +pub use self::core::routing_events_core; diff --git a/crates/analytics/src/routing_events/core.rs b/crates/analytics/src/routing_events/core.rs new file mode 100644 index 0000000000..30efcfaaa9 --- /dev/null +++ b/crates/analytics/src/routing_events/core.rs @@ -0,0 +1,26 @@ +use api_models::analytics::routing_events::RoutingEventsRequest; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; + +use super::events::{get_routing_events, RoutingEventsResult}; +use crate::{errors::AnalyticsResult, types::FiltersError, AnalyticsProvider}; + +pub async fn routing_events_core( + pool: &AnalyticsProvider, + req: RoutingEventsRequest, + merchant_id: &common_utils::id_type::MerchantId, +) -> AnalyticsResult> { + let data = match pool { + AnalyticsProvider::Sqlx(_) => Err(FiltersError::NotImplemented( + "Connector Events not implemented for SQLX", + )) + .attach_printable("SQL Analytics is not implemented for Connector Events"), + AnalyticsProvider::Clickhouse(ckh_pool) + | AnalyticsProvider::CombinedSqlx(_, ckh_pool) + | AnalyticsProvider::CombinedCkh(_, ckh_pool) => { + get_routing_events(merchant_id, req, ckh_pool).await + } + } + .switch()?; + Ok(data) +} diff --git a/crates/analytics/src/routing_events/events.rs b/crates/analytics/src/routing_events/events.rs new file mode 100644 index 0000000000..27b2f25162 --- /dev/null +++ b/crates/analytics/src/routing_events/events.rs @@ -0,0 +1,73 @@ +use api_models::analytics::{routing_events::RoutingEventsRequest, Granularity}; +use common_utils::errors::ReportSwitchExt; +use error_stack::ResultExt; +use time::PrimitiveDateTime; + +use crate::{ + query::{Aggregate, GroupByClause, QueryBuilder, ToSql, Window}, + types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow}, +}; +pub trait RoutingEventLogAnalytics: LoadRow {} + +pub async fn get_routing_events( + merchant_id: &common_utils::id_type::MerchantId, + query_param: RoutingEventsRequest, + pool: &T, +) -> FiltersResult> +where + T: AnalyticsDataSource + RoutingEventLogAnalytics, + PrimitiveDateTime: ToSql, + AnalyticsCollection: ToSql, + Granularity: GroupByClause, + Aggregate<&'static str>: ToSql, + Window<&'static str>: ToSql, +{ + let mut query_builder: QueryBuilder = QueryBuilder::new(AnalyticsCollection::RoutingEvents); + query_builder.add_select_column("*").switch()?; + + query_builder + .add_filter_clause("merchant_id", merchant_id) + .switch()?; + + query_builder + .add_filter_clause("payment_id", &query_param.payment_id) + .switch()?; + + if let Some(refund_id) = query_param.refund_id { + query_builder + .add_filter_clause("refund_id", &refund_id) + .switch()?; + } + + if let Some(dispute_id) = query_param.dispute_id { + query_builder + .add_filter_clause("dispute_id", &dispute_id) + .switch()?; + } + + query_builder + .execute_query::(pool) + .await + .change_context(FiltersError::QueryBuildingError)? + .change_context(FiltersError::QueryExecutionFailure) +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct RoutingEventsResult { + pub merchant_id: common_utils::id_type::MerchantId, + pub profile_id: common_utils::id_type::ProfileId, + pub payment_id: String, + pub routable_connectors: String, + pub payment_connector: Option, + pub request_id: Option, + pub flow: String, + pub url: Option, + pub request: String, + pub response: Option, + pub error: Option, + pub status_code: Option, + #[serde(with = "common_utils::custom_serde::iso8601")] + pub created_at: PrimitiveDateTime, + pub method: String, + pub routing_engine: String, +} diff --git a/crates/analytics/src/sqlx.rs b/crates/analytics/src/sqlx.rs index 7934c91184..028a1ddc14 100644 --- a/crates/analytics/src/sqlx.rs +++ b/crates/analytics/src/sqlx.rs @@ -1400,6 +1400,8 @@ impl ToSql for AnalyticsCollection { .attach_printable("DisputeSessionized table is not implemented for Sqlx"))?, Self::Authentications => Err(error_stack::report!(ParsingError::UnknownError) .attach_printable("Authentications table is not implemented for Sqlx"))?, + Self::RoutingEvents => Err(error_stack::report!(ParsingError::UnknownError) + .attach_printable("RoutingEvents table is not implemented for Sqlx"))?, } } } diff --git a/crates/analytics/src/types.rs b/crates/analytics/src/types.rs index 7bf8fc7d3c..28ff2cec8b 100644 --- a/crates/analytics/src/types.rs +++ b/crates/analytics/src/types.rs @@ -42,6 +42,7 @@ pub enum AnalyticsCollection { DisputeSessionized, ApiEventsAnalytics, ActivePaymentsAnalytics, + RoutingEvents, } #[allow(dead_code)] diff --git a/crates/api_models/src/analytics.rs b/crates/api_models/src/analytics.rs index 71d7f80eb7..fcce19601a 100644 --- a/crates/api_models/src/analytics.rs +++ b/crates/api_models/src/analytics.rs @@ -25,6 +25,7 @@ pub mod outgoing_webhook_event; pub mod payment_intents; pub mod payments; pub mod refunds; +pub mod routing_events; pub mod sdk_events; pub mod search; diff --git a/crates/api_models/src/analytics/routing_events.rs b/crates/api_models/src/analytics/routing_events.rs new file mode 100644 index 0000000000..e9cec7221b --- /dev/null +++ b/crates/api_models/src/analytics/routing_events.rs @@ -0,0 +1,6 @@ +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct RoutingEventsRequest { + pub payment_id: common_utils::id_type::PaymentId, + pub refund_id: Option, + pub dispute_id: Option, +} diff --git a/crates/api_models/src/events.rs b/crates/api_models/src/events.rs index e0e57050c4..0038590a54 100644 --- a/crates/api_models/src/events.rs +++ b/crates/api_models/src/events.rs @@ -29,7 +29,8 @@ use crate::{ admin::*, analytics::{ api_event::*, auth_events::*, connector_events::ConnectorEventsRequest, - outgoing_webhook_event::OutgoingWebhookLogsRequest, sdk_events::*, search::*, *, + outgoing_webhook_event::OutgoingWebhookLogsRequest, routing_events::RoutingEventsRequest, + sdk_events::*, search::*, *, }, api_keys::*, cards_info::*, @@ -142,7 +143,8 @@ impl_api_event_type!( OrganizationCreateRequest, OrganizationUpdateRequest, OrganizationId, - CustomerListRequest + CustomerListRequest, + RoutingEventsRequest ) ); diff --git a/crates/api_models/src/routing.rs b/crates/api_models/src/routing.rs index 7068da8edc..46cd37bc68 100644 --- a/crates/api_models/src/routing.rs +++ b/crates/api_models/src/routing.rs @@ -1046,7 +1046,7 @@ pub struct CurrentBlockThreshold { pub max_total_count: Option, } -#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, ToSchema)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, Copy, ToSchema)] #[serde(rename_all = "snake_case")] pub enum SuccessRateSpecificityLevel { #[default] @@ -1283,3 +1283,247 @@ impl RoutableConnectorChoiceWithBucketName { } } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalSuccessRateConfigEventRequest { + pub min_aggregates_size: Option, + pub default_success_rate: Option, + pub specificity_level: SuccessRateSpecificityLevel, + pub exploration_percent: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalSuccessRateEventRequest { + pub id: String, + pub params: String, + pub labels: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationRoutingEventBucketConfig { + pub bucket_size: Option, + pub bucket_leak_interval_in_secs: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationRoutingEventRequest { + pub id: String, + pub params: String, + pub labels: Vec, + pub config: Option, +} + +/// API-1 types +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalContractScoreEventRequest { + pub id: String, + pub params: String, + pub labels: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct LabelWithScoreEventResponse { + pub score: f64, + pub label: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalSuccessRateEventResponse { + pub labels_with_score: Vec, + pub routing_apporach: RoutingApproach, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RoutingApproach { + Exploitation, + Exploration, + Elimination, + ContractBased, + Default, +} + +impl RoutingApproach { + pub fn from_decision_engine_approach(approach: &str) -> Self { + match approach { + "SR_SELECTION_V3_ROUTING" => Self::Exploitation, + "SR_V3_HEDGING" => Self::Exploration, + _ => Self::Default, + } + } +} + +impl std::fmt::Display for RoutingApproach { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Exploitation => write!(f, "Exploitation"), + Self::Exploration => write!(f, "Exploration"), + Self::Elimination => write!(f, "Elimination"), + Self::ContractBased => write!(f, "ContractBased"), + Self::Default => write!(f, "Default"), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct BucketInformationEventResponse { + pub is_eliminated: bool, + pub bucket_name: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationInformationEventResponse { + pub entity: Option, + pub global: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct LabelWithStatusEliminationEventResponse { + pub label: String, + pub elimination_information: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct EliminationEventResponse { + pub labels_with_status: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct ScoreDataEventResponse { + pub score: f64, + pub label: String, + pub current_count: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalContractScoreEventResponse { + pub labels_with_score: Vec, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalGlobalSuccessRateConfigEventRequest { + pub entity_min_aggregates_size: u32, + pub entity_default_success_rate: f64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct CalGlobalSuccessRateEventRequest { + pub entity_id: String, + pub entity_params: String, + pub entity_labels: Vec, + pub global_labels: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateSuccessRateWindowConfig { + pub max_aggregates_size: Option, + pub current_block_threshold: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateLabelWithStatusEventRequest { + pub label: String, + pub status: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateSuccessRateWindowEventRequest { + pub id: String, + pub params: String, + pub labels_with_status: Vec, + pub config: Option, + pub global_labels_with_status: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateSuccessRateWindowEventResponse { + pub status: UpdationStatusEventResponse, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum UpdationStatusEventResponse { + WindowUpdationSucceeded, + WindowUpdationFailed, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct LabelWithBucketNameEventRequest { + pub label: String, + pub bucket_name: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateEliminationBucketEventRequest { + pub id: String, + pub params: String, + pub labels_with_bucket_name: Vec, + pub config: Option, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateEliminationBucketEventResponse { + pub status: EliminationUpdationStatusEventResponse, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EliminationUpdationStatusEventResponse { + BucketUpdationSucceeded, + BucketUpdationFailed, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct ContractLabelInformationEventRequest { + pub label: String, + pub target_count: u64, + pub target_time: u64, + pub current_count: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateContractRequestEventRequest { + pub id: String, + pub params: String, + pub labels_information: Vec, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct UpdateContractEventResponse { + pub status: ContractUpdationStatusEventResponse, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ContractUpdationStatusEventResponse { + ContractUpdationSucceeded, + ContractUpdationFailed, +} diff --git a/crates/external_services/Cargo.toml b/crates/external_services/Cargo.toml index 336a4d6471..34aca99f39 100644 --- a/crates/external_services/Cargo.toml +++ b/crates/external_services/Cargo.toml @@ -13,7 +13,19 @@ email = ["dep:aws-config"] aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3"] hashicorp-vault = ["dep:vaultrs"] v1 = ["hyperswitch_interfaces/v1", "common_utils/v1"] -dynamic_routing = ["dep:prost", "dep:tonic", "dep:tonic-reflection", "dep:tonic-types", "dep:api_models", "tokio/macros", "tokio/rt-multi-thread", "dep:tonic-build", "dep:router_env", "dep:hyper-util", "dep:http-body-util"] +dynamic_routing = [ + "dep:prost", + "dep:tonic", + "dep:tonic-reflection", + "dep:tonic-types", + "dep:api_models", + "tokio/macros", + "tokio/rt-multi-thread", + "dep:tonic-build", + "dep:router_env", + "dep:hyper-util", + "dep:http-body-util", +] [dependencies] async-trait = "0.1.88" @@ -51,7 +63,10 @@ quick-xml = { version = "0.31.0", features = ["serialize"] } common_utils = { version = "0.1.0", path = "../common_utils" } hyperswitch_interfaces = { version = "0.1.0", path = "../hyperswitch_interfaces", default-features = false } masking = { version = "0.1.0", path = "../masking" } -router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } +router_env = { version = "0.1.0", path = "../router_env", features = [ + "log_extra_implicit_fields", + "log_custom_entries_to_extra", +] } api_models = { version = "0.1.0", path = "../api_models", optional = true } diff --git a/crates/hyperswitch_interfaces/src/events.rs b/crates/hyperswitch_interfaces/src/events.rs index 3dcb751954..54f24c2ec1 100644 --- a/crates/hyperswitch_interfaces/src/events.rs +++ b/crates/hyperswitch_interfaces/src/events.rs @@ -1,3 +1,4 @@ //! Events interface pub mod connector_api_logs; +pub mod routing_api_logs; diff --git a/crates/hyperswitch_interfaces/src/events/routing_api_logs.rs b/crates/hyperswitch_interfaces/src/events/routing_api_logs.rs new file mode 100644 index 0000000000..f7c61e3890 --- /dev/null +++ b/crates/hyperswitch_interfaces/src/events/routing_api_logs.rs @@ -0,0 +1,186 @@ +//! Routing API logs interface + +use std::fmt; + +use api_models::routing::RoutableConnectorChoice; +use common_utils::request::Method; +use router_env::tracing_actix_web::RequestId; +use serde::Serialize; +use serde_json::json; +use time::OffsetDateTime; + +/// RoutingEngine enum +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum RoutingEngine { + /// Dynamo for routing + IntelligentRouter, + /// Decision engine for routing + DecisionEngine, +} + +/// Method type enum +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ApiMethod { + /// grpc call + Grpc, + /// Rest call + Rest(Method), +} + +impl fmt::Display for ApiMethod { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Grpc => write!(f, "Grpc"), + Self::Rest(method) => write!(f, "Rest ({})", method), + } + } +} + +#[derive(Debug, Serialize)] +/// RoutingEvent type +pub struct RoutingEvent { + tenant_id: common_utils::id_type::TenantId, + routable_connectors: String, + payment_connector: Option, + flow: String, + request: String, + response: Option, + error: Option, + url: String, + method: String, + payment_id: String, + profile_id: common_utils::id_type::ProfileId, + merchant_id: common_utils::id_type::MerchantId, + created_at: i128, + status_code: Option, + request_id: String, + routing_engine: RoutingEngine, + routing_approach: Option, +} + +impl RoutingEvent { + /// fn new RoutingEvent + #[allow(clippy::too_many_arguments)] + pub fn new( + tenant_id: common_utils::id_type::TenantId, + routable_connectors: String, + flow: &str, + request: serde_json::Value, + url: String, + method: ApiMethod, + payment_id: String, + profile_id: common_utils::id_type::ProfileId, + merchant_id: common_utils::id_type::MerchantId, + request_id: Option, + routing_engine: RoutingEngine, + ) -> Self { + Self { + tenant_id, + routable_connectors, + flow: flow.to_string(), + request: request.to_string(), + response: None, + error: None, + url, + method: method.to_string(), + payment_id, + profile_id, + merchant_id, + created_at: OffsetDateTime::now_utc().unix_timestamp_nanos(), + status_code: None, + request_id: request_id + .map(|i| i.as_hyphenated().to_string()) + .unwrap_or("NO_REQUEST_ID".to_string()), + routing_engine, + payment_connector: None, + routing_approach: None, + } + } + + /// fn set_response_body + pub fn set_response_body(&mut self, response: &T) { + match masking::masked_serialize(response) { + Ok(masked) => { + self.response = Some(masked.to_string()); + } + Err(er) => self.set_error(json!({"error": er.to_string()})), + } + } + + /// fn set_error_response_body + pub fn set_error_response_body(&mut self, response: &T) { + match masking::masked_serialize(response) { + Ok(masked) => { + self.error = Some(masked.to_string()); + } + Err(er) => self.set_error(json!({"error": er.to_string()})), + } + } + + /// fn set_error + pub fn set_error(&mut self, error: serde_json::Value) { + self.error = Some(error.to_string()); + } + + /// set response status code + pub fn set_status_code(&mut self, code: u16) { + self.status_code = Some(code); + } + + /// set response status code + pub fn set_routable_connectors(&mut self, connectors: Vec) { + let connectors = connectors + .into_iter() + .map(|c| { + format!( + "{:?}:{:?}", + c.connector, + c.merchant_connector_id + .map(|id| id.get_string_repr().to_string()) + .unwrap_or(String::from("")) + ) + }) + .collect::>() + .join(","); + self.routable_connectors = connectors; + } + + /// set payment connector + pub fn set_payment_connector(&mut self, connector: RoutableConnectorChoice) { + self.payment_connector = Some(format!( + "{:?}:{:?}", + connector.connector, + connector + .merchant_connector_id + .map(|id| id.get_string_repr().to_string()) + .unwrap_or(String::from("")) + )); + } + + /// set routing approach + pub fn set_routing_approach(&mut self, approach: String) { + self.routing_approach = Some(approach); + } + + /// Returns the request ID of the event. + pub fn get_request_id(&self) -> &str { + &self.request_id + } + + /// Returns the merchant ID of the event. + pub fn get_merchant_id(&self) -> &str { + self.merchant_id.get_string_repr() + } + + /// Returns the payment ID of the event. + pub fn get_payment_id(&self) -> &str { + &self.payment_id + } + + /// Returns the profile ID of the event. + pub fn get_profile_id(&self) -> &str { + self.profile_id.get_string_repr() + } +} diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index 67722d3e25..ffac49e7a8 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -10,8 +10,8 @@ pub mod routes { use analytics::{ api_event::api_events_core, connector_events::connector_events_core, enums::AuthInfo, errors::AnalyticsError, lambda_utils::invoke_lambda, opensearch::OpenSearchError, - outgoing_webhook_event::outgoing_webhook_events_core, sdk_events::sdk_events_core, - AnalyticsFlow, + outgoing_webhook_event::outgoing_webhook_events_core, routing_events::routing_events_core, + sdk_events::sdk_events_core, AnalyticsFlow, }; use api_models::analytics::{ api_event::QueryType, @@ -133,6 +133,10 @@ pub mod routes { web::resource("connector_event_logs") .route(web::get().to(get_profile_connector_events)), ) + .service( + web::resource("routing_event_logs") + .route(web::get().to(get_profile_routing_events)), + ) .service( web::resource("outgoing_webhook_event_logs") .route(web::get().to(get_profile_outgoing_webhook_events)), @@ -307,6 +311,10 @@ pub mod routes { web::resource("connector_event_logs") .route(web::get().to(get_profile_connector_events)), ) + .service( + web::resource("routing_event_logs") + .route(web::get().to(get_profile_routing_events)), + ) .service( web::resource("outgoing_webhook_event_logs") .route(web::get().to(get_profile_outgoing_webhook_events)), @@ -2169,6 +2177,44 @@ pub mod routes { .await } + pub async fn get_profile_routing_events( + state: web::Data, + req: actix_web::HttpRequest, + json_payload: web::Query, + ) -> impl Responder { + let flow = AnalyticsFlow::GetRoutingEvents; + Box::pin(api::server_wrap( + flow, + state, + &req, + json_payload.into_inner(), + |state, auth: AuthenticationData, req, _| async move { + utils::check_if_profile_id_is_present_in_payment_intent( + req.payment_id.clone(), + &state, + &auth, + ) + .await + .change_context(AnalyticsError::AccessForbiddenError)?; + routing_events_core(&state.pool, req, auth.merchant_account.get_id()) + .await + .map(ApplicationResponse::Json) + }, + auth::auth_type( + &auth::HeaderAuth(auth::ApiKeyAuth { + is_connected_allowed: false, + is_platform_allowed: false, + }), + &auth::JWTAuth { + permission: Permission::ProfileAnalyticsRead, + }, + req.headers(), + ), + api_locking::LockAction::NotApplicable, + )) + .await + } + pub async fn get_global_search_results( state: web::Data, req: actix_web::HttpRequest, diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 0993a6eee3..6edf2a479d 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -7539,6 +7539,7 @@ where connectors.clone(), business_profile, dynamic_routing_config_params_interpolator, + payment_data.get_payment_attempt(), ) .await .map_err(|e| logger::error!(dynamic_routing_error=?e)) diff --git a/crates/router/src/core/payments/routing.rs b/crates/router/src/core/payments/routing.rs index de1cf0e8a6..fa161efb21 100644 --- a/crates/router/src/core/payments/routing.rs +++ b/crates/router/src/core/payments/routing.rs @@ -31,10 +31,12 @@ use euclid::{ use external_services::grpc_client::dynamic_routing::{ contract_routing_client::ContractBasedDynamicRouting, elimination_based_client::{EliminationBasedRouting, EliminationResponse}, - success_rate_client::{CalSuccessRateResponse, SuccessBasedDynamicRouting}, + success_rate_client::SuccessBasedDynamicRouting, DynamicRoutingError, }; use hyperswitch_domain_models::address::Address; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use hyperswitch_interfaces::events::routing_api_logs::{ApiMethod, RoutingEngine, RoutingEvent}; use kgraph_utils::{ mca as mca_graph, transformers::{IntoContext, IntoDirValue}, @@ -58,6 +60,8 @@ use crate::core::payouts; use crate::core::routing::transformers::OpenRouterDecideGatewayRequestExt; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] use crate::headers; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use crate::routes::app::SessionStateInfo; use crate::{ core::{ errors, errors as oss_errors, payments::routing::utils::DecisionEngineApiHandler, routing, @@ -1583,6 +1587,7 @@ pub async fn perform_dynamic_routing_with_open_router( state, connector.clone(), profile.get_id(), + &payment_data.merchant_id, &payment_data.payment_id, common_enums::AttemptStatus::AuthenticationPending, ) @@ -1662,6 +1667,7 @@ pub async fn perform_dynamic_routing_with_intelligent_router( routable_connectors: Vec, profile: &domain::Profile, dynamic_routing_config_params_interpolator: routing::helpers::DynamicRoutingConfigParamsInterpolator, + payment_attempt: &oss_storage::PaymentAttempt, ) -> RoutingResult> { let dynamic_routing_algo_ref: api_routing::DynamicRoutingAlgorithmRef = profile .dynamic_routing_algorithm @@ -1690,6 +1696,8 @@ pub async fn perform_dynamic_routing_with_intelligent_router( state, routable_connectors.clone(), profile.get_id(), + &payment_attempt.merchant_id, + &payment_attempt.payment_id, dynamic_routing_config_params_interpolator.clone(), algorithm.clone(), ) @@ -1711,6 +1719,8 @@ pub async fn perform_dynamic_routing_with_intelligent_router( state, routable_connectors.clone(), profile.get_id(), + &payment_attempt.merchant_id, + &payment_attempt.payment_id, dynamic_routing_config_params_interpolator.clone(), algorithm.clone(), ) @@ -1732,6 +1742,8 @@ pub async fn perform_dynamic_routing_with_intelligent_router( state, connector_list.clone(), profile.get_id(), + &payment_attempt.merchant_id, + &payment_attempt.payment_id, dynamic_routing_config_params_interpolator.clone(), algorithm.clone(), ) @@ -1767,6 +1779,10 @@ pub async fn perform_decide_gateway_call_with_open_router( is_elimination_enabled, ); + let serialized_request = serde_json::to_value(&open_router_req_body) + .change_context(errors::RoutingError::OpenRouterCallFailed) + .attach_printable("Failed to serialize open_router request body")?; + let url = format!("{}/{}", &state.conf.open_router.url, "decide-gateway"); let mut request = request::Request::new(services::Method::Post, &url); request.add_header(headers::CONTENT_TYPE, "application/json".into()); @@ -1778,8 +1794,27 @@ pub async fn perform_decide_gateway_call_with_open_router( open_router_req_body, ))); + let mut routing_event = RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "open_router_decide_gateway_call", + serialized_request, + url.clone(), + ApiMethod::Rest(services::Method::Post), + payment_attempt.payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + payment_attempt.merchant_id.to_owned(), + state.request_id, + RoutingEngine::DecisionEngine, + ); + let response = services::call_connector_api(state, request, "open_router_decide_gateway_call") .await + .inspect_err(|err| { + routing_event + .set_error(serde_json::json!({"error": err.current_context().to_string()})); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::RoutingError::OpenRouterCallFailed)?; let sr_sorted_connectors = match response { @@ -1791,6 +1826,15 @@ pub async fn perform_decide_gateway_call_with_open_router( "Failed to parse the response from open_router".into(), ))?; + routing_event.set_status_code(resp.status_code); + routing_event.set_response_body(&decided_gateway); + routing_event.set_routing_approach( + api_routing::RoutingApproach::from_decision_engine_approach( + &decided_gateway.routing_approach, + ) + .to_string(), + ); + if let Some(gateway_priority_map) = decided_gateway.gateway_priority_map { logger::debug!(gateway_priority_map=?gateway_priority_map, routing_approach=decided_gateway.routing_approach, "open_router decide_gateway call response"); routable_connectors.sort_by(|connector_choice_a, connector_choice_b| { @@ -1807,6 +1851,10 @@ pub async fn perform_decide_gateway_call_with_open_router( .unwrap_or(std::cmp::Ordering::Equal) }); } + + routing_event.set_routable_connectors(routable_connectors.clone()); + state.event_handler().log_event(&routing_event); + Ok(routable_connectors) } Err(err) => { @@ -1817,6 +1865,12 @@ pub async fn perform_decide_gateway_call_with_open_router( "Failed to parse the response from open_router".into(), ))?; logger::error!("open_router_error_response: {:?}", err_resp); + + routing_event.set_status_code(err.status_code); + routing_event.set_error(serde_json::json!({"error": err_resp.error_message})); + routing_event.set_error_response_body(&err_resp); + state.event_handler().log_event(&routing_event); + Err(errors::RoutingError::OpenRouterError( "Failed to perform decide_gateway call in open_router".into(), )) @@ -1832,6 +1886,7 @@ pub async fn update_gateway_score_with_open_router( state: &SessionState, payment_connector: api_routing::RoutableConnectorChoice, profile_id: &common_utils::id_type::ProfileId, + merchant_id: &common_utils::id_type::MerchantId, payment_id: &common_utils::id_type::PaymentId, payment_status: common_enums::AttemptStatus, ) -> RoutingResult<()> { @@ -1842,6 +1897,10 @@ pub async fn update_gateway_score_with_open_router( payment_id: payment_id.clone(), }; + let serialized_request = serde_json::to_value(&open_router_req_body) + .change_context(errors::RoutingError::OpenRouterCallFailed) + .attach_printable("Failed to serialize open_router request body")?; + let url = format!("{}/{}", &state.conf.open_router.url, "update-gateway-score"); let mut request = request::Request::new(services::Method::Post, &url); request.add_header(headers::CONTENT_TYPE, "application/json".into()); @@ -1853,11 +1912,32 @@ pub async fn update_gateway_score_with_open_router( open_router_req_body, ))); + let mut routing_event = RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "open_router_update_gateway_score_call", + serialized_request, + url.clone(), + ApiMethod::Rest(services::Method::Post), + payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + merchant_id.to_owned(), + state.request_id, + RoutingEngine::DecisionEngine, + ); + let response = services::call_connector_api(state, request, "open_router_update_gateway_score_call") .await + .inspect_err(|err| { + routing_event + .set_error(serde_json::json!({"error": err.current_context().to_string()})); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::RoutingError::OpenRouterCallFailed)?; + routing_event.set_payment_connector(payment_connector.clone()); // check this in review + match response { Ok(resp) => { let update_score_resp = String::from_utf8(resp.response.to_vec()).change_context( @@ -1872,6 +1952,10 @@ pub async fn update_gateway_score_with_open_router( update_score_resp ); + routing_event.set_status_code(resp.status_code); + routing_event.set_response_body(&update_score_resp); + state.event_handler().log_event(&routing_event); + Ok(()) } Err(err) => { @@ -1882,6 +1966,12 @@ pub async fn update_gateway_score_with_open_router( "Failed to parse the response from open_router".into(), ))?; logger::error!("open_router_update_gateway_score_error: {:?}", err_resp); + + routing_event.set_status_code(err.status_code); + routing_event.set_error(serde_json::json!({"error": err_resp.error_message})); + routing_event.set_error_response_body(&err_resp); + state.event_handler().log_event(&routing_event); + Err(errors::RoutingError::OpenRouterError( "Failed to update gateway score in open_router".into(), )) @@ -1898,6 +1988,8 @@ pub async fn perform_success_based_routing( state: &SessionState, routable_connectors: Vec, profile_id: &common_utils::id_type::ProfileId, + merchant_id: &common_utils::id_type::MerchantId, + payment_id: &common_utils::id_type::PaymentId, success_based_routing_config_params_interpolator: routing::helpers::DynamicRoutingConfigParamsInterpolator, success_based_algo_ref: api_routing::SuccessBasedAlgorithm, ) -> RoutingResult> { @@ -1941,7 +2033,42 @@ pub async fn perform_success_based_routing( .ok_or(errors::RoutingError::SuccessBasedRoutingParamsNotFoundError)?, ); - let success_based_connectors: CalSuccessRateResponse = client + let event_request = api_routing::CalSuccessRateEventRequest { + id: profile_id.get_string_repr().to_string(), + params: success_based_routing_config_params.clone(), + labels: routable_connectors + .iter() + .map(|conn_choice| conn_choice.to_string()) + .collect::>(), + config: success_based_routing_configs.config.as_ref().map(|conf| { + api_routing::CalSuccessRateConfigEventRequest { + min_aggregates_size: conf.min_aggregates_size, + default_success_rate: conf.default_success_rate, + specificity_level: conf.specificity_level, + exploration_percent: conf.exploration_percent, + } + }), + }; + + let serialized_request = serde_json::to_value(&event_request) + .change_context(errors::RoutingError::SuccessBasedRoutingConfigError) + .attach_printable("unable to serialize success_based_routing_config_params")?; + + let mut routing_event = RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "Intelligent-router FetchSuccessRate", + serialized_request, + "SuccessRateCalculator.FetchSuccessRate".to_string(), + ApiMethod::Grpc, + payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + merchant_id.to_owned(), + state.request_id, + RoutingEngine::IntelligentRouter, + ); + + let success_based_connectors = client .calculate_success_rate( profile_id.get_string_repr().into(), success_based_routing_configs, @@ -1950,11 +2077,45 @@ pub async fn perform_success_based_routing( state.get_grpc_headers(), ) .await + .inspect_err(|e| { + routing_event + .set_error(serde_json::json!({"error": e.current_context().to_string()})); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::RoutingError::SuccessRateCalculationError) .attach_printable( "unable to calculate/fetch success rate from dynamic routing service", )?; + let event_resposne = api_routing::CalSuccessRateEventResponse { + labels_with_score: success_based_connectors + .labels_with_score + .iter() + .map( + |label_with_score| api_routing::LabelWithScoreEventResponse { + label: label_with_score.label.clone(), + score: label_with_score.score, + }, + ) + .collect(), + routing_apporach: match success_based_connectors.routing_approach { + 0 => api_routing::RoutingApproach::Exploration, + 1 => api_routing::RoutingApproach::Exploitation, + _ => { + return Err(errors::RoutingError::GenericNotFoundError { + field: "routing_approach".to_string(), + }) + .change_context(errors::RoutingError::GenericNotFoundError { + field: "unknown routing approach from dynamic routing service".to_string(), + }) + .attach_printable("unknown routing approach from dynamic routing service") + } + }, + }; + + routing_event.set_response_body(&event_resposne); + routing_event.set_routing_approach(event_resposne.routing_apporach.to_string()); + let mut connectors = Vec::with_capacity(success_based_connectors.labels_with_score.len()); for label_with_score in success_based_connectors.labels_with_score { let (connector, merchant_connector_id) = label_with_score.label @@ -1984,6 +2145,10 @@ pub async fn perform_success_based_routing( }); } logger::debug!(success_based_routing_connectors=?connectors); + + routing_event.set_status_code(200); + routing_event.set_routable_connectors(connectors.clone()); + state.event_handler().log_event(&routing_event); Ok(connectors) } else { Ok(routable_connectors) @@ -1996,6 +2161,8 @@ pub async fn perform_elimination_routing( state: &SessionState, routable_connectors: Vec, profile_id: &common_utils::id_type::ProfileId, + merchant_id: &common_utils::id_type::MerchantId, + payment_id: &common_utils::id_type::PaymentId, elimination_routing_configs_params_interpolator: routing::helpers::DynamicRoutingConfigParamsInterpolator, elimination_algo_ref: api_routing::EliminationRoutingAlgorithm, ) -> RoutingResult> { @@ -2041,6 +2208,40 @@ pub async fn perform_elimination_routing( .ok_or(errors::RoutingError::EliminationBasedRoutingParamsNotFoundError)?, ); + let event_request = api_routing::EliminationRoutingEventRequest { + id: profile_id.get_string_repr().to_string(), + params: elimination_routing_config_params.clone(), + labels: routable_connectors + .iter() + .map(|conn_choice| conn_choice.to_string()) + .collect::>(), + config: elimination_routing_config + .elimination_analyser_config + .as_ref() + .map(|conf| api_routing::EliminationRoutingEventBucketConfig { + bucket_leak_interval_in_secs: conf.bucket_leak_interval_in_secs, + bucket_size: conf.bucket_size, + }), + }; + + let serialized_request = serde_json::to_value(&event_request) + .change_context(errors::RoutingError::SuccessBasedRoutingConfigError) + .attach_printable("unable to serialize EliminationRoutingEventRequest")?; + + let mut routing_event = RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "Intelligent-router GetEliminationStatus", + serialized_request, + "EliminationAnalyser.GetEliminationStatus".to_string(), + ApiMethod::Grpc, + payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + merchant_id.to_owned(), + state.request_id, + RoutingEngine::IntelligentRouter, + ); + let elimination_based_connectors: EliminationResponse = client .perform_elimination_routing( profile_id.get_string_repr().to_string(), @@ -2050,10 +2251,48 @@ pub async fn perform_elimination_routing( state.get_grpc_headers(), ) .await + .inspect_err(|e| { + routing_event + .set_error(serde_json::json!({"error": e.current_context().to_string()})); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::RoutingError::EliminationRoutingCalculationError) .attach_printable( "unable to analyze/fetch elimination routing from dynamic routing service", )?; + + let event_response = api_routing::EliminationEventResponse { + labels_with_status: elimination_based_connectors + .labels_with_status + .iter() + .map( + |label_with_status| api_routing::LabelWithStatusEliminationEventResponse { + label: label_with_status.label.clone(), + elimination_information: label_with_status + .elimination_information + .as_ref() + .map(|info| api_routing::EliminationInformationEventResponse { + entity: info.entity.as_ref().map(|entity_info| { + api_routing::BucketInformationEventResponse { + is_eliminated: entity_info.is_eliminated, + bucket_name: entity_info.bucket_name.clone(), + } + }), + global: info.global.as_ref().map(|global_info| { + api_routing::BucketInformationEventResponse { + is_eliminated: global_info.is_eliminated, + bucket_name: global_info.bucket_name.clone(), + } + }), + }), + }, + ) + .collect(), + }; + + routing_event.set_response_body(&event_response); + routing_event.set_routing_approach(api_routing::RoutingApproach::Elimination.to_string()); + let mut connectors = Vec::with_capacity(elimination_based_connectors.labels_with_status.len()); let mut eliminated_connectors = @@ -2105,6 +2344,10 @@ pub async fn perform_elimination_routing( } logger::debug!(dynamic_eliminated_connectors=?eliminated_connectors); logger::debug!(dynamic_elimination_based_routing_connectors=?connectors); + + routing_event.set_status_code(200); + routing_event.set_routable_connectors(connectors.clone()); + state.event_handler().log_event(&routing_event); Ok(connectors) } else { Ok(routable_connectors) @@ -2116,6 +2359,8 @@ pub async fn perform_contract_based_routing( state: &SessionState, routable_connectors: Vec, profile_id: &common_utils::id_type::ProfileId, + merchant_id: &common_utils::id_type::MerchantId, + payment_id: &common_utils::id_type::PaymentId, _dynamic_routing_config_params_interpolator: routing::helpers::DynamicRoutingConfigParamsInterpolator, contract_based_algo_ref: api_routing::ContractRoutingAlgorithm, ) -> RoutingResult> { @@ -2176,6 +2421,34 @@ pub async fn perform_contract_based_routing( }) .collect::>(); + let event_request = api_routing::CalContractScoreEventRequest { + id: profile_id.get_string_repr().to_string(), + params: "".to_string(), + labels: contract_based_connectors + .iter() + .map(|conn_choice| conn_choice.to_string()) + .collect::>(), + config: Some(contract_based_routing_configs.clone()), + }; + + let serialized_request = serde_json::to_value(&event_request) + .change_context(errors::RoutingError::SuccessBasedRoutingConfigError) + .attach_printable("unable to serialize EliminationRoutingEventRequest")?; + + let mut routing_event = RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "Intelligent-router CalContractScore", + serialized_request, + "ContractScoreCalculator.FetchContractScore".to_string(), + ApiMethod::Grpc, + payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + merchant_id.to_owned(), + state.request_id, + RoutingEngine::IntelligentRouter, + ); + let contract_based_connectors_result = client .calculate_contract_score( profile_id.get_string_repr().into(), @@ -2185,12 +2458,36 @@ pub async fn perform_contract_based_routing( state.get_grpc_headers(), ) .await + .inspect_err(|e| { + routing_event + .set_error(serde_json::json!({"error": e.current_context().to_string()})); + routing_event + .set_routing_approach(api_routing::RoutingApproach::ContractBased.to_string()); + state.event_handler().log_event(&routing_event); + }) .attach_printable( "unable to calculate/fetch contract score from dynamic routing service", ); let contract_based_connectors = match contract_based_connectors_result { - Ok(resp) => resp, + Ok(resp) => { + let event_response = api_routing::CalContractScoreEventResponse { + labels_with_score: resp + .labels_with_score + .iter() + .map(|label_with_score| api_routing::ScoreDataEventResponse { + score: label_with_score.score, + label: label_with_score.label.clone(), + current_count: label_with_score.current_count, + }) + .collect(), + }; + + routing_event.set_response_body(&event_response); + routing_event + .set_routing_approach(api_routing::RoutingApproach::ContractBased.to_string()); + resp + } Err(err) => match err.current_context() { DynamicRoutingError::ContractNotFound => { client @@ -2255,6 +2552,10 @@ pub async fn perform_contract_based_routing( connectors.append(&mut other_connectors); logger::debug!(contract_based_routing_connectors=?connectors); + + routing_event.set_status_code(200); + routing_event.set_routable_connectors(connectors.clone()); + state.event_handler().log_event(&routing_event); Ok(connectors) } else { Ok(routable_connectors) diff --git a/crates/router/src/core/routing/helpers.rs b/crates/router/src/core/routing/helpers.rs index 7ba06ff5e0..4798cce9b0 100644 --- a/crates/router/src/core/routing/helpers.rs +++ b/crates/router/src/core/routing/helpers.rs @@ -28,6 +28,8 @@ use external_services::grpc_client::dynamic_routing::{ }; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] use hyperswitch_domain_models::api::ApplicationResponse; +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +use hyperswitch_interfaces::events::routing_api_logs as routing_events; #[cfg(all(feature = "dynamic_routing", feature = "v1"))] use router_env::logger; #[cfg(all(feature = "dynamic_routing", feature = "v1"))] @@ -57,6 +59,7 @@ use crate::{ payments::routing::utils::{self as routing_utils, DecisionEngineApiHandler}, routing, }, + routes::app::SessionStateInfo, services, types::transformers::ForeignInto, }; @@ -822,6 +825,7 @@ pub async fn update_gateway_score_helper_with_open_router( state, routable_connector.clone(), profile_id, + &payment_attempt.merchant_id, &payment_attempt.payment_id, payment_attempt.status, ) @@ -1087,23 +1091,80 @@ pub async fn push_metrics_with_update_window_for_success_based_routing( .attach_printable("Unable to push dynamic routing stats to db")?; }; - client + let label_with_status = routing_types::UpdateLabelWithStatusEventRequest { + label: routable_connector.clone().to_string(), + status: payment_status_attribute == common_enums::AttemptStatus::Charged, + }; + let event_request = routing_types::UpdateSuccessRateWindowEventRequest { + id: payment_attempt.profile_id.get_string_repr().to_string(), + params: success_based_routing_config_params.clone(), + labels_with_status: vec![label_with_status.clone()], + global_labels_with_status: vec![label_with_status], + config: success_based_routing_configs.config.as_ref().map(|conf| { + routing_types::UpdateSuccessRateWindowConfig { + max_aggregates_size: conf.max_aggregates_size, + current_block_threshold: conf.current_block_threshold.clone(), + } + }), + }; + + let serialized_request = serde_json::to_value(&event_request) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to serialize success_based_routing_config_params")?; + + let mut routing_event = routing_events::RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "Intelligent-router UpdateSuccessRateWindow", + serialized_request, + "SuccessRateCalculator.UpdateSuccessRateWindow".to_string(), + routing_events::ApiMethod::Grpc, + payment_attempt.payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + payment_attempt.merchant_id.to_owned(), + state.request_id, + routing_events::RoutingEngine::IntelligentRouter, + ); + + let update_response = client .update_success_rate( profile_id.get_string_repr().into(), success_based_routing_configs, success_based_routing_config_params, vec![routing_types::RoutableConnectorChoiceWithStatus::new( - routable_connector, + routable_connector.clone(), payment_status_attribute == common_enums::AttemptStatus::Charged, )], state.get_grpc_headers(), ) .await + .inspect_err(|e| { + routing_event + .set_error(serde_json::json!({"error": e.current_context().to_string()})); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable( "unable to update success based routing window in dynamic routing service", )?; + let event_response = routing_types::UpdateSuccessRateWindowEventResponse { + status: match update_response.status { + 0 => routing_types::UpdationStatusEventResponse::WindowUpdationSucceeded, + 1 => routing_types::UpdationStatusEventResponse::WindowUpdationFailed, + _ => { + return Err(errors::ApiErrorResponse::InternalServerError) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unknown status code from dynamic routing service") + } + }, + }; + + routing_event.set_response_body(&event_response); + routing_event.set_status_code(200); + routing_event.set_payment_connector(routable_connector); + state.event_handler().log_event(&routing_event); + Ok(()) } else { Ok(()) @@ -1171,32 +1232,99 @@ pub async fn update_window_for_elimination_routing( .change_context(errors::ApiErrorResponse::InternalServerError)?, ); - client + let labels_with_bucket_name = + vec![routing_types::RoutableConnectorChoiceWithBucketName::new( + routing_types::RoutableConnectorChoice { + choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, + connector: common_enums::RoutableConnectors::from_str( + payment_connector.as_str(), + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to infer routable_connector from connector")?, + merchant_connector_id: payment_attempt.merchant_connector_id.clone(), + }, + gsm_error_category.to_string(), + )]; + + let event_request = routing_types::UpdateEliminationBucketEventRequest { + id: profile_id.get_string_repr().to_string(), + params: elimination_routing_config_params.clone(), + labels_with_bucket_name: labels_with_bucket_name + .iter() + .map( + |conn_choice| routing_types::LabelWithBucketNameEventRequest { + label: conn_choice.routable_connector_choice.to_string(), + bucket_name: conn_choice.bucket_name.clone(), + }, + ) + .collect(), + config: elimination_routing_config + .elimination_analyser_config + .map(|conf| routing_types::EliminationRoutingEventBucketConfig { + bucket_leak_interval_in_secs: conf.bucket_leak_interval_in_secs, + bucket_size: conf.bucket_size, + }), + }; + + let serialized_request = serde_json::to_value(&event_request) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to serialize success_based_routing_config_params")?; + + let mut routing_event = routing_events::RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "Intelligent-router UpdateEliminationBucket", + serialized_request, + "EliminationAnalyser.UpdateEliminationBucket".to_string(), + routing_events::ApiMethod::Grpc, + payment_attempt.payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + payment_attempt.merchant_id.to_owned(), + state.request_id, + routing_events::RoutingEngine::IntelligentRouter, + ); + + let update_response = client .update_elimination_bucket_config( profile_id.get_string_repr().to_string(), elimination_routing_config_params, - vec![routing_types::RoutableConnectorChoiceWithBucketName::new( - routing_types::RoutableConnectorChoice { - choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, - connector: common_enums::RoutableConnectors::from_str( - payment_connector.as_str(), - ) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable( - "unable to infer routable_connector from connector", - )?, - merchant_connector_id: payment_attempt.merchant_connector_id.clone(), - }, - gsm_error_category.to_string(), - )], + labels_with_bucket_name, elimination_routing_config.elimination_analyser_config, state.get_grpc_headers(), ) .await + .inspect_err(|e| { + routing_event + .set_error(serde_json::json!({"error": e.current_context().to_string()})); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable( "unable to update elimination based routing buckets in dynamic routing service", )?; + + let event_response = routing_types::UpdateEliminationBucketEventResponse { + status: match update_response.status { + 0 => routing_types::EliminationUpdationStatusEventResponse::BucketUpdationSucceeded, + 1 => routing_types::EliminationUpdationStatusEventResponse::BucketUpdationFailed, + _ => { + return Err(errors::ApiErrorResponse::InternalServerError) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unknown status code from dynamic routing service") + } + }, + }; + + routing_event.set_response_body(&event_response); + routing_event.set_status_code(200); + routing_event.set_payment_connector(routing_types::RoutableConnectorChoice { + choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, + connector: common_enums::RoutableConnectors::from_str(payment_connector.as_str()) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to infer routable_connector from connector")?, + merchant_connector_id: payment_attempt.merchant_connector_id.clone(), + }); + state.event_handler().log_event(&routing_event); Ok(()) } else { Ok(()) @@ -1295,7 +1423,36 @@ pub async fn push_metrics_with_update_window_for_contract_based_routing( get_desired_payment_status_for_dynamic_routing_metrics(payment_attempt.status); if payment_status_attribute == common_enums::AttemptStatus::Charged { - client + let event_request = routing_types::UpdateContractRequestEventRequest { + id: profile_id.get_string_repr().to_string(), + params: "".to_string(), + labels_information: vec![routing_types::ContractLabelInformationEventRequest { + label: request_label_info.label.clone(), + target_count: request_label_info.target_count, + target_time: request_label_info.target_time, + current_count: 1, + }], + }; + + let serialized_request = serde_json::to_value(&event_request) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to serialize success_based_routing_config_params")?; + + let mut routing_event = routing_events::RoutingEvent::new( + state.tenant.tenant_id.clone(), + "".to_string(), + "Intelligent-router UpdateContract", + serialized_request, + "ContractScoreCalculator.UpdateContract".to_string(), + routing_events::ApiMethod::Grpc, + payment_attempt.payment_id.get_string_repr().to_string(), + profile_id.to_owned(), + payment_attempt.merchant_id.to_owned(), + state.request_id, + routing_events::RoutingEngine::IntelligentRouter, + ); + + let update_respose = client .update_contracts( profile_id.get_string_repr().into(), vec![request_label_info], @@ -1305,10 +1462,41 @@ pub async fn push_metrics_with_update_window_for_contract_based_routing( state.get_grpc_headers(), ) .await + .inspect_err(|e| { + routing_event.set_error( + serde_json::json!({"error": e.current_context().to_string()}), + ); + state.event_handler().log_event(&routing_event); + }) .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable( "unable to update contract based routing window in dynamic routing service", )?; + + let event_response = routing_types::UpdateContractEventResponse { + status: match update_respose.status { + 0 => routing_types::ContractUpdationStatusEventResponse::ContractUpdationSucceeded, + 1 => routing_types::ContractUpdationStatusEventResponse::ContractUpdationFailed, + _ => { + return Err(errors::ApiErrorResponse::InternalServerError) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unknown status code from dynamic routing service") + } + }, + }; + + routing_event.set_response_body(&event_response); + routing_event.set_payment_connector(routing_types::RoutableConnectorChoice { + choice_kind: api_models::routing::RoutableChoiceKind::FullStruct, + connector: common_enums::RoutableConnectors::from_str( + final_label_info.label.as_str(), + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("unable to infer routable_connector from connector")?, + merchant_connector_id: Some(final_label_info.mca_id.clone()), + }); + routing_event.set_status_code(200); + state.event_handler().log_event(&routing_event); } let contract_based_connectors = routable_connectors diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 19aa2cbfaa..7d77df6871 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -21,6 +21,7 @@ pub mod audit_events; pub mod connector_api_logs; pub mod event_logger; pub mod outgoing_webhook_logs; +pub mod routing_api_logs; #[derive(Debug, Serialize, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum EventType { @@ -37,6 +38,7 @@ pub enum EventType { Payout, Consolidated, Authentication, + RoutingApiLogs, } #[derive(Debug, Default, Deserialize, Clone)] diff --git a/crates/router/src/events/routing_api_logs.rs b/crates/router/src/events/routing_api_logs.rs new file mode 100644 index 0000000000..9edfe0ea00 --- /dev/null +++ b/crates/router/src/events/routing_api_logs.rs @@ -0,0 +1,19 @@ +pub use hyperswitch_interfaces::events::routing_api_logs::RoutingEvent; + +use super::EventType; +use crate::services::kafka::KafkaMessage; + +impl KafkaMessage for RoutingEvent { + fn event_type(&self) -> EventType { + EventType::RoutingApiLogs + } + + fn key(&self) -> String { + format!( + "{}-{}-{}", + self.get_merchant_id(), + self.get_profile_id(), + self.get_payment_id() + ) + } +} diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 91968510a6..d1ea929cc8 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -161,6 +161,7 @@ pub struct KafkaSettings { payout_analytics_topic: String, consolidated_events_topic: String, authentication_analytics_topic: String, + routing_logs_topic: String, } impl KafkaSettings { @@ -248,6 +249,12 @@ impl KafkaSettings { }, )?; + common_utils::fp_utils::when(self.routing_logs_topic.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Kafka Routing Logs topic must not be empty".into(), + )) + })?; + Ok(()) } } @@ -269,6 +276,7 @@ pub struct KafkaProducer { consolidated_events_topic: String, authentication_analytics_topic: String, ckh_database_name: Option, + routing_logs_topic: String, } struct RdKafkaProducer(ThreadedProducer); @@ -318,6 +326,7 @@ impl KafkaProducer { consolidated_events_topic: conf.consolidated_events_topic.clone(), authentication_analytics_topic: conf.authentication_analytics_topic.clone(), ckh_database_name: None, + routing_logs_topic: conf.routing_logs_topic.clone(), }) } @@ -653,6 +662,7 @@ impl KafkaProducer { EventType::Payout => &self.payout_analytics_topic, EventType::Consolidated => &self.consolidated_events_topic, EventType::Authentication => &self.authentication_analytics_topic, + EventType::RoutingApiLogs => &self.routing_logs_topic, } } }