mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
fix(opensearch): handle index not present errors in search api (#4965)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: ivor-juspay <138492857+ivor-juspay@users.noreply.github.com>
This commit is contained in:
@ -101,4 +101,18 @@ Here's an example of how to do this:
|
|||||||
[default.features]
|
[default.features]
|
||||||
audit_trail=true
|
audit_trail=true
|
||||||
system_metrics=true
|
system_metrics=true
|
||||||
```
|
global_search=true
|
||||||
|
```
|
||||||
|
|
||||||
|
## Viewing the data on OpenSearch Dashboard
|
||||||
|
|
||||||
|
To view the data on the OpenSearch dashboard perform the following steps:
|
||||||
|
|
||||||
|
- Go to the OpenSearch Dashboard home and click on `Stack Management` under the Management tab
|
||||||
|
- Select `Index Patterns`
|
||||||
|
- Click on `Create index pattern`
|
||||||
|
- Define an index pattern with the same name that matches your indices and click on `Next Step`
|
||||||
|
- Select a time field that will be used for time-based queries
|
||||||
|
- Save the index pattern
|
||||||
|
|
||||||
|
Now, head on to the `Discover` tab, to select the newly created index pattern and query the data
|
||||||
@ -76,6 +76,8 @@ pub enum OpenSearchError {
|
|||||||
ResponseError,
|
ResponseError,
|
||||||
#[error("Opensearch query building error")]
|
#[error("Opensearch query building error")]
|
||||||
QueryBuildingError,
|
QueryBuildingError,
|
||||||
|
#[error("Opensearch deserialisation error")]
|
||||||
|
DeserialisationError,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ErrorSwitch<OpenSearchError> for QueryBuildingError {
|
impl ErrorSwitch<OpenSearchError> for QueryBuildingError {
|
||||||
@ -111,6 +113,12 @@ impl ErrorSwitch<ApiErrorResponse> for OpenSearchError {
|
|||||||
"Query building error",
|
"Query building error",
|
||||||
None,
|
None,
|
||||||
)),
|
)),
|
||||||
|
Self::DeserialisationError => ApiErrorResponse::InternalServerError(ApiError::new(
|
||||||
|
"IR",
|
||||||
|
0,
|
||||||
|
"Deserialisation error",
|
||||||
|
None,
|
||||||
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use api_models::analytics::search::{
|
|||||||
};
|
};
|
||||||
use common_utils::errors::{CustomResult, ReportSwitchExt};
|
use common_utils::errors::{CustomResult, ReportSwitchExt};
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
use serde_json::Value;
|
use router_env::tracing;
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
|
|
||||||
use crate::opensearch::{
|
use crate::opensearch::{
|
||||||
@ -22,27 +22,59 @@ pub async fn msearch_results(
|
|||||||
.add_filter_clause("merchant_id".to_string(), merchant_id.to_string())
|
.add_filter_clause("merchant_id".to_string(), merchant_id.to_string())
|
||||||
.switch()?;
|
.switch()?;
|
||||||
|
|
||||||
let response_body = client
|
let response_text: OpenMsearchOutput = client
|
||||||
.execute(query_builder)
|
.execute(query_builder)
|
||||||
.await
|
.await
|
||||||
.change_context(OpenSearchError::ConnectionError)?
|
.change_context(OpenSearchError::ConnectionError)?
|
||||||
.json::<OpenMsearchOutput<Value>>()
|
.text()
|
||||||
.await
|
.await
|
||||||
.change_context(OpenSearchError::ResponseError)?;
|
.change_context(OpenSearchError::ResponseError)
|
||||||
|
.and_then(|body: String| {
|
||||||
|
serde_json::from_str::<OpenMsearchOutput>(&body)
|
||||||
|
.change_context(OpenSearchError::DeserialisationError)
|
||||||
|
.attach_printable(body.clone())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let response_body: OpenMsearchOutput = response_text;
|
||||||
|
|
||||||
Ok(response_body
|
Ok(response_body
|
||||||
.responses
|
.responses
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(SearchIndex::iter())
|
.zip(SearchIndex::iter())
|
||||||
.map(|(index_hit, index)| GetSearchResponse {
|
.map(|(index_hit, index)| match index_hit {
|
||||||
count: index_hit.hits.total.value,
|
OpensearchOutput::Success(success) => {
|
||||||
index,
|
if success.status == 200 {
|
||||||
hits: index_hit
|
GetSearchResponse {
|
||||||
.hits
|
count: success.hits.total.value,
|
||||||
.hits
|
index,
|
||||||
.into_iter()
|
hits: success
|
||||||
.map(|hit| hit._source)
|
.hits
|
||||||
.collect(),
|
.hits
|
||||||
|
.into_iter()
|
||||||
|
.map(|hit| hit.source)
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::error!("Unexpected status code: {}", success.status,);
|
||||||
|
GetSearchResponse {
|
||||||
|
count: 0,
|
||||||
|
index,
|
||||||
|
hits: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OpensearchOutput::Error(error) => {
|
||||||
|
tracing::error!(
|
||||||
|
index = ?index,
|
||||||
|
error_response = ?error,
|
||||||
|
"Search error"
|
||||||
|
);
|
||||||
|
GetSearchResponse {
|
||||||
|
count: 0,
|
||||||
|
index,
|
||||||
|
hits: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
@ -65,22 +97,54 @@ pub async fn search_results(
|
|||||||
.set_offset_n_count(search_req.offset, search_req.count)
|
.set_offset_n_count(search_req.offset, search_req.count)
|
||||||
.switch()?;
|
.switch()?;
|
||||||
|
|
||||||
let response_body = client
|
let response_text: OpensearchOutput = client
|
||||||
.execute(query_builder)
|
.execute(query_builder)
|
||||||
.await
|
.await
|
||||||
.change_context(OpenSearchError::ConnectionError)?
|
.change_context(OpenSearchError::ConnectionError)?
|
||||||
.json::<OpensearchOutput<Value>>()
|
.text()
|
||||||
.await
|
.await
|
||||||
.change_context(OpenSearchError::ResponseError)?;
|
.change_context(OpenSearchError::ResponseError)
|
||||||
|
.and_then(|body: String| {
|
||||||
|
serde_json::from_str::<OpensearchOutput>(&body)
|
||||||
|
.change_context(OpenSearchError::DeserialisationError)
|
||||||
|
.attach_printable(body.clone())
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(GetSearchResponse {
|
let response_body: OpensearchOutput = response_text;
|
||||||
count: response_body.hits.total.value,
|
|
||||||
index: req.index,
|
match response_body {
|
||||||
hits: response_body
|
OpensearchOutput::Success(success) => {
|
||||||
.hits
|
if success.status == 200 {
|
||||||
.hits
|
Ok(GetSearchResponse {
|
||||||
.into_iter()
|
count: success.hits.total.value,
|
||||||
.map(|hit| hit._source)
|
index: req.index,
|
||||||
.collect(),
|
hits: success
|
||||||
})
|
.hits
|
||||||
|
.hits
|
||||||
|
.into_iter()
|
||||||
|
.map(|hit| hit.source)
|
||||||
|
.collect(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
tracing::error!("Unexpected status code: {}", success.status);
|
||||||
|
Ok(GetSearchResponse {
|
||||||
|
count: 0,
|
||||||
|
index: req.index,
|
||||||
|
hits: Vec::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OpensearchOutput::Error(error) => {
|
||||||
|
tracing::error!(
|
||||||
|
index = ?req.index,
|
||||||
|
error_response = ?error,
|
||||||
|
"Search error"
|
||||||
|
);
|
||||||
|
Ok(GetSearchResponse {
|
||||||
|
count: 0,
|
||||||
|
index: req.index,
|
||||||
|
hits: Vec::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -48,19 +48,40 @@ pub struct GetSearchResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
pub struct OpenMsearchOutput<T> {
|
pub struct OpenMsearchOutput {
|
||||||
pub responses: Vec<OpensearchOutput<T>>,
|
pub responses: Vec<OpensearchOutput>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
pub struct OpensearchOutput<T> {
|
#[serde(untagged)]
|
||||||
pub hits: OpensearchResults<T>,
|
pub enum OpensearchOutput {
|
||||||
|
Success(OpensearchSuccess),
|
||||||
|
Error(OpensearchError),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
pub struct OpensearchResults<T> {
|
pub struct OpensearchError {
|
||||||
|
pub error: OpensearchErrorDetails,
|
||||||
|
pub status: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
pub struct OpensearchErrorDetails {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub error_type: String,
|
||||||
|
pub reason: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
pub struct OpensearchSuccess {
|
||||||
|
pub status: u16,
|
||||||
|
pub hits: OpensearchHits,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
pub struct OpensearchHits {
|
||||||
pub total: OpensearchResultsTotal,
|
pub total: OpensearchResultsTotal,
|
||||||
pub hits: Vec<OpensearchHits<T>>,
|
pub hits: Vec<OpensearchHit>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
@ -69,6 +90,7 @@ pub struct OpensearchResultsTotal {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
pub struct OpensearchHits<T> {
|
pub struct OpensearchHit {
|
||||||
pub _source: T,
|
#[serde(rename = "_source")]
|
||||||
|
pub source: Value,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user