From 2067bc352044f00f2c724b4a97f1de0cf599f337 Mon Sep 17 00:00:00 2001 From: Shivansh Mathur <104988143+ShivanshMathurJuspay@users.noreply.github.com> Date: Thu, 17 Apr 2025 13:32:22 +0530 Subject: [PATCH] chore(analytics): opensearch client creation based on config (#7810) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- config/config.example.toml | 1 + config/deployments/env_specific.toml | 1 + config/development.toml | 1 + config/docker_compose.toml | 1 + crates/analytics/src/opensearch.rs | 29 ++++++++++++++++++++++---- crates/router/src/analytics.rs | 18 ++++++++++++---- crates/router/src/core/health_check.rs | 15 +++++++------ crates/router/src/routes/app.rs | 18 ++++++++-------- 8 files changed, 61 insertions(+), 23 deletions(-) diff --git a/config/config.example.toml b/config/config.example.toml index 51efc6c708..9afb30076b 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -887,6 +887,7 @@ region = "kms_region" # The AWS region used by the KMS SDK for decrypting data. [opensearch] host = "https://localhost:9200" +enabled = false [opensearch.auth] auth = "basic" diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index b2ee7f3974..5621a4940e 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -242,6 +242,7 @@ region = "report_download_config_region" # Region of the buc [opensearch] host = "https://localhost:9200" +enabled = false [opensearch.auth] auth = "basic" diff --git a/config/development.toml b/config/development.toml index 3a041db357..a332e2a1c7 100644 --- a/config/development.toml +++ b/config/development.toml @@ -940,6 +940,7 @@ keys = "accept-language,user-agent,x-profile-id" [opensearch] host = "https://localhost:9200" +enabled = false [opensearch.auth] auth = "basic" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 55cf5166b9..7842a1f468 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -763,6 +763,7 @@ keys = "accept-language,user-agent,x-profile-id" [opensearch] host = "https://opensearch:9200" +enabled = false [opensearch.auth] auth = "basic" diff --git a/crates/analytics/src/opensearch.rs b/crates/analytics/src/opensearch.rs index 726e38e5e1..20be0116d9 100644 --- a/crates/analytics/src/opensearch.rs +++ b/crates/analytics/src/opensearch.rs @@ -71,6 +71,8 @@ pub struct OpenSearchConfig { host: String, auth: OpenSearchAuth, indexes: OpenSearchIndexes, + #[serde(default)] + enabled: bool, } impl Default for OpenSearchConfig { @@ -91,12 +93,15 @@ impl Default for OpenSearchConfig { sessionizer_refunds: "sessionizer-refund-events".to_string(), sessionizer_disputes: "sessionizer-dispute-events".to_string(), }, + enabled: false, } } } #[derive(Debug, thiserror::Error)] pub enum OpenSearchError { + #[error("Opensearch is not enabled")] + NotEnabled, #[error("Opensearch connection error")] ConnectionError, #[error("Opensearch NON-200 response content: '{0}'")] @@ -176,6 +181,12 @@ impl ErrorSwitch for OpenSearchError { "Access Forbidden error", None, )), + Self::NotEnabled => ApiErrorResponse::InternalServerError(ApiError::new( + "IR", + 8, + "Opensearch is not enabled", + None, + )), } } } @@ -408,15 +419,24 @@ impl OpenSearchAuth { } impl OpenSearchConfig { - pub async fn get_opensearch_client(&self) -> StorageResult { - Ok(OpenSearchClient::create(self) - .await - .map_err(|_| StorageError::InitializationError)?) + pub async fn get_opensearch_client(&self) -> StorageResult> { + if !self.enabled { + return Ok(None); + } + Ok(Some( + OpenSearchClient::create(self) + .await + .change_context(StorageError::InitializationError)?, + )) } pub fn validate(&self) -> Result<(), ApplicationError> { use common_utils::{ext_traits::ConfigExt, fp_utils::when}; + if !self.enabled { + return Ok(()); + } + when(self.host.is_default_or_empty(), || { Err(ApplicationError::InvalidConfigurationValueError( "Opensearch host must not be empty".into(), @@ -430,6 +450,7 @@ impl OpenSearchConfig { Ok(()) } } + #[derive(Debug, serde::Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum OpenSearchHealthStatus { diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index cc242f86a5..ca6b870f88 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -2245,7 +2245,10 @@ pub mod routes { .collect(); analytics::search::msearch_results( - &state.opensearch_client, + state + .opensearch_client + .as_ref() + .ok_or_else(|| error_stack::report!(OpenSearchError::NotEnabled))?, req, search_params, SEARCH_INDEXES.to_vec(), @@ -2392,9 +2395,16 @@ pub mod routes { }) }) .collect(); - analytics::search::search_results(&state.opensearch_client, req, search_params) - .await - .map(ApplicationResponse::Json) + analytics::search::search_results( + state + .opensearch_client + .as_ref() + .ok_or_else(|| error_stack::report!(OpenSearchError::NotEnabled))?, + req, + search_params, + ) + .await + .map(ApplicationResponse::Json) }, &auth::JWTAuth { permission: Permission::ProfileAnalyticsRead, diff --git a/crates/router/src/core/health_check.rs b/crates/router/src/core/health_check.rs index 2e5c295601..c4892c94d6 100644 --- a/crates/router/src/core/health_check.rs +++ b/crates/router/src/core/health_check.rs @@ -138,12 +138,15 @@ impl HealthCheckInterface for app::SessionState { async fn health_check_opensearch( &self, ) -> CustomResult { - self.opensearch_client - .deep_health_check() - .await - .change_context(errors::HealthCheckDBError::OpensearchError)?; - - Ok(HealthState::Running) + if let Some(client) = self.opensearch_client.as_ref() { + client + .deep_health_check() + .await + .change_context(errors::HealthCheckDBError::OpensearchError)?; + Ok(HealthState::Running) + } else { + Ok(HealthState::NotApplicable) + } } async fn health_check_outgoing( diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 321d1e04dd..d2d3179091 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -118,7 +118,7 @@ pub struct SessionState { pub base_url: String, pub tenant: Tenant, #[cfg(feature = "olap")] - pub opensearch_client: Arc, + pub opensearch_client: Option>, pub grpc_client: Arc, pub theme_storage_client: Arc, pub locale: String, @@ -223,7 +223,7 @@ pub struct AppState { #[cfg(feature = "olap")] pub pools: HashMap, #[cfg(feature = "olap")] - pub opensearch_client: Arc, + pub opensearch_client: Option>, pub request_id: Option, pub file_storage_client: Arc, pub encryption_client: Arc, @@ -342,12 +342,12 @@ impl AppState { #[allow(clippy::expect_used)] #[cfg(feature = "olap")] - let opensearch_client = Arc::new( - conf.opensearch - .get_opensearch_client() - .await - .expect("Failed to create opensearch client"), - ); + let opensearch_client = conf + .opensearch + .get_opensearch_client() + .await + .expect("Failed to initialize OpenSearch client.") + .map(Arc::new); #[allow(clippy::expect_used)] let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable) @@ -501,7 +501,7 @@ impl AppState { #[cfg(feature = "email")] email_client: Arc::clone(&self.email_client), #[cfg(feature = "olap")] - opensearch_client: Arc::clone(&self.opensearch_client), + opensearch_client: self.opensearch_client.clone(), grpc_client: Arc::clone(&self.grpc_client), theme_storage_client: self.theme_storage_client.clone(), locale: locale.unwrap_or(common_utils::consts::DEFAULT_LOCALE.to_string()),