feat(opensearch): add additional global search filters and create sessionizer indexes for local (#6352)

This commit is contained in:
Sandeep Kumar
2024-10-19 19:27:53 +05:30
committed by GitHub
parent ba6f7a817b
commit 2e6cd6d31e
9 changed files with 211 additions and 9 deletions

View File

@ -728,6 +728,10 @@ payment_attempts = "hyperswitch-payment-attempt-events"
payment_intents = "hyperswitch-payment-intent-events"
refunds = "hyperswitch-refund-events"
disputes = "hyperswitch-dispute-events"
sessionizer_payment_attempts = "sessionizer-payment-attempt-events"
sessionizer_payment_intents = "sessionizer-payment-intent-events"
sessionizer_refunds = "sessionizer-refund-events"
sessionizer_disputes = "sessionizer-dispute-events"
[saved_payment_methods]
sdk_eligible_payment_methods = "card"

View File

@ -252,6 +252,10 @@ payment_attempts = "hyperswitch-payment-attempt-events"
payment_intents = "hyperswitch-payment-intent-events"
refunds = "hyperswitch-refund-events"
disputes = "hyperswitch-dispute-events"
sessionizer_payment_attempts = "sessionizer-payment-attempt-events"
sessionizer_payment_intents = "sessionizer-payment-intent-events"
sessionizer_refunds = "sessionizer-refund-events"
sessionizer_disputes = "sessionizer-dispute-events"
# Configuration for the Key Manager Service
[key_manager]

View File

@ -734,6 +734,10 @@ payment_attempts = "hyperswitch-payment-attempt-events"
payment_intents = "hyperswitch-payment-intent-events"
refunds = "hyperswitch-refund-events"
disputes = "hyperswitch-dispute-events"
sessionizer_payment_attempts = "sessionizer-payment-attempt-events"
sessionizer_payment_intents = "sessionizer-payment-intent-events"
sessionizer_refunds = "sessionizer-refund-events"
sessionizer_disputes = "sessionizer-dispute-events"
[saved_payment_methods]
sdk_eligible_payment_methods = "card"

View File

@ -562,6 +562,10 @@ payment_attempts = "hyperswitch-payment-attempt-events"
payment_intents = "hyperswitch-payment-intent-events"
refunds = "hyperswitch-refund-events"
disputes = "hyperswitch-dispute-events"
sessionizer_payment_attempts = "sessionizer-payment-attempt-events"
sessionizer_payment_intents = "sessionizer-payment-intent-events"
sessionizer_refunds = "sessionizer-refund-events"
sessionizer_disputes = "sessionizer-dispute-events"
[saved_payment_methods]
sdk_eligible_payment_methods = "card"

View File

@ -18,6 +18,15 @@ sources:
decoding:
codec: json
sessionized_kafka_tx_events:
type: kafka
bootstrap_servers: kafka0:29092
group_id: sessionizer
topics:
- ^sessionizer
decoding:
codec: json
app_logs:
type: docker_logs
include_labels:
@ -35,10 +44,19 @@ sources:
encoding: json
transforms:
events_create_ts:
inputs:
- kafka_tx_events
source: |-
.timestamp = from_unix_timestamp(.created_at, unit: "seconds") ?? now()
."@timestamp" = from_unix_timestamp(.created_at, unit: "seconds") ?? now()
type: remap
plus_1_events:
type: filter
inputs:
- kafka_tx_events
- events_create_ts
- sessionized_events_create_ts
condition: ".sign_flag == 1"
hs_server_logs:
@ -54,13 +72,13 @@ transforms:
source: |-
.message = parse_json!(.message)
events:
sessionized_events_create_ts:
type: remap
inputs:
- plus_1_events
- sessionized_kafka_tx_events
source: |-
.timestamp = from_unix_timestamp!(.created_at, unit: "seconds")
."@timestamp" = from_unix_timestamp(.created_at, unit: "seconds") ?? now()
.timestamp = from_unix_timestamp(.created_at, unit: "milliseconds") ?? now()
."@timestamp" = from_unix_timestamp(.created_at, unit: "milliseconds") ?? now()
sdk_transformed:
type: throttle
@ -74,7 +92,7 @@ sinks:
opensearch_events_1:
type: elasticsearch
inputs:
- events
- plus_1_events
endpoints:
- "https://opensearch:9200"
id_key: message_key
@ -98,7 +116,7 @@ sinks:
opensearch_events_2:
type: elasticsearch
inputs:
- events
- plus_1_events
endpoints:
- "https://opensearch:9200"
id_key: message_key
@ -120,6 +138,33 @@ sinks:
# Add a date suffixed index for better grouping
index: "vector-{{ .topic }}-%Y-%m-%d"
opensearch_events_3:
type: elasticsearch
inputs:
- plus_1_events
endpoints:
- "https://opensearch:9200"
id_key: message_key
api_version: v7
tls:
verify_certificate: false
verify_hostname: false
auth:
strategy: basic
user: admin
password: 0penS3arc#
encoding:
except_fields:
- message_key
- offset
- partition
- topic
- clickhouse_database
- last_synced
- sign_flag
bulk:
index: "{{ .topic }}"
opensearch_logs:
type: elasticsearch
inputs:
@ -143,6 +188,7 @@ sinks:
type: loki
inputs:
- kafka_tx_events
- sessionized_kafka_tx_events
endpoint: http://loki:3100
labels:
source: vector

View File

@ -42,6 +42,10 @@ pub struct OpenSearchIndexes {
pub payment_intents: String,
pub refunds: String,
pub disputes: String,
pub sessionizer_payment_attempts: String,
pub sessionizer_payment_intents: String,
pub sessionizer_refunds: String,
pub sessionizer_disputes: String,
}
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
@ -81,6 +85,10 @@ impl Default for OpenSearchConfig {
payment_intents: "hyperswitch-payment-intent-events".to_string(),
refunds: "hyperswitch-refund-events".to_string(),
disputes: "hyperswitch-dispute-events".to_string(),
sessionizer_payment_attempts: "sessionizer-payment-attempt-events".to_string(),
sessionizer_payment_intents: "sessionizer-payment-intent-events".to_string(),
sessionizer_refunds: "sessionizer-refund-events".to_string(),
sessionizer_disputes: "sessionizer-dispute-events".to_string(),
},
}
}
@ -219,6 +227,14 @@ impl OpenSearchClient {
SearchIndex::PaymentIntents => self.indexes.payment_intents.clone(),
SearchIndex::Refunds => self.indexes.refunds.clone(),
SearchIndex::Disputes => self.indexes.disputes.clone(),
SearchIndex::SessionizerPaymentAttempts => {
self.indexes.sessionizer_payment_attempts.clone()
}
SearchIndex::SessionizerPaymentIntents => {
self.indexes.sessionizer_payment_intents.clone()
}
SearchIndex::SessionizerRefunds => self.indexes.sessionizer_refunds.clone(),
SearchIndex::SessionizerDisputes => self.indexes.sessionizer_disputes.clone(),
}
}
@ -324,6 +340,36 @@ impl OpenSearchIndexes {
))
})?;
when(
self.sessionizer_payment_attempts.is_default_or_empty(),
|| {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Sessionizer Payment Attempts index must not be empty".into(),
))
},
)?;
when(
self.sessionizer_payment_intents.is_default_or_empty(),
|| {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Sessionizer Payment Intents index must not be empty".into(),
))
},
)?;
when(self.sessionizer_refunds.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Sessionizer Refunds index must not be empty".into(),
))
})?;
when(self.sessionizer_disputes.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Sessionizer Disputes index must not be empty".into(),
))
})?;
Ok(())
}
}

