From 2e6cd6d31e4e3168b97427de936724de94df6415 Mon Sep 17 00:00:00 2001 From: Sandeep Kumar <83278309+tsdk02@users.noreply.github.com> Date: Sat, 19 Oct 2024 19:27:53 +0530 Subject: [PATCH] feat(opensearch): add additional global search filters and create sessionizer indexes for local (#6352) --- config/config.example.toml | 4 ++ config/deployments/env_specific.toml | 4 ++ config/development.toml | 4 ++ config/docker_compose.toml | 4 ++ config/vector.yaml | 60 +++++++++++++++--- crates/analytics/src/opensearch.rs | 46 ++++++++++++++ crates/analytics/src/search.rs | 76 +++++++++++++++++++++++ crates/api_models/src/analytics/search.rs | 14 +++++ crates/router/src/consts/opensearch.rs | 8 ++- 9 files changed, 211 insertions(+), 9 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index c0358d91a2..8f04ba4983 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -728,6 +728,10 @@ payment_attempts = "hyperswitch-payment-attempt-events" payment_intents = "hyperswitch-payment-intent-events" refunds = "hyperswitch-refund-events" disputes = "hyperswitch-dispute-events" +sessionizer_payment_attempts = "sessionizer-payment-attempt-events" +sessionizer_payment_intents = "sessionizer-payment-intent-events" +sessionizer_refunds = "sessionizer-refund-events" +sessionizer_disputes = "sessionizer-dispute-events" [saved_payment_methods] sdk_eligible_payment_methods = "card" diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 5dce5be9c9..cfefe9a130 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -252,6 +252,10 @@ payment_attempts = "hyperswitch-payment-attempt-events" payment_intents = "hyperswitch-payment-intent-events" refunds = "hyperswitch-refund-events" disputes = "hyperswitch-dispute-events" +sessionizer_payment_attempts = "sessionizer-payment-attempt-events" +sessionizer_payment_intents = "sessionizer-payment-intent-events" +sessionizer_refunds = "sessionizer-refund-events" +sessionizer_disputes = "sessionizer-dispute-events" # Configuration for the Key Manager Service [key_manager] diff --git a/config/development.toml b/config/development.toml index 4c2f1cf718..8e5d9582e2 100644 --- a/config/development.toml +++ b/config/development.toml @@ -734,6 +734,10 @@ payment_attempts = "hyperswitch-payment-attempt-events" payment_intents = "hyperswitch-payment-intent-events" refunds = "hyperswitch-refund-events" disputes = "hyperswitch-dispute-events" +sessionizer_payment_attempts = "sessionizer-payment-attempt-events" +sessionizer_payment_intents = "sessionizer-payment-intent-events" +sessionizer_refunds = "sessionizer-refund-events" +sessionizer_disputes = "sessionizer-dispute-events" [saved_payment_methods] sdk_eligible_payment_methods = "card" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index d55e722b3e..dde0902af9 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -562,6 +562,10 @@ payment_attempts = "hyperswitch-payment-attempt-events" payment_intents = "hyperswitch-payment-intent-events" refunds = "hyperswitch-refund-events" disputes = "hyperswitch-dispute-events" +sessionizer_payment_attempts = "sessionizer-payment-attempt-events" +sessionizer_payment_intents = "sessionizer-payment-intent-events" +sessionizer_refunds = "sessionizer-refund-events" +sessionizer_disputes = "sessionizer-dispute-events" [saved_payment_methods] sdk_eligible_payment_methods = "card" diff --git a/config/vector.yaml b/config/vector.yaml index 54ff25cab5..3f0709ae03 100644 --- a/config/vector.yaml +++ b/config/vector.yaml @@ -18,6 +18,15 @@ sources: decoding: codec: json + sessionized_kafka_tx_events: + type: kafka + bootstrap_servers: kafka0:29092 + group_id: sessionizer + topics: + - ^sessionizer + decoding: + codec: json + app_logs: type: docker_logs include_labels: @@ -35,10 +44,19 @@ sources: encoding: json transforms: + events_create_ts: + inputs: + - kafka_tx_events + source: |- + .timestamp = from_unix_timestamp(.created_at, unit: "seconds") ?? now() + ."@timestamp" = from_unix_timestamp(.created_at, unit: "seconds") ?? now() + type: remap + plus_1_events: type: filter inputs: - - kafka_tx_events + - events_create_ts + - sessionized_events_create_ts condition: ".sign_flag == 1" hs_server_logs: @@ -54,13 +72,13 @@ transforms: source: |- .message = parse_json!(.message) - events: + sessionized_events_create_ts: type: remap inputs: - - plus_1_events + - sessionized_kafka_tx_events source: |- - .timestamp = from_unix_timestamp!(.created_at, unit: "seconds") - ."@timestamp" = from_unix_timestamp(.created_at, unit: "seconds") ?? now() + .timestamp = from_unix_timestamp(.created_at, unit: "milliseconds") ?? now() + ."@timestamp" = from_unix_timestamp(.created_at, unit: "milliseconds") ?? now() sdk_transformed: type: throttle @@ -74,7 +92,7 @@ sinks: opensearch_events_1: type: elasticsearch inputs: - - events + - plus_1_events endpoints: - "https://opensearch:9200" id_key: message_key @@ -98,7 +116,7 @@ sinks: opensearch_events_2: type: elasticsearch inputs: - - events + - plus_1_events endpoints: - "https://opensearch:9200" id_key: message_key @@ -120,6 +138,33 @@ sinks: # Add a date suffixed index for better grouping index: "vector-{{ .topic }}-%Y-%m-%d" + opensearch_events_3: + type: elasticsearch + inputs: + - plus_1_events + endpoints: + - "https://opensearch:9200" + id_key: message_key + api_version: v7 + tls: + verify_certificate: false + verify_hostname: false + auth: + strategy: basic + user: admin + password: 0penS3arc# + encoding: + except_fields: + - message_key + - offset + - partition + - topic + - clickhouse_database + - last_synced + - sign_flag + bulk: + index: "{{ .topic }}" + opensearch_logs: type: elasticsearch inputs: @@ -143,6 +188,7 @@ sinks: type: loki inputs: - kafka_tx_events + - sessionized_kafka_tx_events endpoint: http://loki:3100 labels: source: vector diff --git a/crates/analytics/src/opensearch.rs b/crates/analytics/src/opensearch.rs index 3be9688d8f..a6e6c486eb 100644 --- a/crates/analytics/src/opensearch.rs +++ b/crates/analytics/src/opensearch.rs @@ -42,6 +42,10 @@ pub struct OpenSearchIndexes { pub payment_intents: String, pub refunds: String, pub disputes: String, + pub sessionizer_payment_attempts: String, + pub sessionizer_payment_intents: String, + pub sessionizer_refunds: String, + pub sessionizer_disputes: String, } #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)] @@ -81,6 +85,10 @@ impl Default for OpenSearchConfig { payment_intents: "hyperswitch-payment-intent-events".to_string(), refunds: "hyperswitch-refund-events".to_string(), disputes: "hyperswitch-dispute-events".to_string(), + sessionizer_payment_attempts: "sessionizer-payment-attempt-events".to_string(), + sessionizer_payment_intents: "sessionizer-payment-intent-events".to_string(), + sessionizer_refunds: "sessionizer-refund-events".to_string(), + sessionizer_disputes: "sessionizer-dispute-events".to_string(), }, } } @@ -219,6 +227,14 @@ impl OpenSearchClient { SearchIndex::PaymentIntents => self.indexes.payment_intents.clone(), SearchIndex::Refunds => self.indexes.refunds.clone(), SearchIndex::Disputes => self.indexes.disputes.clone(), + SearchIndex::SessionizerPaymentAttempts => { + self.indexes.sessionizer_payment_attempts.clone() + } + SearchIndex::SessionizerPaymentIntents => { + self.indexes.sessionizer_payment_intents.clone() + } + SearchIndex::SessionizerRefunds => self.indexes.sessionizer_refunds.clone(), + SearchIndex::SessionizerDisputes => self.indexes.sessionizer_disputes.clone(), } } @@ -324,6 +340,36 @@ impl OpenSearchIndexes { )) })?; + when( + self.sessionizer_payment_attempts.is_default_or_empty(), + || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Sessionizer Payment Attempts index must not be empty".into(), + )) + }, + )?; + + when( + self.sessionizer_payment_intents.is_default_or_empty(), + || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Sessionizer Payment Intents index must not be empty".into(), + )) + }, + )?; + + when(self.sessionizer_refunds.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Sessionizer Refunds index must not be empty".into(), + )) + })?; + + when(self.sessionizer_disputes.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Sessionizer Disputes index must not be empty".into(), + )) + })?; + Ok(()) } } diff --git a/crates/analytics/src/search.rs b/crates/analytics/src/search.rs index c81ff2f416..f53b07b123 100644 --- a/crates/analytics/src/search.rs +++ b/crates/analytics/src/search.rs @@ -92,6 +92,44 @@ pub async fn msearch_results( .switch()?; } }; + if let Some(connector) = filters.connector { + if !connector.is_empty() { + query_builder + .add_filter_clause("connector.keyword".to_string(), connector.clone()) + .switch()?; + } + }; + if let Some(payment_method_type) = filters.payment_method_type { + if !payment_method_type.is_empty() { + query_builder + .add_filter_clause( + "payment_method_type.keyword".to_string(), + payment_method_type.clone(), + ) + .switch()?; + } + }; + if let Some(card_network) = filters.card_network { + if !card_network.is_empty() { + query_builder + .add_filter_clause("card_network.keyword".to_string(), card_network.clone()) + .switch()?; + } + }; + if let Some(card_last_4) = filters.card_last_4 { + if !card_last_4.is_empty() { + query_builder + .add_filter_clause("card_last_4.keyword".to_string(), card_last_4.clone()) + .switch()?; + } + }; + if let Some(payment_id) = filters.payment_id { + if !payment_id.is_empty() { + query_builder + .add_filter_clause("payment_id.keyword".to_string(), payment_id.clone()) + .switch()?; + } + }; }; if let Some(time_range) = req.time_range { @@ -217,6 +255,44 @@ pub async fn search_results( .switch()?; } }; + if let Some(connector) = filters.connector { + if !connector.is_empty() { + query_builder + .add_filter_clause("connector.keyword".to_string(), connector.clone()) + .switch()?; + } + }; + if let Some(payment_method_type) = filters.payment_method_type { + if !payment_method_type.is_empty() { + query_builder + .add_filter_clause( + "payment_method_type.keyword".to_string(), + payment_method_type.clone(), + ) + .switch()?; + } + }; + if let Some(card_network) = filters.card_network { + if !card_network.is_empty() { + query_builder + .add_filter_clause("card_network.keyword".to_string(), card_network.clone()) + .switch()?; + } + }; + if let Some(card_last_4) = filters.card_last_4 { + if !card_last_4.is_empty() { + query_builder + .add_filter_clause("card_last_4.keyword".to_string(), card_last_4.clone()) + .switch()?; + } + }; + if let Some(payment_id) = filters.payment_id { + if !payment_id.is_empty() { + query_builder + .add_filter_clause("payment_id.keyword".to_string(), payment_id.clone()) + .switch()?; + } + }; }; if let Some(time_range) = search_req.time_range { diff --git a/crates/api_models/src/analytics/search.rs b/crates/api_models/src/analytics/search.rs index 24dd0effcb..a33dd100c7 100644 --- a/crates/api_models/src/analytics/search.rs +++ b/crates/api_models/src/analytics/search.rs @@ -9,6 +9,11 @@ pub struct SearchFilters { pub status: Option>, pub customer_email: Option>>, pub search_tags: Option>>, + pub connector: Option>, + pub payment_method_type: Option>, + pub card_network: Option>, + pub card_last_4: Option>, + pub payment_id: Option>, } impl SearchFilters { pub fn is_all_none(&self) -> bool { @@ -17,6 +22,11 @@ impl SearchFilters { && self.status.is_none() && self.customer_email.is_none() && self.search_tags.is_none() + && self.connector.is_none() + && self.payment_method_type.is_none() + && self.card_network.is_none() + && self.card_last_4.is_none() + && self.payment_id.is_none() } } @@ -58,6 +68,10 @@ pub enum SearchIndex { PaymentIntents, Refunds, Disputes, + SessionizerPaymentAttempts, + SessionizerPaymentIntents, + SessionizerRefunds, + SessionizerDisputes, } #[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)] diff --git a/crates/router/src/consts/opensearch.rs b/crates/router/src/consts/opensearch.rs index 277b0e946b..c9eeec1b34 100644 --- a/crates/router/src/consts/opensearch.rs +++ b/crates/router/src/consts/opensearch.rs @@ -1,12 +1,16 @@ use api_models::analytics::search::SearchIndex; -pub const fn get_search_indexes() -> [SearchIndex; 4] { +pub const fn get_search_indexes() -> [SearchIndex; 8] { [ SearchIndex::PaymentAttempts, SearchIndex::PaymentIntents, SearchIndex::Refunds, SearchIndex::Disputes, + SearchIndex::SessionizerPaymentAttempts, + SearchIndex::SessionizerPaymentIntents, + SearchIndex::SessionizerRefunds, + SearchIndex::SessionizerDisputes, ] } -pub const SEARCH_INDEXES: [SearchIndex; 4] = get_search_indexes(); +pub const SEARCH_INDEXES: [SearchIndex; 8] = get_search_indexes();