fix(opensearch): show search results only if user has access permission to the index (#5097)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Abhishek Kanojia <89402434+Abhitator216@users.noreply.github.com>
This commit is contained in:
Sandeep Kumar
2024-06-27 13:06:31 +05:30
committed by GitHub
parent ef5ed08380
commit 9c49ded104
7 changed files with 130 additions and 78 deletions

View File

@ -20,7 +20,6 @@ use opensearch::{
}; };
use serde_json::{json, Value}; use serde_json::{json, Value};
use storage_impl::errors::ApplicationError; use storage_impl::errors::ApplicationError;
use strum::IntoEnumIterator;
use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError}; use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError};
use crate::query::QueryBuildingError; use crate::query::QueryBuildingError;
@ -78,6 +77,10 @@ pub enum OpenSearchError {
QueryBuildingError, QueryBuildingError,
#[error("Opensearch deserialisation error")] #[error("Opensearch deserialisation error")]
DeserialisationError, DeserialisationError,
#[error("Opensearch index access not present error: {0:?}")]
IndexAccessNotPermittedError(SearchIndex),
#[error("Opensearch unknown error")]
UnknownError,
} }
impl ErrorSwitch<OpenSearchError> for QueryBuildingError { impl ErrorSwitch<OpenSearchError> for QueryBuildingError {
@ -97,28 +100,39 @@ impl ErrorSwitch<ApiErrorResponse> for OpenSearchError {
)), )),
Self::ResponseNotOK(response) => ApiErrorResponse::InternalServerError(ApiError::new( Self::ResponseNotOK(response) => ApiErrorResponse::InternalServerError(ApiError::new(
"IR", "IR",
0, 1,
format!("Something went wrong {}", response), format!("Something went wrong {}", response),
None, None,
)), )),
Self::ResponseError => ApiErrorResponse::InternalServerError(ApiError::new( Self::ResponseError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR", "IR",
0, 2,
"Something went wrong", "Something went wrong",
None, None,
)), )),
Self::QueryBuildingError => ApiErrorResponse::InternalServerError(ApiError::new( Self::QueryBuildingError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR", "IR",
0, 3,
"Query building error", "Query building error",
None, None,
)), )),
Self::DeserialisationError => ApiErrorResponse::InternalServerError(ApiError::new( Self::DeserialisationError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR", "IR",
0, 4,
"Deserialisation error", "Deserialisation error",
None, None,
)), )),
Self::IndexAccessNotPermittedError(index) => {
ApiErrorResponse::ForbiddenCommonResource(ApiError::new(
"IR",
5,
format!("Index access not permitted: {index:?}"),
None,
))
}
Self::UnknownError => {
ApiErrorResponse::InternalServerError(ApiError::new("IR", 6, "Unknown error", None))
}
} }
} }
} }
@ -179,18 +193,16 @@ impl OpenSearchClient {
query_builder: OpenSearchQueryBuilder, query_builder: OpenSearchQueryBuilder,
) -> CustomResult<Response, OpenSearchError> { ) -> CustomResult<Response, OpenSearchError> {
match query_builder.query_type { match query_builder.query_type {
OpenSearchQuery::Msearch => { OpenSearchQuery::Msearch(ref indexes) => {
let search_indexes = SearchIndex::iter();
let payload = query_builder let payload = query_builder
.construct_payload(search_indexes.clone().collect()) .construct_payload(indexes)
.change_context(OpenSearchError::QueryBuildingError)?; .change_context(OpenSearchError::QueryBuildingError)?;
let payload_with_indexes = payload.into_iter().zip(search_indexes).fold( let payload_with_indexes = payload.into_iter().zip(indexes).fold(
Vec::new(), Vec::new(),
|mut payload_with_indexes, (index_hit, index)| { |mut payload_with_indexes, (index_hit, index)| {
payload_with_indexes.push( payload_with_indexes.push(
json!({"index": self.search_index_to_opensearch_index(index)}).into(), json!({"index": self.search_index_to_opensearch_index(*index)}).into(),
); );
payload_with_indexes.push(JsonBody::new(index_hit.clone())); payload_with_indexes.push(JsonBody::new(index_hit.clone()));
payload_with_indexes payload_with_indexes
@ -207,7 +219,7 @@ impl OpenSearchClient {
OpenSearchQuery::Search(index) => { OpenSearchQuery::Search(index) => {
let payload = query_builder let payload = query_builder
.clone() .clone()
.construct_payload(vec![index]) .construct_payload(&[index])
.change_context(OpenSearchError::QueryBuildingError)?; .change_context(OpenSearchError::QueryBuildingError)?;
let final_payload = payload.first().unwrap_or(&Value::Null); let final_payload = payload.first().unwrap_or(&Value::Null);
@ -349,7 +361,7 @@ pub struct OpenSearchHealth {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum OpenSearchQuery { pub enum OpenSearchQuery {
Msearch, Msearch(Vec<SearchIndex>),
Search(SearchIndex), Search(SearchIndex),
} }
@ -384,7 +396,7 @@ impl OpenSearchQueryBuilder {
Ok(()) Ok(())
} }
pub fn construct_payload(&self, indexes: Vec<SearchIndex>) -> QueryResult<Vec<Value>> { pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query = let mut query =
vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})]; vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})];

View File

@ -1,11 +1,10 @@
use api_models::analytics::search::{ use api_models::analytics::search::{
GetGlobalSearchRequest, GetSearchRequestWithIndex, GetSearchResponse, OpenMsearchOutput, GetGlobalSearchRequest, GetSearchRequestWithIndex, GetSearchResponse, OpenMsearchOutput,
OpensearchOutput, SearchIndex, OpensearchOutput, SearchIndex, SearchStatus,
}; };
use common_utils::errors::{CustomResult, ReportSwitchExt}; use common_utils::errors::{CustomResult, ReportSwitchExt};
use error_stack::ResultExt; use error_stack::ResultExt;
use router_env::tracing; use router_env::tracing;
use strum::IntoEnumIterator;
use crate::opensearch::{ use crate::opensearch::{
OpenSearchClient, OpenSearchError, OpenSearchQuery, OpenSearchQueryBuilder, OpenSearchClient, OpenSearchError, OpenSearchQuery, OpenSearchQueryBuilder,
@ -15,8 +14,10 @@ pub async fn msearch_results(
client: &OpenSearchClient, client: &OpenSearchClient,
req: GetGlobalSearchRequest, req: GetGlobalSearchRequest,
merchant_id: &String, merchant_id: &String,
indexes: Vec<SearchIndex>,
) -> CustomResult<Vec<GetSearchResponse>, OpenSearchError> { ) -> CustomResult<Vec<GetSearchResponse>, OpenSearchError> {
let mut query_builder = OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch, req.query); let mut query_builder =
OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch(indexes.clone()), req.query);
query_builder query_builder
.add_filter_clause("merchant_id".to_string(), merchant_id.to_string()) .add_filter_clause("merchant_id".to_string(), merchant_id.to_string())
@ -40,29 +41,19 @@ pub async fn msearch_results(
Ok(response_body Ok(response_body
.responses .responses
.into_iter() .into_iter()
.zip(SearchIndex::iter()) .zip(indexes)
.map(|(index_hit, index)| match index_hit { .map(|(index_hit, index)| match index_hit {
OpensearchOutput::Success(success) => { OpensearchOutput::Success(success) => GetSearchResponse {
if success.status == 200 { count: success.hits.total.value,
GetSearchResponse { index,
count: success.hits.total.value, hits: success
index, .hits
hits: success .hits
.hits .into_iter()
.hits .map(|hit| hit.source)
.into_iter() .collect(),
.map(|hit| hit.source) status: SearchStatus::Success,
.collect(), },
}
} else {
tracing::error!("Unexpected status code: {}", success.status,);
GetSearchResponse {
count: 0,
index,
hits: Vec::new(),
}
}
}
OpensearchOutput::Error(error) => { OpensearchOutput::Error(error) => {
tracing::error!( tracing::error!(
index = ?index, index = ?index,
@ -73,6 +64,7 @@ pub async fn msearch_results(
count: 0, count: 0,
index, index,
hits: Vec::new(), hits: Vec::new(),
status: SearchStatus::Failure,
} }
} }
}) })
@ -113,27 +105,17 @@ pub async fn search_results(
let response_body: OpensearchOutput = response_text; let response_body: OpensearchOutput = response_text;
match response_body { match response_body {
OpensearchOutput::Success(success) => { OpensearchOutput::Success(success) => Ok(GetSearchResponse {
if success.status == 200 { count: success.hits.total.value,
Ok(GetSearchResponse { index: req.index,
count: success.hits.total.value, hits: success
index: req.index, .hits
hits: success .hits
.hits .into_iter()
.hits .map(|hit| hit.source)
.into_iter() .collect(),
.map(|hit| hit.source) status: SearchStatus::Success,
.collect(), }),
})
} else {
tracing::error!("Unexpected status code: {}", success.status);
Ok(GetSearchResponse {
count: 0,
index: req.index,
hits: Vec::new(),
})
}
}
OpensearchOutput::Error(error) => { OpensearchOutput::Error(error) => {
tracing::error!( tracing::error!(
index = ?req.index, index = ?req.index,
@ -144,6 +126,7 @@ pub async fn search_results(
count: 0, count: 0,
index: req.index, index: req.index,
hits: Vec::new(), hits: Vec::new(),
status: SearchStatus::Failure,
}) })
} }
} }

View File

@ -30,7 +30,9 @@ pub struct GetSearchRequestWithIndex {
pub search_req: GetSearchRequest, pub search_req: GetSearchRequest,
} }
#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)] #[derive(
Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy, Eq, PartialEq,
)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum SearchIndex { pub enum SearchIndex {
PaymentAttempts, PaymentAttempts,
@ -39,17 +41,26 @@ pub enum SearchIndex {
Disputes, Disputes,
} }
#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)]
pub enum SearchStatus {
Success,
Failure,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct GetSearchResponse { pub struct GetSearchResponse {
pub count: u64, pub count: u64,
pub index: SearchIndex, pub index: SearchIndex,
pub hits: Vec<Value>, pub hits: Vec<Value>,
pub status: SearchStatus,
} }
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
pub struct OpenMsearchOutput { pub struct OpenMsearchOutput {
#[serde(default)]
pub responses: Vec<OpensearchOutput>, pub responses: Vec<OpensearchOutput>,
pub error: Option<OpensearchErrorDetails>,
} }
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
@ -74,7 +85,6 @@ pub struct OpensearchErrorDetails {
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
pub struct OpensearchSuccess { pub struct OpensearchSuccess {
pub status: u16,
pub hits: OpensearchHits, pub hits: OpensearchHits,
} }

View File

@ -4,7 +4,7 @@ pub mod routes {
use actix_web::{web, Responder, Scope}; use actix_web::{web, Responder, Scope};
use analytics::{ use analytics::{
api_event::api_events_core, connector_events::connector_events_core, api_event::api_events_core, connector_events::connector_events_core,
errors::AnalyticsError, lambda_utils::invoke_lambda, errors::AnalyticsError, lambda_utils::invoke_lambda, opensearch::OpenSearchError,
outgoing_webhook_event::outgoing_webhook_events_core, sdk_events::sdk_events_core, outgoing_webhook_event::outgoing_webhook_events_core, sdk_events::sdk_events_core,
AnalyticsFlow, AnalyticsFlow,
}; };
@ -20,13 +20,14 @@ pub mod routes {
use error_stack::ResultExt; use error_stack::ResultExt;
use crate::{ use crate::{
core::api_locking, consts::opensearch::OPENSEARCH_INDEX_PERMISSIONS,
core::{api_locking, errors::user::UserErrors},
db::user::UserInterface, db::user::UserInterface,
routes::AppState, routes::AppState,
services::{ services::{
api, api,
authentication::{self as auth, AuthenticationData}, authentication::{self as auth, AuthenticationData, UserFromToken},
authorization::permissions::Permission, authorization::{permissions::Permission, roles::RoleInfo},
ApplicationResponse, ApplicationResponse,
}, },
types::domain::UserEmail, types::domain::UserEmail,
@ -694,11 +695,25 @@ pub mod routes {
state.clone(), state.clone(),
&req, &req,
json_payload.into_inner(), json_payload.into_inner(),
|state, auth: AuthenticationData, req, _| async move { |state, auth: UserFromToken, req, _| async move {
let role_id = auth.role_id;
let role_info =
RoleInfo::from_role_id(&state, &role_id, &auth.merchant_id, &auth.org_id)
.await
.change_context(UserErrors::InternalServerError)
.change_context(OpenSearchError::UnknownError)?;
let permissions = role_info.get_permissions_set();
let accessible_indexes: Vec<_> = OPENSEARCH_INDEX_PERMISSIONS
.iter()
.filter(|(_, perm)| perm.iter().any(|p| permissions.contains(p)))
.map(|(i, _)| *i)
.collect();
analytics::search::msearch_results( analytics::search::msearch_results(
&state.opensearch_client, &state.opensearch_client,
req, req,
&auth.merchant_account.merchant_id, &auth.merchant_id,
accessible_indexes,
) )
.await .await
.map(ApplicationResponse::Json) .map(ApplicationResponse::Json)
@ -715,24 +730,33 @@ pub mod routes {
json_payload: web::Json<GetSearchRequest>, json_payload: web::Json<GetSearchRequest>,
index: web::Path<SearchIndex>, index: web::Path<SearchIndex>,
) -> impl Responder { ) -> impl Responder {
let index = index.into_inner();
let flow = AnalyticsFlow::GetSearchResults; let flow = AnalyticsFlow::GetSearchResults;
let indexed_req = GetSearchRequestWithIndex { let indexed_req = GetSearchRequestWithIndex {
search_req: json_payload.into_inner(), search_req: json_payload.into_inner(),
index: index.into_inner(), index,
}; };
Box::pin(api::server_wrap( Box::pin(api::server_wrap(
flow, flow,
state.clone(), state.clone(),
&req, &req,
indexed_req, indexed_req,
|state, auth: AuthenticationData, req, _| async move { |state, auth: UserFromToken, req, _| async move {
analytics::search::search_results( let role_id = auth.role_id;
&state.opensearch_client, let role_info =
req, RoleInfo::from_role_id(&state, &role_id, &auth.merchant_id, &auth.org_id)
&auth.merchant_account.merchant_id, .await
) .change_context(UserErrors::InternalServerError)
.await .change_context(OpenSearchError::UnknownError)?;
.map(ApplicationResponse::Json) let permissions = role_info.get_permissions_set();
let _ = OPENSEARCH_INDEX_PERMISSIONS
.iter()
.filter(|(ind, _)| *ind == index)
.find(|i| i.1.iter().any(|p| permissions.contains(p)))
.ok_or(OpenSearchError::IndexAccessNotPermittedError(index))?;
analytics::search::search_results(&state.opensearch_client, req, &auth.merchant_id)
.await
.map(ApplicationResponse::Json)
}, },
&auth::JWTAuth(Permission::Analytics), &auth::JWTAuth(Permission::Analytics),
api_locking::LockAction::NotApplicable, api_locking::LockAction::NotApplicable,

View File

@ -1,3 +1,4 @@
pub mod opensearch;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
pub mod user; pub mod user;
pub mod user_role; pub mod user_role;

View File

@ -0,0 +1,22 @@
use api_models::analytics::search::SearchIndex;
use crate::services::authorization::permissions::Permission;
pub const OPENSEARCH_INDEX_PERMISSIONS: &[(SearchIndex, &[Permission])] = &[
(
SearchIndex::PaymentAttempts,
&[Permission::PaymentRead, Permission::PaymentWrite],
),
(
SearchIndex::PaymentIntents,
&[Permission::PaymentRead, Permission::PaymentWrite],
),
(
SearchIndex::Refunds,
&[Permission::RefundRead, Permission::RefundWrite],
),
(
SearchIndex::Disputes,
&[Permission::DisputeRead, Permission::DisputeWrite],
),
];

View File

@ -83,7 +83,7 @@ impl EventsConfig {
impl EventsHandler { impl EventsHandler {
pub fn log_event<T: KafkaMessage>(&self, event: &T) { pub fn log_event<T: KafkaMessage>(&self, event: &T) {
match self { match self {
Self::Kafka(kafka) => kafka.log_event(event).map_or((), |e| { Self::Kafka(kafka) => kafka.log_event(event).unwrap_or_else(|e| {
logger::error!("Failed to log event: {:?}", e); logger::error!("Failed to log event: {:?}", e);
}), }),
Self::Logs(logger) => logger.log_event(event), Self::Logs(logger) => logger.log_event(event),