chore(analytics): opensearch client creation based on config (#7810)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Shivansh Mathur
2025-04-17 13:32:22 +05:30
committed by GitHub
parent 5690864fd2
commit 2067bc3520
8 changed files with 61 additions and 23 deletions

View File

@ -887,6 +887,7 @@ region = "kms_region" # The AWS region used by the KMS SDK for decrypting data.
[opensearch] [opensearch]
host = "https://localhost:9200" host = "https://localhost:9200"
enabled = false
[opensearch.auth] [opensearch.auth]
auth = "basic" auth = "basic"

View File

@ -242,6 +242,7 @@ region = "report_download_config_region" # Region of the buc
[opensearch] [opensearch]
host = "https://localhost:9200" host = "https://localhost:9200"
enabled = false
[opensearch.auth] [opensearch.auth]
auth = "basic" auth = "basic"

View File

@ -940,6 +940,7 @@ keys = "accept-language,user-agent,x-profile-id"
[opensearch] [opensearch]
host = "https://localhost:9200" host = "https://localhost:9200"
enabled = false
[opensearch.auth] [opensearch.auth]
auth = "basic" auth = "basic"

View File

@ -763,6 +763,7 @@ keys = "accept-language,user-agent,x-profile-id"
[opensearch] [opensearch]
host = "https://opensearch:9200" host = "https://opensearch:9200"
enabled = false
[opensearch.auth] [opensearch.auth]
auth = "basic" auth = "basic"

View File

@ -71,6 +71,8 @@ pub struct OpenSearchConfig {
host: String, host: String,
auth: OpenSearchAuth, auth: OpenSearchAuth,
indexes: OpenSearchIndexes, indexes: OpenSearchIndexes,
#[serde(default)]
enabled: bool,
} }
impl Default for OpenSearchConfig { impl Default for OpenSearchConfig {
@ -91,12 +93,15 @@ impl Default for OpenSearchConfig {
sessionizer_refunds: "sessionizer-refund-events".to_string(), sessionizer_refunds: "sessionizer-refund-events".to_string(),
sessionizer_disputes: "sessionizer-dispute-events".to_string(), sessionizer_disputes: "sessionizer-dispute-events".to_string(),
}, },
enabled: false,
} }
} }
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum OpenSearchError { pub enum OpenSearchError {
#[error("Opensearch is not enabled")]
NotEnabled,
#[error("Opensearch connection error")] #[error("Opensearch connection error")]
ConnectionError, ConnectionError,
#[error("Opensearch NON-200 response content: '{0}'")] #[error("Opensearch NON-200 response content: '{0}'")]
@ -176,6 +181,12 @@ impl ErrorSwitch<ApiErrorResponse> for OpenSearchError {
"Access Forbidden error", "Access Forbidden error",
None, None,
)), )),
Self::NotEnabled => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
8,
"Opensearch is not enabled",
None,
)),
} }
} }
} }
@ -408,15 +419,24 @@ impl OpenSearchAuth {
} }
impl OpenSearchConfig { impl OpenSearchConfig {
pub async fn get_opensearch_client(&self) -> StorageResult<OpenSearchClient> { pub async fn get_opensearch_client(&self) -> StorageResult<Option<OpenSearchClient>> {
Ok(OpenSearchClient::create(self) if !self.enabled {
.await return Ok(None);
.map_err(|_| StorageError::InitializationError)?) }
Ok(Some(
OpenSearchClient::create(self)
.await
.change_context(StorageError::InitializationError)?,
))
} }
pub fn validate(&self) -> Result<(), ApplicationError> { pub fn validate(&self) -> Result<(), ApplicationError> {
use common_utils::{ext_traits::ConfigExt, fp_utils::when}; use common_utils::{ext_traits::ConfigExt, fp_utils::when};
if !self.enabled {
return Ok(());
}
when(self.host.is_default_or_empty(), || { when(self.host.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError( Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch host must not be empty".into(), "Opensearch host must not be empty".into(),
@ -430,6 +450,7 @@ impl OpenSearchConfig {
Ok(()) Ok(())
} }
} }
#[derive(Debug, serde::Deserialize, PartialEq)] #[derive(Debug, serde::Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum OpenSearchHealthStatus { pub enum OpenSearchHealthStatus {

View File

@ -2245,7 +2245,10 @@ pub mod routes {
.collect(); .collect();
analytics::search::msearch_results( analytics::search::msearch_results(
&state.opensearch_client, state
.opensearch_client
.as_ref()
.ok_or_else(|| error_stack::report!(OpenSearchError::NotEnabled))?,
req, req,
search_params, search_params,
SEARCH_INDEXES.to_vec(), SEARCH_INDEXES.to_vec(),
@ -2392,9 +2395,16 @@ pub mod routes {
}) })
}) })
.collect(); .collect();
analytics::search::search_results(&state.opensearch_client, req, search_params) analytics::search::search_results(
.await state
.map(ApplicationResponse::Json) .opensearch_client
.as_ref()
.ok_or_else(|| error_stack::report!(OpenSearchError::NotEnabled))?,
req,
search_params,
)
.await
.map(ApplicationResponse::Json)
}, },
&auth::JWTAuth { &auth::JWTAuth {
permission: Permission::ProfileAnalyticsRead, permission: Permission::ProfileAnalyticsRead,

View File

@ -138,12 +138,15 @@ impl HealthCheckInterface for app::SessionState {
async fn health_check_opensearch( async fn health_check_opensearch(
&self, &self,
) -> CustomResult<HealthState, errors::HealthCheckDBError> { ) -> CustomResult<HealthState, errors::HealthCheckDBError> {
self.opensearch_client if let Some(client) = self.opensearch_client.as_ref() {
.deep_health_check() client
.await .deep_health_check()
.change_context(errors::HealthCheckDBError::OpensearchError)?; .await
.change_context(errors::HealthCheckDBError::OpensearchError)?;
Ok(HealthState::Running) Ok(HealthState::Running)
} else {
Ok(HealthState::NotApplicable)
}
} }
async fn health_check_outgoing( async fn health_check_outgoing(

View File

@ -118,7 +118,7 @@ pub struct SessionState {
pub base_url: String, pub base_url: String,
pub tenant: Tenant, pub tenant: Tenant,
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
pub opensearch_client: Arc<OpenSearchClient>, pub opensearch_client: Option<Arc<OpenSearchClient>>,
pub grpc_client: Arc<GrpcClients>, pub grpc_client: Arc<GrpcClients>,
pub theme_storage_client: Arc<dyn FileStorageInterface>, pub theme_storage_client: Arc<dyn FileStorageInterface>,
pub locale: String, pub locale: String,
@ -223,7 +223,7 @@ pub struct AppState {
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
pub pools: HashMap<id_type::TenantId, AnalyticsProvider>, pub pools: HashMap<id_type::TenantId, AnalyticsProvider>,
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
pub opensearch_client: Arc<OpenSearchClient>, pub opensearch_client: Option<Arc<OpenSearchClient>>,
pub request_id: Option<RequestId>, pub request_id: Option<RequestId>,
pub file_storage_client: Arc<dyn FileStorageInterface>, pub file_storage_client: Arc<dyn FileStorageInterface>,
pub encryption_client: Arc<dyn EncryptionManagementInterface>, pub encryption_client: Arc<dyn EncryptionManagementInterface>,
@ -342,12 +342,12 @@ impl AppState {
#[allow(clippy::expect_used)] #[allow(clippy::expect_used)]
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
let opensearch_client = Arc::new( let opensearch_client = conf
conf.opensearch .opensearch
.get_opensearch_client() .get_opensearch_client()
.await .await
.expect("Failed to create opensearch client"), .expect("Failed to initialize OpenSearch client.")
); .map(Arc::new);
#[allow(clippy::expect_used)] #[allow(clippy::expect_used)]
let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable) let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable)
@ -501,7 +501,7 @@ impl AppState {
#[cfg(feature = "email")] #[cfg(feature = "email")]
email_client: Arc::clone(&self.email_client), email_client: Arc::clone(&self.email_client),
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
opensearch_client: Arc::clone(&self.opensearch_client), opensearch_client: self.opensearch_client.clone(),
grpc_client: Arc::clone(&self.grpc_client), grpc_client: Arc::clone(&self.grpc_client),
theme_storage_client: self.theme_storage_client.clone(), theme_storage_client: self.theme_storage_client.clone(),
locale: locale.unwrap_or(common_utils::consts::DEFAULT_LOCALE.to_string()), locale: locale.unwrap_or(common_utils::consts::DEFAULT_LOCALE.to_string()),