mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-02 04:04:43 +08:00
feat(opensearch): Updated status filter field name to match index and added time-range based search (#5468)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
use api_models::{
|
||||
analytics::search::SearchIndex,
|
||||
errors::types::{ApiError, ApiErrorResponse},
|
||||
payments::TimeRange,
|
||||
};
|
||||
use aws_config::{self, meta::region::RegionProviderChain, Region};
|
||||
use common_utils::errors::{CustomResult, ErrorSwitch};
|
||||
@ -18,8 +19,9 @@ use opensearch::{
|
||||
},
|
||||
MsearchParts, OpenSearch, SearchParts,
|
||||
};
|
||||
use serde_json::{json, Value};
|
||||
use serde_json::{json, Map, Value};
|
||||
use storage_impl::errors::ApplicationError;
|
||||
use time::PrimitiveDateTime;
|
||||
|
||||
use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError};
|
||||
use crate::query::QueryBuildingError;
|
||||
@ -40,6 +42,23 @@ pub struct OpenSearchIndexes {
|
||||
pub disputes: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct OpensearchTimeRange {
|
||||
#[serde(with = "common_utils::custom_serde::iso8601")]
|
||||
pub gte: PrimitiveDateTime,
|
||||
#[serde(default, with = "common_utils::custom_serde::iso8601::option")]
|
||||
pub lte: Option<PrimitiveDateTime>,
|
||||
}
|
||||
|
||||
impl From<TimeRange> for OpensearchTimeRange {
|
||||
fn from(time_range: TimeRange) -> Self {
|
||||
Self {
|
||||
gte: time_range.start_time,
|
||||
lte: time_range.end_time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize)]
|
||||
pub struct OpenSearchConfig {
|
||||
host: String,
|
||||
@ -377,6 +396,7 @@ pub struct OpenSearchQueryBuilder {
|
||||
pub offset: Option<i64>,
|
||||
pub count: Option<i64>,
|
||||
pub filters: Vec<(String, Vec<String>)>,
|
||||
pub time_range: Option<OpensearchTimeRange>,
|
||||
}
|
||||
|
||||
impl OpenSearchQueryBuilder {
|
||||
@ -387,6 +407,7 @@ impl OpenSearchQueryBuilder {
|
||||
offset: Default::default(),
|
||||
count: Default::default(),
|
||||
filters: Default::default(),
|
||||
time_range: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -396,27 +417,110 @@ impl OpenSearchQueryBuilder {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn set_time_range(&mut self, time_range: OpensearchTimeRange) -> QueryResult<()> {
|
||||
self.time_range = Some(time_range);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn add_filter_clause(&mut self, lhs: String, rhs: Vec<String>) -> QueryResult<()> {
|
||||
self.filters.push((lhs, rhs));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_status_field(&self, index: &SearchIndex) -> &str {
|
||||
match index {
|
||||
SearchIndex::Refunds => "refund_status.keyword",
|
||||
SearchIndex::Disputes => "dispute_status.keyword",
|
||||
_ => "status.keyword",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn replace_status_field(&self, filters: &[Value], index: &SearchIndex) -> Vec<Value> {
|
||||
filters
|
||||
.iter()
|
||||
.map(|filter| {
|
||||
if let Some(terms) = filter.get("terms").and_then(|v| v.as_object()) {
|
||||
let mut new_filter = filter.clone();
|
||||
if let Some(new_terms) =
|
||||
new_filter.get_mut("terms").and_then(|v| v.as_object_mut())
|
||||
{
|
||||
let key = "status.keyword";
|
||||
if let Some(status_terms) = terms.get(key) {
|
||||
new_terms.remove(key);
|
||||
new_terms.insert(
|
||||
self.get_status_field(index).to_string(),
|
||||
status_terms.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
new_filter
|
||||
} else {
|
||||
filter.clone()
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if:
|
||||
///
|
||||
/// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types).
|
||||
///
|
||||
/// Ensure that the input data and the structure of the query are valid and correctly handled.
|
||||
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
|
||||
let mut query =
|
||||
vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})];
|
||||
let mut query_obj = Map::new();
|
||||
let mut bool_obj = Map::new();
|
||||
let mut filter_array = Vec::new();
|
||||
|
||||
filter_array.push(json!({
|
||||
"multi_match": {
|
||||
"type": "phrase",
|
||||
"query": self.query,
|
||||
"lenient": true
|
||||
}
|
||||
}));
|
||||
|
||||
let mut filters = self
|
||||
.filters
|
||||
.iter()
|
||||
.map(|(k, v)| json!({"terms" : {k : v}}))
|
||||
.map(|(k, v)| json!({"terms": {k: v}}))
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
query.append(&mut filters);
|
||||
filter_array.append(&mut filters);
|
||||
|
||||
if let Some(ref time_range) = self.time_range {
|
||||
let range = json!(time_range);
|
||||
filter_array.push(json!({
|
||||
"range": {
|
||||
"timestamp": range
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
bool_obj.insert("filter".to_string(), Value::Array(filter_array));
|
||||
query_obj.insert("bool".to_string(), Value::Object(bool_obj));
|
||||
|
||||
let mut query = Map::new();
|
||||
query.insert("query".to_string(), Value::Object(query_obj));
|
||||
|
||||
// TODO add index specific filters
|
||||
Ok(indexes
|
||||
.iter()
|
||||
.map(|_index| json!({"query": {"bool": {"filter": query}}}))
|
||||
.map(|index| {
|
||||
let updated_query = query
|
||||
.get("query")
|
||||
.and_then(|q| q.get("bool"))
|
||||
.and_then(|b| b.get("filter"))
|
||||
.and_then(|f| f.as_array())
|
||||
.map(|filters| self.replace_status_field(filters, index))
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut final_query = Map::new();
|
||||
final_query.insert("bool".to_string(), json!({ "filter": updated_query }));
|
||||
|
||||
let payload = json!({ "query": Value::Object(final_query) });
|
||||
payload
|
||||
})
|
||||
.collect::<Vec<Value>>())
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,6 +97,10 @@ pub async fn msearch_results(
|
||||
};
|
||||
};
|
||||
|
||||
if let Some(time_range) = req.time_range {
|
||||
query_builder.set_time_range(time_range.into()).switch()?;
|
||||
};
|
||||
|
||||
let response_text: OpenMsearchOutput = client
|
||||
.execute(query_builder)
|
||||
.await
|
||||
@ -221,6 +225,11 @@ pub async fn search_results(
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
if let Some(time_range) = search_req.time_range {
|
||||
query_builder.set_time_range(time_range.into()).switch()?;
|
||||
};
|
||||
|
||||
query_builder
|
||||
.set_offset_n_count(search_req.offset, search_req.count)
|
||||
.switch()?;
|
||||
|
||||
@ -2,6 +2,8 @@ use common_utils::hashing::HashedString;
|
||||
use masking::WithType;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::payments::TimeRange;
|
||||
|
||||
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
|
||||
pub struct SearchFilters {
|
||||
pub payment_method: Option<Vec<String>>,
|
||||
@ -26,6 +28,8 @@ pub struct GetGlobalSearchRequest {
|
||||
pub query: String,
|
||||
#[serde(default)]
|
||||
pub filters: Option<SearchFilters>,
|
||||
#[serde(default)]
|
||||
pub time_range: Option<TimeRange>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
@ -36,6 +40,8 @@ pub struct GetSearchRequest {
|
||||
pub query: String,
|
||||
#[serde(default)]
|
||||
pub filters: Option<SearchFilters>,
|
||||
#[serde(default)]
|
||||
pub time_range: Option<TimeRange>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
|
||||
Reference in New Issue
Block a user