diff --git a/crates/analytics/src/opensearch.rs b/crates/analytics/src/opensearch.rs index e8f87aaef2..b8c582df1b 100644 --- a/crates/analytics/src/opensearch.rs +++ b/crates/analytics/src/opensearch.rs @@ -20,7 +20,6 @@ use opensearch::{ }; use serde_json::{json, Value}; use storage_impl::errors::ApplicationError; -use strum::IntoEnumIterator; use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError}; use crate::query::QueryBuildingError; @@ -78,6 +77,10 @@ pub enum OpenSearchError { QueryBuildingError, #[error("Opensearch deserialisation error")] DeserialisationError, + #[error("Opensearch index access not present error: {0:?}")] + IndexAccessNotPermittedError(SearchIndex), + #[error("Opensearch unknown error")] + UnknownError, } impl ErrorSwitch for QueryBuildingError { @@ -97,28 +100,39 @@ impl ErrorSwitch for OpenSearchError { )), Self::ResponseNotOK(response) => ApiErrorResponse::InternalServerError(ApiError::new( "IR", - 0, + 1, format!("Something went wrong {}", response), None, )), Self::ResponseError => ApiErrorResponse::InternalServerError(ApiError::new( "IR", - 0, + 2, "Something went wrong", None, )), Self::QueryBuildingError => ApiErrorResponse::InternalServerError(ApiError::new( "IR", - 0, + 3, "Query building error", None, )), Self::DeserialisationError => ApiErrorResponse::InternalServerError(ApiError::new( "IR", - 0, + 4, "Deserialisation error", None, )), + Self::IndexAccessNotPermittedError(index) => { + ApiErrorResponse::ForbiddenCommonResource(ApiError::new( + "IR", + 5, + format!("Index access not permitted: {index:?}"), + None, + )) + } + Self::UnknownError => { + ApiErrorResponse::InternalServerError(ApiError::new("IR", 6, "Unknown error", None)) + } } } } @@ -179,18 +193,16 @@ impl OpenSearchClient { query_builder: OpenSearchQueryBuilder, ) -> CustomResult { match query_builder.query_type { - OpenSearchQuery::Msearch => { - let search_indexes = SearchIndex::iter(); - + OpenSearchQuery::Msearch(ref indexes) => { let payload = query_builder - .construct_payload(search_indexes.clone().collect()) + .construct_payload(indexes) .change_context(OpenSearchError::QueryBuildingError)?; - let payload_with_indexes = payload.into_iter().zip(search_indexes).fold( + let payload_with_indexes = payload.into_iter().zip(indexes).fold( Vec::new(), |mut payload_with_indexes, (index_hit, index)| { payload_with_indexes.push( - json!({"index": self.search_index_to_opensearch_index(index)}).into(), + json!({"index": self.search_index_to_opensearch_index(*index)}).into(), ); payload_with_indexes.push(JsonBody::new(index_hit.clone())); payload_with_indexes @@ -207,7 +219,7 @@ impl OpenSearchClient { OpenSearchQuery::Search(index) => { let payload = query_builder .clone() - .construct_payload(vec![index]) + .construct_payload(&[index]) .change_context(OpenSearchError::QueryBuildingError)?; let final_payload = payload.first().unwrap_or(&Value::Null); @@ -349,7 +361,7 @@ pub struct OpenSearchHealth { #[derive(Debug, Clone)] pub enum OpenSearchQuery { - Msearch, + Msearch(Vec), Search(SearchIndex), } @@ -384,7 +396,7 @@ impl OpenSearchQueryBuilder { Ok(()) } - pub fn construct_payload(&self, indexes: Vec) -> QueryResult> { + pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult> { let mut query = vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})]; diff --git a/crates/analytics/src/search.rs b/crates/analytics/src/search.rs index 8810dc1e3a..373ca3acbb 100644 --- a/crates/analytics/src/search.rs +++ b/crates/analytics/src/search.rs @@ -1,11 +1,10 @@ use api_models::analytics::search::{ GetGlobalSearchRequest, GetSearchRequestWithIndex, GetSearchResponse, OpenMsearchOutput, - OpensearchOutput, SearchIndex, + OpensearchOutput, SearchIndex, SearchStatus, }; use common_utils::errors::{CustomResult, ReportSwitchExt}; use error_stack::ResultExt; use router_env::tracing; -use strum::IntoEnumIterator; use crate::opensearch::{ OpenSearchClient, OpenSearchError, OpenSearchQuery, OpenSearchQueryBuilder, @@ -15,8 +14,10 @@ pub async fn msearch_results( client: &OpenSearchClient, req: GetGlobalSearchRequest, merchant_id: &String, + indexes: Vec, ) -> CustomResult, OpenSearchError> { - let mut query_builder = OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch, req.query); + let mut query_builder = + OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch(indexes.clone()), req.query); query_builder .add_filter_clause("merchant_id".to_string(), merchant_id.to_string()) @@ -40,29 +41,19 @@ pub async fn msearch_results( Ok(response_body .responses .into_iter() - .zip(SearchIndex::iter()) + .zip(indexes) .map(|(index_hit, index)| match index_hit { - OpensearchOutput::Success(success) => { - if success.status == 200 { - GetSearchResponse { - count: success.hits.total.value, - index, - hits: success - .hits - .hits - .into_iter() - .map(|hit| hit.source) - .collect(), - } - } else { - tracing::error!("Unexpected status code: {}", success.status,); - GetSearchResponse { - count: 0, - index, - hits: Vec::new(), - } - } - } + OpensearchOutput::Success(success) => GetSearchResponse { + count: success.hits.total.value, + index, + hits: success + .hits + .hits + .into_iter() + .map(|hit| hit.source) + .collect(), + status: SearchStatus::Success, + }, OpensearchOutput::Error(error) => { tracing::error!( index = ?index, @@ -73,6 +64,7 @@ pub async fn msearch_results( count: 0, index, hits: Vec::new(), + status: SearchStatus::Failure, } } }) @@ -113,27 +105,17 @@ pub async fn search_results( let response_body: OpensearchOutput = response_text; match response_body { - OpensearchOutput::Success(success) => { - if success.status == 200 { - Ok(GetSearchResponse { - count: success.hits.total.value, - index: req.index, - hits: success - .hits - .hits - .into_iter() - .map(|hit| hit.source) - .collect(), - }) - } else { - tracing::error!("Unexpected status code: {}", success.status); - Ok(GetSearchResponse { - count: 0, - index: req.index, - hits: Vec::new(), - }) - } - } + OpensearchOutput::Success(success) => Ok(GetSearchResponse { + count: success.hits.total.value, + index: req.index, + hits: success + .hits + .hits + .into_iter() + .map(|hit| hit.source) + .collect(), + status: SearchStatus::Success, + }), OpensearchOutput::Error(error) => { tracing::error!( index = ?req.index, @@ -144,6 +126,7 @@ pub async fn search_results( count: 0, index: req.index, hits: Vec::new(), + status: SearchStatus::Failure, }) } } diff --git a/crates/api_models/src/analytics/search.rs b/crates/api_models/src/analytics/search.rs index c29bb0e9f7..a01349d987 100644 --- a/crates/api_models/src/analytics/search.rs +++ b/crates/api_models/src/analytics/search.rs @@ -30,7 +30,9 @@ pub struct GetSearchRequestWithIndex { pub search_req: GetSearchRequest, } -#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)] +#[derive( + Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy, Eq, PartialEq, +)] #[serde(rename_all = "snake_case")] pub enum SearchIndex { PaymentAttempts, @@ -39,17 +41,26 @@ pub enum SearchIndex { Disputes, } +#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)] +pub enum SearchStatus { + Success, + Failure, +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct GetSearchResponse { pub count: u64, pub index: SearchIndex, pub hits: Vec, + pub status: SearchStatus, } #[derive(Debug, serde::Deserialize)] pub struct OpenMsearchOutput { + #[serde(default)] pub responses: Vec, + pub error: Option, } #[derive(Debug, serde::Deserialize)] @@ -74,7 +85,6 @@ pub struct OpensearchErrorDetails { #[derive(Debug, serde::Deserialize)] pub struct OpensearchSuccess { - pub status: u16, pub hits: OpensearchHits, } diff --git a/crates/router/src/analytics.rs b/crates/router/src/analytics.rs index ae3b03f7b8..8d8910f7b2 100644 --- a/crates/router/src/analytics.rs +++ b/crates/router/src/analytics.rs @@ -4,7 +4,7 @@ pub mod routes { use actix_web::{web, Responder, Scope}; use analytics::{ api_event::api_events_core, connector_events::connector_events_core, - errors::AnalyticsError, lambda_utils::invoke_lambda, + errors::AnalyticsError, lambda_utils::invoke_lambda, opensearch::OpenSearchError, outgoing_webhook_event::outgoing_webhook_events_core, sdk_events::sdk_events_core, AnalyticsFlow, }; @@ -20,13 +20,14 @@ pub mod routes { use error_stack::ResultExt; use crate::{ - core::api_locking, + consts::opensearch::OPENSEARCH_INDEX_PERMISSIONS, + core::{api_locking, errors::user::UserErrors}, db::user::UserInterface, routes::AppState, services::{ api, - authentication::{self as auth, AuthenticationData}, - authorization::permissions::Permission, + authentication::{self as auth, AuthenticationData, UserFromToken}, + authorization::{permissions::Permission, roles::RoleInfo}, ApplicationResponse, }, types::domain::UserEmail, @@ -694,11 +695,25 @@ pub mod routes { state.clone(), &req, json_payload.into_inner(), - |state, auth: AuthenticationData, req, _| async move { + |state, auth: UserFromToken, req, _| async move { + let role_id = auth.role_id; + let role_info = + RoleInfo::from_role_id(&state, &role_id, &auth.merchant_id, &auth.org_id) + .await + .change_context(UserErrors::InternalServerError) + .change_context(OpenSearchError::UnknownError)?; + let permissions = role_info.get_permissions_set(); + let accessible_indexes: Vec<_> = OPENSEARCH_INDEX_PERMISSIONS + .iter() + .filter(|(_, perm)| perm.iter().any(|p| permissions.contains(p))) + .map(|(i, _)| *i) + .collect(); + analytics::search::msearch_results( &state.opensearch_client, req, - &auth.merchant_account.merchant_id, + &auth.merchant_id, + accessible_indexes, ) .await .map(ApplicationResponse::Json) @@ -715,24 +730,33 @@ pub mod routes { json_payload: web::Json, index: web::Path, ) -> impl Responder { + let index = index.into_inner(); let flow = AnalyticsFlow::GetSearchResults; let indexed_req = GetSearchRequestWithIndex { search_req: json_payload.into_inner(), - index: index.into_inner(), + index, }; Box::pin(api::server_wrap( flow, state.clone(), &req, indexed_req, - |state, auth: AuthenticationData, req, _| async move { - analytics::search::search_results( - &state.opensearch_client, - req, - &auth.merchant_account.merchant_id, - ) - .await - .map(ApplicationResponse::Json) + |state, auth: UserFromToken, req, _| async move { + let role_id = auth.role_id; + let role_info = + RoleInfo::from_role_id(&state, &role_id, &auth.merchant_id, &auth.org_id) + .await + .change_context(UserErrors::InternalServerError) + .change_context(OpenSearchError::UnknownError)?; + let permissions = role_info.get_permissions_set(); + let _ = OPENSEARCH_INDEX_PERMISSIONS + .iter() + .filter(|(ind, _)| *ind == index) + .find(|i| i.1.iter().any(|p| permissions.contains(p))) + .ok_or(OpenSearchError::IndexAccessNotPermittedError(index))?; + analytics::search::search_results(&state.opensearch_client, req, &auth.merchant_id) + .await + .map(ApplicationResponse::Json) }, &auth::JWTAuth(Permission::Analytics), api_locking::LockAction::NotApplicable, diff --git a/crates/router/src/consts.rs b/crates/router/src/consts.rs index 73ff7c428e..bffc524133 100644 --- a/crates/router/src/consts.rs +++ b/crates/router/src/consts.rs @@ -1,3 +1,4 @@ +pub mod opensearch; #[cfg(feature = "olap")] pub mod user; pub mod user_role; diff --git a/crates/router/src/consts/opensearch.rs b/crates/router/src/consts/opensearch.rs new file mode 100644 index 0000000000..fc1c071439 --- /dev/null +++ b/crates/router/src/consts/opensearch.rs @@ -0,0 +1,22 @@ +use api_models::analytics::search::SearchIndex; + +use crate::services::authorization::permissions::Permission; + +pub const OPENSEARCH_INDEX_PERMISSIONS: &[(SearchIndex, &[Permission])] = &[ + ( + SearchIndex::PaymentAttempts, + &[Permission::PaymentRead, Permission::PaymentWrite], + ), + ( + SearchIndex::PaymentIntents, + &[Permission::PaymentRead, Permission::PaymentWrite], + ), + ( + SearchIndex::Refunds, + &[Permission::RefundRead, Permission::RefundWrite], + ), + ( + SearchIndex::Disputes, + &[Permission::DisputeRead, Permission::DisputeWrite], + ), +]; diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 632989c480..1bdd9016c4 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -83,7 +83,7 @@ impl EventsConfig { impl EventsHandler { pub fn log_event(&self, event: &T) { match self { - Self::Kafka(kafka) => kafka.log_event(event).map_or((), |e| { + Self::Kafka(kafka) => kafka.log_event(event).unwrap_or_else(|e| { logger::error!("Failed to log event: {:?}", e); }), Self::Logs(logger) => logger.log_event(event),