View File

@ -92,6 +92,44 @@ pub async fn msearch_results(
.switch()?;
}
};
if let Some(connector) = filters.connector {
if !connector.is_empty() {
query_builder
.add_filter_clause("connector.keyword".to_string(), connector.clone())
.switch()?;
}
};
if let Some(payment_method_type) = filters.payment_method_type {
if !payment_method_type.is_empty() {
query_builder
.add_filter_clause(
"payment_method_type.keyword".to_string(),
payment_method_type.clone(),
)
.switch()?;
}
};
if let Some(card_network) = filters.card_network {
if !card_network.is_empty() {
query_builder
.add_filter_clause("card_network.keyword".to_string(), card_network.clone())
.switch()?;
}
};
if let Some(card_last_4) = filters.card_last_4 {
if !card_last_4.is_empty() {
query_builder
.add_filter_clause("card_last_4.keyword".to_string(), card_last_4.clone())
.switch()?;
}
};
if let Some(payment_id) = filters.payment_id {
if !payment_id.is_empty() {
query_builder
.add_filter_clause("payment_id.keyword".to_string(), payment_id.clone())
.switch()?;
}
};
};
if let Some(time_range) = req.time_range {
@ -217,6 +255,44 @@ pub async fn search_results(
.switch()?;
}
};
if let Some(connector) = filters.connector {
if !connector.is_empty() {
query_builder
.add_filter_clause("connector.keyword".to_string(), connector.clone())
.switch()?;
}
};
if let Some(payment_method_type) = filters.payment_method_type {
if !payment_method_type.is_empty() {
query_builder
.add_filter_clause(
"payment_method_type.keyword".to_string(),
payment_method_type.clone(),
)
.switch()?;
}
};
if let Some(card_network) = filters.card_network {
if !card_network.is_empty() {
query_builder
.add_filter_clause("card_network.keyword".to_string(), card_network.clone())
.switch()?;
}
};
if let Some(card_last_4) = filters.card_last_4 {
if !card_last_4.is_empty() {
query_builder
.add_filter_clause("card_last_4.keyword".to_string(), card_last_4.clone())
.switch()?;
}
};
if let Some(payment_id) = filters.payment_id {
if !payment_id.is_empty() {
query_builder
.add_filter_clause("payment_id.keyword".to_string(), payment_id.clone())
.switch()?;
}
};
};
if let Some(time_range) = search_req.time_range {

View File

@ -9,6 +9,11 @@ pub struct SearchFilters {
pub status: Option<Vec<String>>,
pub customer_email: Option<Vec<HashedString<common_utils::pii::EmailStrategy>>>,
pub search_tags: Option<Vec<HashedString<WithType>>>,
pub connector: Option<Vec<String>>,
pub payment_method_type: Option<Vec<String>>,
pub card_network: Option<Vec<String>>,
pub card_last_4: Option<Vec<String>>,
pub payment_id: Option<Vec<String>>,
}
impl SearchFilters {
pub fn is_all_none(&self) -> bool {
@ -17,6 +22,11 @@ impl SearchFilters {
&& self.status.is_none()
&& self.customer_email.is_none()
&& self.search_tags.is_none()
&& self.connector.is_none()
&& self.payment_method_type.is_none()
&& self.card_network.is_none()
&& self.card_last_4.is_none()
&& self.payment_id.is_none()
}
}
@ -58,6 +68,10 @@ pub enum SearchIndex {
PaymentIntents,
Refunds,
Disputes,
SessionizerPaymentAttempts,
SessionizerPaymentIntents,
SessionizerRefunds,
SessionizerDisputes,
}
#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)]

View File

@ -1,12 +1,16 @@
use api_models::analytics::search::SearchIndex;
pub const fn get_search_indexes() -> [SearchIndex; 4] {
pub const fn get_search_indexes() -> [SearchIndex; 8] {
[
SearchIndex::PaymentAttempts,
SearchIndex::PaymentIntents,
SearchIndex::Refunds,
SearchIndex::Disputes,
SearchIndex::SessionizerPaymentAttempts,
SearchIndex::SessionizerPaymentIntents,
SearchIndex::SessionizerRefunds,
SearchIndex::SessionizerDisputes,
]
}
pub const SEARCH_INDEXES: [SearchIndex; 4] = get_search_indexes();
pub const SEARCH_INDEXES: [SearchIndex; 8] = get_search_indexes();