feat(opensearch): refactoring (#4244)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com>
This commit is contained in:
ivor-juspay
2024-05-02 15:50:58 +05:30
committed by GitHub
parent e4ed1e6395
commit 22cb01ac1e
16 changed files with 693 additions and 151 deletions

1
Cargo.lock generated
View File

@ -337,6 +337,7 @@ dependencies = [
"aws-smithy-types 1.1.8",
"bigdecimal",
"common_utils",
"data_models",
"diesel_models",
"error-stack",
"external_services",

View File

@ -11,14 +11,22 @@ license.workspace = true
[dependencies]
# First party crates
api_models = { version = "0.1.0", path = "../api_models", features = ["errors"] }
api_models = { version = "0.1.0", path = "../api_models", features = [
"errors",
] }
storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false }
common_utils = { version = "0.1.0", path = "../common_utils" }
external_services = { version = "0.1.0", path = "../external_services", default-features = false }
hyperswitch_interfaces = { version = "0.1.0", path = "../hyperswitch_interfaces" }
masking = { version = "0.1.0", path = "../masking" }
router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] }
router_env = { version = "0.1.0", path = "../router_env", features = [
"log_extra_implicit_fields",
"log_custom_entries_to_extra",
] }
diesel_models = { version = "0.1.0", path = "../diesel_models", features = [
"kv_store",
] }
data_models = { version = "0.1.0", path = "../data_models", default-features = false }
#Third Party dependencies
actix-web = "4.5.1"
@ -34,7 +42,13 @@ once_cell = "1.19.0"
reqwest = { version = "0.11.27", features = ["serde_json"] }
serde = { version = "1.0.197", features = ["derive", "rc"] }
serde_json = "1.0.115"
sqlx = { version = "0.7.3", features = ["postgres", "runtime-tokio", "runtime-tokio-native-tls", "time", "bigdecimal"] }
sqlx = { version = "0.7.3", features = [
"postgres",
"runtime-tokio",
"runtime-tokio-native-tls",
"time",
"bigdecimal",
] }
strum = { version = "0.26.2", features = ["derive"] }
thiserror = "1.0.58"
time = { version = "0.3.35", features = ["serde", "serde-well-known", "std"] }

View File

@ -10,6 +10,7 @@ pub mod refunds;
pub mod api_event;
pub mod connector_events;
pub mod health_check;
pub mod opensearch;
pub mod outgoing_webhook_event;
pub mod sdk_events;
pub mod search;
@ -668,47 +669,6 @@ pub struct ReportConfig {
pub region: String,
}
#[derive(Clone, Debug, serde::Deserialize)]
#[serde(tag = "auth")]
#[serde(rename_all = "lowercase")]
pub enum OpensearchAuth {
Basic { username: String, password: String },
Aws { region: String },
}
#[derive(Clone, Debug, serde::Deserialize)]
pub struct OpensearchIndexes {
pub payment_attempts: String,
pub payment_intents: String,
pub refunds: String,
pub disputes: String,
}
#[derive(Clone, Debug, serde::Deserialize)]
pub struct OpensearchConfig {
host: String,
auth: OpensearchAuth,
indexes: OpensearchIndexes,
}
impl Default for OpensearchConfig {
fn default() -> Self {
Self {
host: "https://localhost:9200".to_string(),
auth: OpensearchAuth::Basic {
username: "admin".to_string(),
password: "admin".to_string(),
},
indexes: OpensearchIndexes {
payment_attempts: "hyperswitch-payment-attempt-events".to_string(),
payment_intents: "hyperswitch-payment-intent-events".to_string(),
refunds: "hyperswitch-refund-events".to_string(),
disputes: "hyperswitch-dispute-events".to_string(),
},
}
}
}
/// Analytics Flow routes Enums
/// Info - Dimensions and filters available for the domain
/// Filters - Set of values present for the dimension

View File

