diff --git a/Cargo.lock b/Cargo.lock index b4819f9f37..635edcc995 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -337,6 +337,7 @@ dependencies = [ "aws-smithy-types 1.1.8", "bigdecimal", "common_utils", + "data_models", "diesel_models", "error-stack", "external_services", diff --git a/crates/analytics/Cargo.toml b/crates/analytics/Cargo.toml index c3e35519dc..b6cd29bade 100644 --- a/crates/analytics/Cargo.toml +++ b/crates/analytics/Cargo.toml @@ -11,14 +11,22 @@ license.workspace = true [dependencies] # First party crates -api_models = { version = "0.1.0", path = "../api_models", features = ["errors"] } +api_models = { version = "0.1.0", path = "../api_models", features = [ + "errors", +] } storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false } common_utils = { version = "0.1.0", path = "../common_utils" } external_services = { version = "0.1.0", path = "../external_services", default-features = false } hyperswitch_interfaces = { version = "0.1.0", path = "../hyperswitch_interfaces" } masking = { version = "0.1.0", path = "../masking" } -router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } -diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] } +router_env = { version = "0.1.0", path = "../router_env", features = [ + "log_extra_implicit_fields", + "log_custom_entries_to_extra", +] } +diesel_models = { version = "0.1.0", path = "../diesel_models", features = [ + "kv_store", +] } +data_models = { version = "0.1.0", path = "../data_models", default-features = false } #Third Party dependencies actix-web = "4.5.1" @@ -34,7 +42,13 @@ once_cell = "1.19.0" reqwest = { version = "0.11.27", features = ["serde_json"] } serde = { version = "1.0.197", features = ["derive", "rc"] } serde_json = "1.0.115" -sqlx = { version = "0.7.3", features = ["postgres", "runtime-tokio", "runtime-tokio-native-tls", "time", "bigdecimal"] } +sqlx = { version = "0.7.3", features = [ + "postgres", + "runtime-tokio", + "runtime-tokio-native-tls", + "time", + "bigdecimal", +] } strum = { version = "0.26.2", features = ["derive"] } thiserror = "1.0.58" time = { version = "0.3.35", features = ["serde", "serde-well-known", "std"] } diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs index eb08d8549d..2a7075a0f2 100644 --- a/crates/analytics/src/lib.rs +++ b/crates/analytics/src/lib.rs @@ -10,6 +10,7 @@ pub mod refunds; pub mod api_event; pub mod connector_events; pub mod health_check; +pub mod opensearch; pub mod outgoing_webhook_event; pub mod sdk_events; pub mod search; @@ -668,47 +669,6 @@ pub struct ReportConfig { pub region: String, } -#[derive(Clone, Debug, serde::Deserialize)] -#[serde(tag = "auth")] -#[serde(rename_all = "lowercase")] -pub enum OpensearchAuth { - Basic { username: String, password: String }, - Aws { region: String }, -} - -#[derive(Clone, Debug, serde::Deserialize)] -pub struct OpensearchIndexes { - pub payment_attempts: String, - pub payment_intents: String, - pub refunds: String, - pub disputes: String, -} - -#[derive(Clone, Debug, serde::Deserialize)] -pub struct OpensearchConfig { - host: String, - auth: OpensearchAuth, - indexes: OpensearchIndexes, -} - -impl Default for OpensearchConfig { - fn default() -> Self { - Self { - host: "https://localhost:9200".to_string(), - auth: OpensearchAuth::Basic { - username: "admin".to_string(), - password: "admin".to_string(), - }, - indexes: OpensearchIndexes { - payment_attempts: "hyperswitch-payment-attempt-events".to_string(), - payment_intents: "hyperswitch-payment-intent-events".to_string(), - refunds: "hyperswitch-refund-events".to_string(), - disputes: "hyperswitch-dispute-events".to_string(), - }, - } - } -} - /// Analytics Flow routes Enums /// Info - Dimensions and filters available for the domain /// Filters - Set of values present for the dimension diff --git a/crates/analytics/src/opensearch.rs b/crates/analytics/src/opensearch.rs new file mode 100644 index 0000000000..58569338cf --- /dev/null +++ b/crates/analytics/src/opensearch.rs @@ -0,0 +1,397 @@ +use api_models::{ + analytics::search::SearchIndex, + errors::types::{ApiError, ApiErrorResponse}, +}; +use aws_config::{self, meta::region::RegionProviderChain, Region}; +use common_utils::errors::{CustomResult, ErrorSwitch}; +use data_models::errors::{StorageError, StorageResult}; +use error_stack::ResultExt; +use opensearch::{ + auth::Credentials, + cert::CertificateValidation, + cluster::{Cluster, ClusterHealthParts}, + http::{ + request::JsonBody, + response::Response, + transport::{SingleNodeConnectionPool, Transport, TransportBuilder}, + Url, + }, + MsearchParts, OpenSearch, SearchParts, +}; +use serde_json::{json, Value}; +use storage_impl::errors::ApplicationError; +use strum::IntoEnumIterator; + +use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError}; +use crate::query::QueryBuildingError; + +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(tag = "auth")] +#[serde(rename_all = "lowercase")] +pub enum OpenSearchAuth { + Basic { username: String, password: String }, + Aws { region: String }, +} + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct OpenSearchIndexes { + pub payment_attempts: String, + pub payment_intents: String, + pub refunds: String, + pub disputes: String, +} + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct OpenSearchConfig { + host: String, + auth: OpenSearchAuth, + indexes: OpenSearchIndexes, +} + +impl Default for OpenSearchConfig { + fn default() -> Self { + Self { + host: "https://localhost:9200".to_string(), + auth: OpenSearchAuth::Basic { + username: "admin".to_string(), + password: "admin".to_string(), + }, + indexes: OpenSearchIndexes { + payment_attempts: "hyperswitch-payment-attempt-events".to_string(), + payment_intents: "hyperswitch-payment-intent-events".to_string(), + refunds: "hyperswitch-refund-events".to_string(), + disputes: "hyperswitch-dispute-events".to_string(), + }, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum OpenSearchError { + #[error("Opensearch connection error")] + ConnectionError, + #[error("Opensearch NON-200 response content: '{0}'")] + ResponseNotOK(String), + #[error("Opensearch response error")] + ResponseError, + #[error("Opensearch query building error")] + QueryBuildingError, +} + +impl ErrorSwitch for QueryBuildingError { + fn switch(&self) -> OpenSearchError { + OpenSearchError::QueryBuildingError + } +} + +impl ErrorSwitch for OpenSearchError { + fn switch(&self) -> ApiErrorResponse { + match self { + Self::ConnectionError => ApiErrorResponse::InternalServerError(ApiError::new( + "IR", + 0, + "Connection error", + None, + )), + Self::ResponseNotOK(response) => ApiErrorResponse::InternalServerError(ApiError::new( + "IR", + 0, + format!("Something went wrong {}", response), + None, + )), + Self::ResponseError => ApiErrorResponse::InternalServerError(ApiError::new( + "IR", + 0, + "Something went wrong", + None, + )), + Self::QueryBuildingError => ApiErrorResponse::InternalServerError(ApiError::new( + "IR", + 0, + "Query building error", + None, + )), + } + } +} + +#[derive(Clone, Debug)] +pub struct OpenSearchClient { + pub client: OpenSearch, + pub transport: Transport, + pub indexes: OpenSearchIndexes, +} + +impl OpenSearchClient { + pub async fn create(conf: &OpenSearchConfig) -> CustomResult { + let url = Url::parse(&conf.host).map_err(|_| OpenSearchError::ConnectionError)?; + let transport = match &conf.auth { + OpenSearchAuth::Basic { username, password } => { + let credentials = Credentials::Basic(username.clone(), password.clone()); + TransportBuilder::new(SingleNodeConnectionPool::new(url)) + .cert_validation(CertificateValidation::None) + .auth(credentials) + .build() + .map_err(|_| OpenSearchError::ConnectionError)? + } + OpenSearchAuth::Aws { region } => { + let region_provider = RegionProviderChain::first_try(Region::new(region.clone())); + let sdk_config = aws_config::from_env().region(region_provider).load().await; + let conn_pool = SingleNodeConnectionPool::new(url); + TransportBuilder::new(conn_pool) + .auth( + sdk_config + .clone() + .try_into() + .map_err(|_| OpenSearchError::ConnectionError)?, + ) + .service_name("es") + .build() + .map_err(|_| OpenSearchError::ConnectionError)? + } + }; + Ok(Self { + transport: transport.clone(), + client: OpenSearch::new(transport), + indexes: conf.indexes.clone(), + }) + } + + pub fn search_index_to_opensearch_index(&self, index: SearchIndex) -> String { + match index { + SearchIndex::PaymentAttempts => self.indexes.payment_attempts.clone(), + SearchIndex::PaymentIntents => self.indexes.payment_intents.clone(), + SearchIndex::Refunds => self.indexes.refunds.clone(), + SearchIndex::Disputes => self.indexes.disputes.clone(), + } + } + + pub async fn execute( + &self, + query_builder: OpenSearchQueryBuilder, + ) -> CustomResult { + match query_builder.query_type { + OpenSearchQuery::Msearch => { + let search_indexes = SearchIndex::iter(); + + let payload = query_builder + .construct_payload(search_indexes.clone().collect()) + .change_context(OpenSearchError::QueryBuildingError)?; + + let payload_with_indexes = payload.into_iter().zip(search_indexes).fold( + Vec::new(), + |mut payload_with_indexes, (index_hit, index)| { + payload_with_indexes.push( + json!({"index": self.search_index_to_opensearch_index(index)}).into(), + ); + payload_with_indexes.push(JsonBody::new(index_hit.clone())); + payload_with_indexes + }, + ); + + self.client + .msearch(MsearchParts::None) + .body(payload_with_indexes) + .send() + .await + .change_context(OpenSearchError::ResponseError) + } + OpenSearchQuery::Search(index) => { + let payload = query_builder + .clone() + .construct_payload(vec![index]) + .change_context(OpenSearchError::QueryBuildingError)?; + + let final_payload = payload.first().unwrap_or(&Value::Null); + + self.client + .search(SearchParts::Index(&[ + &self.search_index_to_opensearch_index(index) + ])) + .from(query_builder.offset.unwrap_or(0)) + .size(query_builder.count.unwrap_or(10)) + .body(final_payload) + .send() + .await + .change_context(OpenSearchError::ResponseError) + } + } + } +} + +#[async_trait::async_trait] +impl HealthCheck for OpenSearchClient { + async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError> { + let health = Cluster::new(&self.transport) + .health(ClusterHealthParts::None) + .send() + .await + .change_context(QueryExecutionError::DatabaseError)? + .json::() + .await + .change_context(QueryExecutionError::DatabaseError)?; + + if health.status != OpenSearchHealthStatus::Red { + Ok(()) + } else { + Err(QueryExecutionError::DatabaseError.into()) + } + } +} + +impl OpenSearchIndexes { + pub fn validate(&self) -> Result<(), ApplicationError> { + use common_utils::{ext_traits::ConfigExt, fp_utils::when}; + + when(self.payment_attempts.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Payment Attempts index must not be empty".into(), + )) + })?; + + when(self.payment_intents.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Payment Intents index must not be empty".into(), + )) + })?; + + when(self.refunds.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Refunds index must not be empty".into(), + )) + })?; + + when(self.disputes.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Disputes index must not be empty".into(), + )) + })?; + + Ok(()) + } +} + +impl OpenSearchAuth { + pub fn validate(&self) -> Result<(), ApplicationError> { + use common_utils::{ext_traits::ConfigExt, fp_utils::when}; + + match self { + Self::Basic { username, password } => { + when(username.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Basic auth username must not be empty".into(), + )) + })?; + + when(password.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Basic auth password must not be empty".into(), + )) + })?; + } + + Self::Aws { region } => { + when(region.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch Aws auth region must not be empty".into(), + )) + })?; + } + }; + + Ok(()) + } +} + +impl OpenSearchConfig { + pub async fn get_opensearch_client(&self) -> StorageResult { + Ok(OpenSearchClient::create(self) + .await + .map_err(|_| StorageError::InitializationError)?) + } + + pub fn validate(&self) -> Result<(), ApplicationError> { + use common_utils::{ext_traits::ConfigExt, fp_utils::when}; + + when(self.host.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Opensearch host must not be empty".into(), + )) + })?; + + self.indexes.validate()?; + + self.auth.validate()?; + + Ok(()) + } +} +#[derive(Debug, serde::Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum OpenSearchHealthStatus { + Red, + Green, + Yellow, +} + +#[derive(Debug, serde::Deserialize)] +pub struct OpenSearchHealth { + pub status: OpenSearchHealthStatus, +} + +#[derive(Debug, Clone)] +pub enum OpenSearchQuery { + Msearch, + Search(SearchIndex), +} + +#[derive(Debug, Clone)] +pub struct OpenSearchQueryBuilder { + pub query_type: OpenSearchQuery, + pub query: String, + pub offset: Option, + pub count: Option, + pub filters: Vec<(String, String)>, +} + +impl OpenSearchQueryBuilder { + pub fn new(query_type: OpenSearchQuery, query: String) -> Self { + Self { + query_type, + query, + offset: Default::default(), + count: Default::default(), + filters: Default::default(), + } + } + + pub fn set_offset_n_count(&mut self, offset: i64, count: i64) -> QueryResult<()> { + self.offset = Some(offset); + self.count = Some(count); + Ok(()) + } + + pub fn add_filter_clause(&mut self, lhs: String, rhs: String) -> QueryResult<()> { + self.filters.push((lhs, rhs)); + Ok(()) + } + + pub fn construct_payload(&self, indexes: Vec) -> QueryResult> { + let mut query = + vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})]; + + let mut filters = self + .filters + .iter() + .map(|(k, v)| json!({"match_phrase" : {k : v}})) + .collect::>(); + + query.append(&mut filters); + + // TODO add index specific filters + Ok(indexes + .iter() + .map(|_index| json!({"query": {"bool": {"filter": query}}})) + .collect::>()) + } +} diff --git a/crates/analytics/src/search.rs b/crates/analytics/src/search.rs index 4c42e96250..dc802ff694 100644 --- a/crates/analytics/src/search.rs +++ b/crates/analytics/src/search.rs @@ -2,99 +2,33 @@ use api_models::analytics::search::{ GetGlobalSearchRequest, GetSearchRequestWithIndex, GetSearchResponse, OpenMsearchOutput, OpensearchOutput, SearchIndex, }; -use aws_config::{self, meta::region::RegionProviderChain, Region}; -use common_utils::errors::CustomResult; -use opensearch::{ - auth::Credentials, - cert::CertificateValidation, - http::{ - request::JsonBody, - transport::{SingleNodeConnectionPool, TransportBuilder}, - Url, - }, - MsearchParts, OpenSearch, SearchParts, -}; -use serde_json::{json, Value}; +use common_utils::errors::{CustomResult, ReportSwitchExt}; +use error_stack::ResultExt; +use serde_json::Value; use strum::IntoEnumIterator; -use crate::{errors::AnalyticsError, OpensearchAuth, OpensearchConfig, OpensearchIndexes}; - -#[derive(Debug, thiserror::Error)] -pub enum OpensearchError { - #[error("Opensearch connection error")] - ConnectionError, - #[error("Opensearch NON-200 response content: '{0}'")] - ResponseNotOK(String), - #[error("Opensearch response error")] - ResponseError, -} - -pub fn search_index_to_opensearch_index(index: SearchIndex, config: &OpensearchIndexes) -> String { - match index { - SearchIndex::PaymentAttempts => config.payment_attempts.clone(), - SearchIndex::PaymentIntents => config.payment_intents.clone(), - SearchIndex::Refunds => config.refunds.clone(), - SearchIndex::Disputes => config.disputes.clone(), - } -} - -async fn get_opensearch_client(config: OpensearchConfig) -> Result { - let url = Url::parse(&config.host).map_err(|_| OpensearchError::ConnectionError)?; - let transport = match config.auth { - OpensearchAuth::Basic { username, password } => { - let credentials = Credentials::Basic(username, password); - TransportBuilder::new(SingleNodeConnectionPool::new(url)) - .cert_validation(CertificateValidation::None) - .auth(credentials) - .build() - .map_err(|_| OpensearchError::ConnectionError)? - } - OpensearchAuth::Aws { region } => { - let region_provider = RegionProviderChain::first_try(Region::new(region)); - let sdk_config = aws_config::from_env().region(region_provider).load().await; - let conn_pool = SingleNodeConnectionPool::new(url); - TransportBuilder::new(conn_pool) - .auth( - sdk_config - .clone() - .try_into() - .map_err(|_| OpensearchError::ConnectionError)?, - ) - .service_name("es") - .build() - .map_err(|_| OpensearchError::ConnectionError)? - } - }; - Ok(OpenSearch::new(transport)) -} +use crate::opensearch::{ + OpenSearchClient, OpenSearchError, OpenSearchQuery, OpenSearchQueryBuilder, +}; pub async fn msearch_results( + client: &OpenSearchClient, req: GetGlobalSearchRequest, merchant_id: &String, - config: OpensearchConfig, -) -> CustomResult, AnalyticsError> { - let client = get_opensearch_client(config.clone()) +) -> CustomResult, OpenSearchError> { + let mut query_builder = OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch, req.query); + + query_builder + .add_filter_clause("merchant_id".to_string(), merchant_id.to_string()) + .switch()?; + + let response_body = client + .execute(query_builder) .await - .map_err(|_| AnalyticsError::UnknownError)?; - - let mut msearch_vector: Vec> = vec![]; - for index in SearchIndex::iter() { - msearch_vector - .push(json!({"index": search_index_to_opensearch_index(index,&config.indexes)}).into()); - msearch_vector.push(json!({"query": {"bool": {"filter": [{"multi_match": {"type": "phrase", "query": req.query, "lenient": true}},{"match_phrase": {"merchant_id": merchant_id}}]}}}).into()); - } - - let response = client - .msearch(MsearchParts::None) - .body(msearch_vector) - .send() - .await - .map_err(|_| AnalyticsError::UnknownError)?; - - let response_body = response + .change_context(OpenSearchError::ConnectionError)? .json::>() .await - .map_err(|_| AnalyticsError::UnknownError)?; + .change_context(OpenSearchError::ResponseError)?; Ok(response_body .responses @@ -114,29 +48,30 @@ pub async fn msearch_results( } pub async fn search_results( + client: &OpenSearchClient, req: GetSearchRequestWithIndex, merchant_id: &String, - config: OpensearchConfig, -) -> CustomResult { +) -> CustomResult { let search_req = req.search_req; - let client = get_opensearch_client(config.clone()) - .await - .map_err(|_| AnalyticsError::UnknownError)?; + let mut query_builder = + OpenSearchQueryBuilder::new(OpenSearchQuery::Search(req.index), search_req.query); - let response = client - .search(SearchParts::Index(&[&search_index_to_opensearch_index(req.index.clone(),&config.indexes)])) - .from(search_req.offset) - .size(search_req.count) - .body(json!({"query": {"bool": {"filter": [{"multi_match": {"type": "phrase", "query": search_req.query, "lenient": true}},{"match_phrase": {"merchant_id": merchant_id}}]}}})) - .send() - .await - .map_err(|_| AnalyticsError::UnknownError)?; + query_builder + .add_filter_clause("merchant_id".to_string(), merchant_id.to_string()) + .switch()?; - let response_body = response + query_builder + .set_offset_n_count(search_req.offset, search_req.count) + .switch()?; + + let response_body = client + .execute(query_builder) + .await + .change_context(OpenSearchError::ConnectionError)? .json::>() .await - .map_err(|_| AnalyticsError::UnknownError)?; + .change_context(OpenSearchError::ResponseError)?; Ok(GetSearchResponse { count: response_body.hits.total.value, diff --git a/crates/api_models/src/analytics/search.rs b/crates/api_models/src/analytics/search.rs index 3a5b3c307e..6f6a3f2281 100644 --- a/crates/api_models/src/analytics/search.rs +++ b/crates/api_models/src/analytics/search.rs @@ -30,7 +30,7 @@ pub struct GetSearchRequestWithIndex { pub search_req: GetSearchRequest, } -#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize)] +#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)] #[serde(rename_all = "snake_case")] pub enum SearchIndex { PaymentAttempts, diff --git a/crates/api_models/src/health_check.rs b/crates/api_models/src/health_check.rs index 29a59df397..1e86e2964c 100644 --- a/crates/api_models/src/health_check.rs +++ b/crates/api_models/src/health_check.rs @@ -6,6 +6,8 @@ pub struct RouterHealthCheckResponse { pub vault: Option, #[cfg(feature = "olap")] pub analytics: bool, + #[cfg(feature = "olap")] + pub opensearch: bool, pub outgoing_request: bool, } diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index 0aec94c205..d509cf03d3 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -614,9 +614,9 @@ pub mod routes { json_payload.into_inner(), |state, auth: AuthenticationData, req, _| async move { analytics::search::msearch_results( + &state.opensearch_client, req, &auth.merchant_account.merchant_id, - state.conf.opensearch.clone(), ) .await .map(ApplicationResponse::Json) @@ -645,9 +645,9 @@ pub mod routes { indexed_req, |state, auth: AuthenticationData, req, _| async move { analytics::search::search_results( + &state.opensearch_client, req, &auth.merchant_account.merchant_id, - state.conf.opensearch.clone(), ) .await .map(ApplicationResponse::Json) diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 810ab5b98e..01b05c60bd 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -4,7 +4,7 @@ use std::{ }; #[cfg(feature = "olap")] -use analytics::{OpensearchConfig, ReportConfig}; +use analytics::{opensearch::OpenSearchConfig, ReportConfig}; use api_models::{enums, payment_methods::RequiredFieldInfo}; use common_utils::ext_traits::ConfigExt; use config::{Environment, File}; @@ -114,7 +114,7 @@ pub struct Settings { #[cfg(feature = "olap")] pub report_download_config: ReportConfig, #[cfg(feature = "olap")] - pub opensearch: OpensearchConfig, + pub opensearch: OpenSearchConfig, pub events: EventsConfig, #[cfg(feature = "olap")] pub connector_onboarding: SecretStateContainer, @@ -730,6 +730,9 @@ impl Settings { self.lock_settings.validate()?; self.events.validate()?; + #[cfg(feature = "olap")] + self.opensearch.validate()?; + self.encryption_management .validate() .map_err(|err| ApplicationError::InvalidConfigurationValueError(err.into()))?; diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index 9997c6261f..e90fe77e80 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -23,6 +23,11 @@ pub trait HealthCheckInterface { #[cfg(feature = "olap")] async fn health_check_analytics(&self) -> CustomResult; + + #[cfg(feature = "olap")] + async fn health_check_opensearch( + &self, + ) -> CustomResult; } #[async_trait::async_trait] @@ -122,6 +127,18 @@ impl HealthCheckInterface for app::AppState { Ok(HealthState::Running) } + #[cfg(feature = "olap")] + async fn health_check_opensearch( + &self, + ) -> CustomResult { + self.opensearch_client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::OpensearchError)?; + + Ok(HealthState::Running) + } + async fn health_check_outgoing( &self, ) -> CustomResult { diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 5b59ebb692..19a632bf89 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -41,6 +41,8 @@ use super::{currency, payment_methods::*}; use super::{ephemeral_key::*, webhooks::*}; #[cfg(feature = "oltp")] use super::{pm_auth, poll::retrieve_poll_status}; +#[cfg(feature = "olap")] +pub use crate::analytics::opensearch::OpenSearchClient; use crate::configs::secrets_transformers; #[cfg(all(feature = "frm", feature = "oltp"))] use crate::routes::fraud_check as frm_routes; @@ -73,6 +75,8 @@ pub struct AppState { pub api_client: Box, #[cfg(feature = "olap")] pub pool: crate::analytics::AnalyticsProvider, + #[cfg(feature = "olap")] + pub opensearch_client: OpenSearchClient, pub request_id: Option, pub file_storage_client: Box, pub encryption_client: Box, @@ -177,6 +181,14 @@ impl AppState { .await .expect("Failed to create event handler"); + #[allow(clippy::expect_used)] + #[cfg(feature = "olap")] + let opensearch_client = conf + .opensearch + .get_opensearch_client() + .await + .expect("Failed to create opensearch client"); + let store: Box = match storage_impl { StorageImpl::Postgresql | StorageImpl::PostgresqlTest => match &event_handler { EventsHandler::Kafka(kafka_client) => Box::new( @@ -223,6 +235,8 @@ impl AppState { event_handler, #[cfg(feature = "olap")] pool, + #[cfg(feature = "olap")] + opensearch_client, request_id: None, file_storage_client, encryption_client, diff --git a/crates/router/src/routes/health.rs b/crates/router/src/routes/health.rs index fbfcd893a6..7d35a91a31 100644 --- a/crates/router/src/routes/health.rs +++ b/crates/router/src/routes/health.rs @@ -74,6 +74,10 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse RouterResponse RouterResponse RouterResponse for HealthCheckDBError { diff --git a/docker-compose.yml b/docker-compose.yml index e55008f1e3..040832f8e2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -332,4 +332,35 @@ services: ulimits: nofile: soft: 262144 - hard: 262144 \ No newline at end of file + hard: 262144 + + fluentd: + build: ./docker/fluentd + volumes: + - ./docker/fluentd/conf:/fluentd/etc + networks: + - router_net + + opensearch: + image: public.ecr.aws/opensearchproject/opensearch:1.3.14 + container_name: opensearch + hostname: opensearch + environment: + - "discovery.type=single-node" + expose: + - "9200" + ports: + - "9200:9200" + networks: + - router_net + + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:1.3.14 + ports: + - 5601:5601 + expose: + - "5601" + environment: + OPENSEARCH_HOSTS: '["https://opensearch:9200"]' + networks: + - router_net \ No newline at end of file diff --git a/docker/fluentd/Dockerfile b/docker/fluentd/Dockerfile new file mode 100644 index 0000000000..5c85269e1a --- /dev/null +++ b/docker/fluentd/Dockerfile @@ -0,0 +1,7 @@ +# docker/fluentd/Dockerfile + +FROM fluent/fluentd:v1.16-debian-2 +USER root +RUN ["gem", "install", "fluent-plugin-kafka", "--no-document"] +RUN ["gem", "install", "fluent-plugin-opensearch", "--no-document"] +USER fluent diff --git a/docker/fluentd/conf/fluent.conf b/docker/fluentd/conf/fluent.conf new file mode 100644 index 0000000000..7aec9dd7cd --- /dev/null +++ b/docker/fluentd/conf/fluent.conf @@ -0,0 +1,137 @@ +# docker/fluentd/conf/fluent.conf + + + @type kafka_group + + brokers kafka0:29092 + consumer_group fluentd + topics hyperswitch-payment-intent-events,hyperswitch-payment-attempt-events,hyperswitch-refund-events,hyperswitch-dispute-events + add_headers false + add_prefix topic + retry_emit_limit 2 + + + + @type record_transformer + renew_time_key created_at + + + + @type record_transformer + renew_time_key created_at + + + + @type record_transformer + renew_time_key created_at + + + + + @type record_transformer + renew_time_key created_at + + + + @type copy + + + @type stdout + + + + @type opensearch + host opensearch + port 9200 + scheme https + index_name hyperswitch-payment-intent-events + id_key payment_id + user admin + password admin + ssl_verify false + prefer_oj_serializer true + reload_on_failure true + reload_connections false + request_timeout 120s + bulk_message_request_threshold 10MB + include_timestamp true + + + + + @type copy + + + @type stdout + + + + @type opensearch + host opensearch + port 9200 + scheme https + index_name hyperswitch-payment-attempt-events + id_key attempt_id + user admin + password admin + ssl_verify false + prefer_oj_serializer true + reload_on_failure true + reload_connections false + request_timeout 120s + bulk_message_request_threshold 10MB + include_timestamp true + + + + + @type copy + + + @type stdout + + + + @type opensearch + host opensearch + port 9200 + scheme https + index_name hyperswitch-refund-events + id_key refund_id + user admin + password admin + ssl_verify false + prefer_oj_serializer true + reload_on_failure true + reload_connections false + request_timeout 120s + bulk_message_request_threshold 10MB + include_timestamp true + + + + + @type copy + + + @type stdout + + + + @type opensearch + host opensearch + port 9200 + scheme https + index_name hyperswitch-dispute-events + id_key dispute_id + user admin + password admin + ssl_verify false + prefer_oj_serializer true + reload_on_failure true + reload_connections false + request_timeout 120s + bulk_message_request_threshold 10MB + include_timestamp true + + \ No newline at end of file