@ -0,0 +1,397 @@
use api_models::{
analytics::search::SearchIndex,
errors::types::{ApiError, ApiErrorResponse},
};
use aws_config::{self, meta::region::RegionProviderChain, Region};
use common_utils::errors::{CustomResult, ErrorSwitch};
use data_models::errors::{StorageError, StorageResult};
use error_stack::ResultExt;
use opensearch::{
auth::Credentials,
cert::CertificateValidation,
cluster::{Cluster, ClusterHealthParts},
http::{
request::JsonBody,
response::Response,
transport::{SingleNodeConnectionPool, Transport, TransportBuilder},
Url,
},
MsearchParts, OpenSearch, SearchParts,
};
use serde_json::{json, Value};
use storage_impl::errors::ApplicationError;
use strum::IntoEnumIterator;
use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError};
use crate::query::QueryBuildingError;
#[derive(Clone, Debug, serde::Deserialize)]
#[serde(tag = "auth")]
#[serde(rename_all = "lowercase")]
pub enum OpenSearchAuth {
Basic { username: String, password: String },
Aws { region: String },
}
#[derive(Clone, Debug, serde::Deserialize)]
pub struct OpenSearchIndexes {
pub payment_attempts: String,
pub payment_intents: String,
pub refunds: String,
pub disputes: String,
}
#[derive(Clone, Debug, serde::Deserialize)]
pub struct OpenSearchConfig {
host: String,
auth: OpenSearchAuth,
indexes: OpenSearchIndexes,
}
impl Default for OpenSearchConfig {
fn default() -> Self {
Self {
host: "https://localhost:9200".to_string(),
auth: OpenSearchAuth::Basic {
username: "admin".to_string(),
password: "admin".to_string(),
},
indexes: OpenSearchIndexes {
payment_attempts: "hyperswitch-payment-attempt-events".to_string(),
payment_intents: "hyperswitch-payment-intent-events".to_string(),
refunds: "hyperswitch-refund-events".to_string(),
disputes: "hyperswitch-dispute-events".to_string(),
},
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum OpenSearchError {
#[error("Opensearch connection error")]
ConnectionError,
#[error("Opensearch NON-200 response content: '{0}'")]
ResponseNotOK(String),
#[error("Opensearch response error")]
ResponseError,
#[error("Opensearch query building error")]
QueryBuildingError,
}
impl ErrorSwitch<OpenSearchError> for QueryBuildingError {
fn switch(&self) -> OpenSearchError {
OpenSearchError::QueryBuildingError
}
}
impl ErrorSwitch<ApiErrorResponse> for OpenSearchError {
fn switch(&self) -> ApiErrorResponse {
match self {
Self::ConnectionError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
"Connection error",
None,
)),
Self::ResponseNotOK(response) => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
format!("Something went wrong {}", response),
None,
)),
Self::ResponseError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
"Something went wrong",
None,
)),
Self::QueryBuildingError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
"Query building error",
None,
)),
}
}
}
#[derive(Clone, Debug)]
pub struct OpenSearchClient {
pub client: OpenSearch,
pub transport: Transport,
pub indexes: OpenSearchIndexes,
}
impl OpenSearchClient {
pub async fn create(conf: &OpenSearchConfig) -> CustomResult<Self, OpenSearchError> {
let url = Url::parse(&conf.host).map_err(|_| OpenSearchError::ConnectionError)?;
let transport = match &conf.auth {
OpenSearchAuth::Basic { username, password } => {
let credentials = Credentials::Basic(username.clone(), password.clone());
TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
.auth(credentials)
.build()
.map_err(|_| OpenSearchError::ConnectionError)?
}
OpenSearchAuth::Aws { region } => {
let region_provider = RegionProviderChain::first_try(Region::new(region.clone()));
let sdk_config = aws_config::from_env().region(region_provider).load().await;
let conn_pool = SingleNodeConnectionPool::new(url);
TransportBuilder::new(conn_pool)
.auth(
sdk_config
.clone()
.try_into()
.map_err(|_| OpenSearchError::ConnectionError)?,
)
.service_name("es")
.build()
.map_err(|_| OpenSearchError::ConnectionError)?
}
};
Ok(Self {
transport: transport.clone(),
client: OpenSearch::new(transport),
indexes: conf.indexes.clone(),
})
}
pub fn search_index_to_opensearch_index(&self, index: SearchIndex) -> String {
match index {
SearchIndex::PaymentAttempts => self.indexes.payment_attempts.clone(),
SearchIndex::PaymentIntents => self.indexes.payment_intents.clone(),
SearchIndex::Refunds => self.indexes.refunds.clone(),
SearchIndex::Disputes => self.indexes.disputes.clone(),
}
}
pub async fn execute(
&self,
query_builder: OpenSearchQueryBuilder,
) -> CustomResult<Response, OpenSearchError> {
match query_builder.query_type {
OpenSearchQuery::Msearch => {
let search_indexes = SearchIndex::iter();
let payload = query_builder
.construct_payload(search_indexes.clone().collect())
.change_context(OpenSearchError::QueryBuildingError)?;
let payload_with_indexes = payload.into_iter().zip(search_indexes).fold(
Vec::new(),
|mut payload_with_indexes, (index_hit, index)| {
payload_with_indexes.push(
json!({"index": self.search_index_to_opensearch_index(index)}).into(),
);
payload_with_indexes.push(JsonBody::new(index_hit.clone()));
payload_with_indexes
},
);
self.client
.msearch(MsearchParts::None)
.body(payload_with_indexes)
.send()
.await
.change_context(OpenSearchError::ResponseError)
}
OpenSearchQuery::Search(index) => {
let payload = query_builder
.clone()
.construct_payload(vec![index])
.change_context(OpenSearchError::QueryBuildingError)?;
let final_payload = payload.first().unwrap_or(&Value::Null);
self.client
.search(SearchParts::Index(&[
&self.search_index_to_opensearch_index(index)
]))
.from(query_builder.offset.unwrap_or(0))
.size(query_builder.count.unwrap_or(10))
.body(final_payload)
.send()
.await
.change_context(OpenSearchError::ResponseError)
}
}
}
}
#[async_trait::async_trait]
impl HealthCheck for OpenSearchClient {
async fn deep_health_check(&self) -> CustomResult<(), QueryExecutionError> {
let health = Cluster::new(&self.transport)
.health(ClusterHealthParts::None)
.send()
.await
.change_context(QueryExecutionError::DatabaseError)?
.json::<OpenSearchHealth>()
.await
.change_context(QueryExecutionError::DatabaseError)?;
if health.status != OpenSearchHealthStatus::Red {
Ok(())
} else {
Err(QueryExecutionError::DatabaseError.into())
}
}
}
impl OpenSearchIndexes {
pub fn validate(&self) -> Result<(), ApplicationError> {
use common_utils::{ext_traits::ConfigExt, fp_utils::when};
when(self.payment_attempts.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Payment Attempts index must not be empty".into(),
))
})?;
when(self.payment_intents.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Payment Intents index must not be empty".into(),
))
})?;
when(self.refunds.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Refunds index must not be empty".into(),
))
})?;
when(self.disputes.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Disputes index must not be empty".into(),
))
})?;
Ok(())
}
}
impl OpenSearchAuth {
pub fn validate(&self) -> Result<(), ApplicationError> {
use common_utils::{ext_traits::ConfigExt, fp_utils::when};
match self {
Self::Basic { username, password } => {
when(username.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Basic auth username must not be empty".into(),
))
})?;
when(password.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Basic auth password must not be empty".into(),
))
})?;
}
Self::Aws { region } => {
when(region.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch Aws auth region must not be empty".into(),
))
})?;
}
};
Ok(())
}
}
impl OpenSearchConfig {
pub async fn get_opensearch_client(&self) -> StorageResult<OpenSearchClient> {
Ok(OpenSearchClient::create(self)
.await
.map_err(|_| StorageError::InitializationError)?)
}
pub fn validate(&self) -> Result<(), ApplicationError> {
use common_utils::{ext_traits::ConfigExt, fp_utils::when};
when(self.host.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Opensearch host must not be empty".into(),
))
})?;
self.indexes.validate()?;
self.auth.validate()?;
Ok(())
}
}
#[derive(Debug, serde::Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum OpenSearchHealthStatus {
Red,
Green,
Yellow,
}
#[derive(Debug, serde::Deserialize)]
pub struct OpenSearchHealth {
pub status: OpenSearchHealthStatus,
}
#[derive(Debug, Clone)]
pub enum OpenSearchQuery {
Msearch,
Search(SearchIndex),
}
#[derive(Debug, Clone)]
pub struct OpenSearchQueryBuilder {
pub query_type: OpenSearchQuery,
pub query: String,
pub offset: Option<i64>,
pub count: Option<i64>,
pub filters: Vec<(String, String)>,
}
impl OpenSearchQueryBuilder {
pub fn new(query_type: OpenSearchQuery, query: String) -> Self {
Self {
query_type,
query,
offset: Default::default(),
count: Default::default(),
filters: Default::default(),
}
}
pub fn set_offset_n_count(&mut self, offset: i64, count: i64) -> QueryResult<()> {
self.offset = Some(offset);
self.count = Some(count);
Ok(())
}
pub fn add_filter_clause(&mut self, lhs: String, rhs: String) -> QueryResult<()> {
self.filters.push((lhs, rhs));
Ok(())
}
pub fn construct_payload(&self, indexes: Vec<SearchIndex>) -> QueryResult<Vec<Value>> {
let mut query =
vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})];
let mut filters = self
.filters
.iter()
.map(|(k, v)| json!({"match_phrase" : {k : v}}))
.collect::<Vec<Value>>();
query.append(&mut filters);
// TODO add index specific filters
Ok(indexes
.iter()
.map(|_index| json!({"query": {"bool": {"filter": query}}}))
.collect::<Vec<Value>>())
}
}

View File

@ -2,99 +2,33 @@ use api_models::analytics::search::{
GetGlobalSearchRequest, GetSearchRequestWithIndex, GetSearchResponse, OpenMsearchOutput,
OpensearchOutput, SearchIndex,
};
use aws_config::{self, meta::region::RegionProviderChain, Region};
use common_utils::errors::CustomResult;
use opensearch::{
auth::Credentials,
cert::CertificateValidation,
http::{
request::JsonBody,
transport::{SingleNodeConnectionPool, TransportBuilder},
Url,
},
MsearchParts, OpenSearch, SearchParts,
};
use serde_json::{json, Value};
use common_utils::errors::{CustomResult, ReportSwitchExt};
use error_stack::ResultExt;
use serde_json::Value;
use strum::IntoEnumIterator;
use crate::{errors::AnalyticsError, OpensearchAuth, OpensearchConfig, OpensearchIndexes};
#[derive(Debug, thiserror::Error)]
pub enum OpensearchError {
#[error("Opensearch connection error")]
ConnectionError,
#[error("Opensearch NON-200 response content: '{0}'")]
ResponseNotOK(String),
#[error("Opensearch response error")]
ResponseError,
}
pub fn search_index_to_opensearch_index(index: SearchIndex, config: &OpensearchIndexes) -> String {
match index {
SearchIndex::PaymentAttempts => config.payment_attempts.clone(),
SearchIndex::PaymentIntents => config.payment_intents.clone(),
SearchIndex::Refunds => config.refunds.clone(),
SearchIndex::Disputes => config.disputes.clone(),
}
}
async fn get_opensearch_client(config: OpensearchConfig) -> Result<OpenSearch, OpensearchError> {
let url = Url::parse(&config.host).map_err(|_| OpensearchError::ConnectionError)?;
let transport = match config.auth {
OpensearchAuth::Basic { username, password } => {
let credentials = Credentials::Basic(username, password);
TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
.auth(credentials)
.build()
.map_err(|_| OpensearchError::ConnectionError)?
}
OpensearchAuth::Aws { region } => {
let region_provider = RegionProviderChain::first_try(Region::new(region));
let sdk_config = aws_config::from_env().region(region_provider).load().await;
let conn_pool = SingleNodeConnectionPool::new(url);
TransportBuilder::new(conn_pool)
.auth(
sdk_config
.clone()
.try_into()
.map_err(|_| OpensearchError::ConnectionError)?,
)
.service_name("es")
.build()
.map_err(|_| OpensearchError::ConnectionError)?
}
use crate::opensearch::{
OpenSearchClient, OpenSearchError, OpenSearchQuery, OpenSearchQueryBuilder,
};
Ok(OpenSearch::new(transport))
}
pub async fn msearch_results(
client: &OpenSearchClient,
req: GetGlobalSearchRequest,
merchant_id: &String,
config: OpensearchConfig,
) -> CustomResult<Vec<GetSearchResponse>, AnalyticsError> {
let client = get_opensearch_client(config.clone())
) -> CustomResult<Vec<GetSearchResponse>, OpenSearchError> {
let mut query_builder = OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch, req.query);
query_builder
.add_filter_clause("merchant_id".to_string(), merchant_id.to_string())
.switch()?;
let response_body = client
.execute(query_builder)
.await
.map_err(|_| AnalyticsError::UnknownError)?;
let mut msearch_vector: Vec<JsonBody<Value>> = vec![];
for index in SearchIndex::iter() {
msearch_vector
.push(json!({"index": search_index_to_opensearch_index(index,&config.indexes)}).into());
msearch_vector.push(json!({"query": {"bool": {"filter": [{"multi_match": {"type": "phrase", "query": req.query, "lenient": true}},{"match_phrase": {"merchant_id": merchant_id}}]}}}).into());
}
let response = client
.msearch(MsearchParts::None)
.body(msearch_vector)
.send()
.await
.map_err(|_| AnalyticsError::UnknownError)?;
let response_body = response
.change_context(OpenSearchError::ConnectionError)?
.json::<OpenMsearchOutput<Value>>()
.await
.map_err(|_| AnalyticsError::UnknownError)?;
.change_context(OpenSearchError::ResponseError)?;
Ok(response_body
.responses
@ -114,29 +48,30 @@ pub async fn msearch_results(
}
pub async fn search_results(
client: &OpenSearchClient,
req: GetSearchRequestWithIndex,
merchant_id: &String,
config: OpensearchConfig,
) -> CustomResult<GetSearchResponse, AnalyticsError> {
) -> CustomResult<GetSearchResponse, OpenSearchError> {
let search_req = req.search_req;
let client = get_opensearch_client(config.clone())
.await
.map_err(|_| AnalyticsError::UnknownError)?;
let mut query_builder =
OpenSearchQueryBuilder::new(OpenSearchQuery::Search(req.index), search_req.query);
let response = client
.search(SearchParts::Index(&[&search_index_to_opensearch_index(req.index.clone(),&config.indexes)]))
.from(search_req.offset)
.size(search_req.count)
.body(json!({"query": {"bool": {"filter": [{"multi_match": {"type": "phrase", "query": search_req.query, "lenient": true}},{"match_phrase": {"merchant_id": merchant_id}}]}}}))
.send()
.await
.map_err(|_| AnalyticsError::UnknownError)?;
query_builder
.add_filter_clause("merchant_id".to_string(), merchant_id.to_string())
.switch()?;
let response_body = response
query_builder
.set_offset_n_count(search_req.offset, search_req.count)
.switch()?;
let response_body = client
.execute(query_builder)
.await
.change_context(OpenSearchError::ConnectionError)?
.json::<OpensearchOutput<Value>>()
.await
.map_err(|_| AnalyticsError::UnknownError)?;
.change_context(OpenSearchError::ResponseError)?;
Ok(GetSearchResponse {
count: response_body.hits.total.value,

View File

@ -30,7 +30,7 @@ pub struct GetSearchRequestWithIndex {
pub search_req: GetSearchRequest,
}
#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize)]
#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum SearchIndex {
PaymentAttempts,

View File

@ -6,6 +6,8 @@ pub struct RouterHealthCheckResponse {
pub vault: Option<bool>,
#[cfg(feature = "olap")]
pub analytics: bool,
#[cfg(feature = "olap")]
pub opensearch: bool,
pub outgoing_request: bool,
}

View File

@ -614,9 +614,9 @@ pub mod routes {
json_payload.into_inner(),
|state, auth: AuthenticationData, req, _| async move {
analytics::search::msearch_results(
&state.opensearch_client,
req,
&auth.merchant_account.merchant_id,
state.conf.opensearch.clone(),
)
.await
.map(ApplicationResponse::Json)
@ -645,9 +645,9 @@ pub mod routes {
indexed_req,
|state, auth: AuthenticationData, req, _| async move {
analytics::search::search_results(
&state.opensearch_client,
req,
&auth.merchant_account.merchant_id,
state.conf.opensearch.clone(),
)
.await
.map(ApplicationResponse::Json)

View File

@ -4,7 +4,7 @@ use std::{
};
#[cfg(feature = "olap")]
use analytics::{OpensearchConfig, ReportConfig};
use analytics::{opensearch::OpenSearchConfig, ReportConfig};
use api_models::{enums, payment_methods::RequiredFieldInfo};
use common_utils::ext_traits::ConfigExt;
use config::{Environment, File};
@ -114,7 +114,7 @@ pub struct Settings<S: SecretState> {
#[cfg(feature = "olap")]
pub report_download_config: ReportConfig,
#[cfg(feature = "olap")]
pub opensearch: OpensearchConfig,
pub opensearch: OpenSearchConfig,
pub events: EventsConfig,
#[cfg(feature = "olap")]
pub connector_onboarding: SecretStateContainer<ConnectorOnboarding, S>,
@ -730,6 +730,9 @@ impl Settings<SecuredSecret> {
self.lock_settings.validate()?;
self.events.validate()?;
#[cfg(feature = "olap")]
self.opensearch.validate()?;
self.encryption_management
.validate()
.map_err(|err| ApplicationError::InvalidConfigurationValueError(err.into()))?;

View File

@ -23,6 +23,11 @@ pub trait HealthCheckInterface {
#[cfg(feature = "olap")]
async fn health_check_analytics(&self)
-> CustomResult<HealthState, errors::HealthCheckDBError>;
#[cfg(feature = "olap")]
async fn health_check_opensearch(
&self,
) -> CustomResult<HealthState, errors::HealthCheckDBError>;
}
#[async_trait::async_trait]
@ -122,6 +127,18 @@ impl HealthCheckInterface for app::AppState {
Ok(HealthState::Running)
}
#[cfg(feature = "olap")]
async fn health_check_opensearch(
&self,
) -> CustomResult<HealthState, errors::HealthCheckDBError> {
self.opensearch_client
.deep_health_check()
.await
.change_context(errors::HealthCheckDBError::OpensearchError)?;
Ok(HealthState::Running)
}
async fn health_check_outgoing(
&self,
) -> CustomResult<HealthState, errors::HealthCheckOutGoing> {

View File

@ -41,6 +41,8 @@ use super::{currency, payment_methods::*};
use super::{ephemeral_key::*, webhooks::*};
#[cfg(feature = "oltp")]
use super::{pm_auth, poll::retrieve_poll_status};
#[cfg(feature = "olap")]
pub use crate::analytics::opensearch::OpenSearchClient;
use crate::configs::secrets_transformers;
#[cfg(all(feature = "frm", feature = "oltp"))]
use crate::routes::fraud_check as frm_routes;
@ -73,6 +75,8 @@ pub struct AppState {
pub api_client: Box<dyn crate::services::ApiClient>,
#[cfg(feature = "olap")]
pub pool: crate::analytics::AnalyticsProvider,
#[cfg(feature = "olap")]
pub opensearch_client: OpenSearchClient,
pub request_id: Option<RequestId>,
pub file_storage_client: Box<dyn FileStorageInterface>,
pub encryption_client: Box<dyn EncryptionManagementInterface>,
@ -177,6 +181,14 @@ impl AppState {
.await
.expect("Failed to create event handler");
#[allow(clippy::expect_used)]
#[cfg(feature = "olap")]
let opensearch_client = conf
.opensearch
.get_opensearch_client()
.await
.expect("Failed to create opensearch client");
let store: Box<dyn StorageInterface> = match storage_impl {
StorageImpl::Postgresql | StorageImpl::PostgresqlTest => match &event_handler {
EventsHandler::Kafka(kafka_client) => Box::new(
@ -223,6 +235,8 @@ impl AppState {
event_handler,
#[cfg(feature = "olap")]
pool,
#[cfg(feature = "olap")]
opensearch_client,
request_id: None,
file_storage_client,
encryption_client,

View File

@ -74,6 +74,10 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse<RouterHe
})
})?;
logger::debug!("Locker health check end");
logger::debug!("Analytics health check begin");
#[cfg(feature = "olap")]
let analytics_status = state.health_check_analytics().await.map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
@ -82,6 +86,22 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse<RouterHe
})
})?;
logger::debug!("Analytics health check end");
logger::debug!("Opensearch health check begin");
#[cfg(feature = "olap")]
let opensearch_status = state.health_check_opensearch().await.map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
component: "Opensearch",
message: err.to_string()
})
})?;
logger::debug!("Opensearch health check end");
logger::debug!("Outgoing Request health check begin");
let outgoing_check = state.health_check_outgoing().await.map_err(|err| {
error_stack::report!(errors::ApiErrorResponse::HealthCheckError {
component: "Outgoing Request",
@ -89,7 +109,7 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse<RouterHe
})
})?;
logger::debug!("Locker health check end");
logger::debug!("Outgoing Request health check end");
let response = RouterHealthCheckResponse {
database: db_status.into(),
@ -97,6 +117,8 @@ async fn deep_health_check_func(state: app::AppState) -> RouterResponse<RouterHe
vault: locker_status.into(),
#[cfg(feature = "olap")]
analytics: analytics_status.into(),
#[cfg(feature = "olap")]
opensearch: opensearch_status.into(),
outgoing_request: outgoing_check.into(),
};

View File

@ -369,6 +369,8 @@ pub enum HealthCheckDBError {
SqlxAnalyticsError,
#[error("Error while executing query in Clickhouse Analytics")]
ClickhouseAnalyticsError,
#[error("Error while executing query in Opensearch")]
OpensearchError,
}
impl From<diesel::result::Error> for HealthCheckDBError {

View File

@ -333,3 +333,34 @@ services:
nofile:
soft: 262144
hard: 262144
fluentd:
build: ./docker/fluentd
volumes:
- ./docker/fluentd/conf:/fluentd/etc
networks:
- router_net
opensearch:
image: public.ecr.aws/opensearchproject/opensearch:1.3.14
container_name: opensearch
hostname: opensearch
environment:
- "discovery.type=single-node"
expose:
- "9200"
ports:
- "9200:9200"
networks:
- router_net
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:1.3.14
ports:
- 5601:5601
expose:
- "5601"
environment:
OPENSEARCH_HOSTS: '["https://opensearch:9200"]'
networks:
- router_net

View File

@ -0,0 +1,7 @@
# docker/fluentd/Dockerfile
FROM fluent/fluentd:v1.16-debian-2
USER root
RUN ["gem", "install", "fluent-plugin-kafka", "--no-document"]
RUN ["gem", "install", "fluent-plugin-opensearch", "--no-document"]
USER fluent

View File

@ -0,0 +1,137 @@
# docker/fluentd/conf/fluent.conf
<source>
@type kafka_group
brokers kafka0:29092
consumer_group fluentd
topics hyperswitch-payment-intent-events,hyperswitch-payment-attempt-events,hyperswitch-refund-events,hyperswitch-dispute-events
add_headers false
add_prefix topic
retry_emit_limit 2
</source>
<filter topic.hyperswitch-payment-intent-events*>
@type record_transformer
renew_time_key created_at
</filter>
<filter topic.hyperswitch-payment-attempt-events*>
@type record_transformer
renew_time_key created_at
</filter>
<filter topic.hyperswitch-refund-events*>
@type record_transformer
renew_time_key created_at
</filter>
<filter topic.hyperswitch-dispute-events*>
@type record_transformer
renew_time_key created_at
</filter>
<match topic.hyperswitch-payment-intent-events*>
@type copy
<store>
@type stdout
</store>
<store>
@type opensearch
host opensearch
port 9200
scheme https
index_name hyperswitch-payment-intent-events
id_key payment_id
user admin
password admin
ssl_verify false
prefer_oj_serializer true
reload_on_failure true
reload_connections false
request_timeout 120s
bulk_message_request_threshold 10MB
include_timestamp true
</store>
</match>
<match topic.hyperswitch-payment-attempt-events*>
@type copy
<store>
@type stdout
</store>
<store>
@type opensearch
host opensearch
port 9200
scheme https
index_name hyperswitch-payment-attempt-events
id_key attempt_id
user admin
password admin
ssl_verify false
prefer_oj_serializer true
reload_on_failure true
reload_connections false
request_timeout 120s
bulk_message_request_threshold 10MB
include_timestamp true
</store>
</match>
<match topic.hyperswitch-refund-events*>
@type copy
<store>
@type stdout
</store>
<store>
@type opensearch
host opensearch
port 9200
scheme https
index_name hyperswitch-refund-events
id_key refund_id
user admin
password admin
ssl_verify false
prefer_oj_serializer true
reload_on_failure true
reload_connections false
request_timeout 120s
bulk_message_request_threshold 10MB
include_timestamp true
</store>
</match>
<match topic.hyperswitch-dispute-events*>
@type copy
<store>
@type stdout
</store>
<store>
@type opensearch
host opensearch
port 9200
scheme https
index_name hyperswitch-dispute-events
id_key dispute_id
user admin
password admin
ssl_verify false
prefer_oj_serializer true
reload_on_failure true
reload_connections false
request_timeout 120s
bulk_message_request_threshold 10MB
include_timestamp true
</store>
</match>