mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-01 11:06:50 +08:00
feat(events): add events for incoming API requests (#2621)
Co-authored-by: Nishant Joshi <nishant.joshi@juspay.in>
This commit is contained in:
@ -1,9 +1,10 @@
|
||||
use serde::Serialize;
|
||||
|
||||
pub mod api_logs;
|
||||
pub mod event_logger;
|
||||
|
||||
pub trait EventHandler: Sync + Send + dyn_clone::DynClone {
|
||||
fn log_event<T: Event>(&self, event: T, previous: Option<T>);
|
||||
fn log_event<T: Event>(&self, event: T);
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
|
||||
41
crates/router/src/events/api_logs.rs
Normal file
41
crates/router/src/events/api_logs.rs
Normal file
@ -0,0 +1,41 @@
|
||||
use router_env::{tracing_actix_web::RequestId, types::FlowMetric};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use super::Event;
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ApiEvent {
|
||||
api_flow: String,
|
||||
created_at_timestamp: i128,
|
||||
request_id: String,
|
||||
latency: u128,
|
||||
status_code: i64,
|
||||
}
|
||||
|
||||
impl ApiEvent {
|
||||
pub fn new(
|
||||
api_flow: &impl FlowMetric,
|
||||
request_id: &RequestId,
|
||||
latency: u128,
|
||||
status_code: i64,
|
||||
) -> Self {
|
||||
Self {
|
||||
api_flow: api_flow.to_string(),
|
||||
created_at_timestamp: OffsetDateTime::now_utc().unix_timestamp_nanos(),
|
||||
request_id: request_id.as_hyphenated().to_string(),
|
||||
latency,
|
||||
status_code,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Event for ApiEvent {
|
||||
fn event_type() -> super::EventType {
|
||||
super::EventType::ApiLogs
|
||||
}
|
||||
|
||||
fn key(&self) -> String {
|
||||
self.request_id.to_string()
|
||||
}
|
||||
}
|
||||
@ -5,11 +5,7 @@ use crate::services::logger;
|
||||
pub struct EventLogger {}
|
||||
|
||||
impl EventHandler for EventLogger {
|
||||
fn log_event<T: Event>(&self, event: T, previous: Option<T>) {
|
||||
if let Some(prev) = previous {
|
||||
logger::info!(previous = ?serde_json::to_string(&prev).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), current = ?serde_json::to_string(&event).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), event_type =? T::event_type(), event_id =? event.key(), log_type = "event");
|
||||
} else {
|
||||
logger::info!(current = ?serde_json::to_string(&event).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), event_type =? T::event_type(), event_id =? event.key(), log_type = "event");
|
||||
}
|
||||
fn log_event<T: Event>(&self, event: T) {
|
||||
logger::info!(current = ?serde_json::to_string(&event).unwrap_or(r#"{ "error": "Serialization failed" }"#.to_string()), event_type =? T::event_type(), event_id =? event.key(), log_type = "event");
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ use actix_web::{web, Scope};
|
||||
use external_services::email::{AwsSes, EmailClient};
|
||||
#[cfg(feature = "kms")]
|
||||
use external_services::kms::{self, decrypt::KmsDecrypt};
|
||||
use router_env::tracing_actix_web::RequestId;
|
||||
use scheduler::SchedulerInterface;
|
||||
use storage_impl::MockDb;
|
||||
use tokio::sync::oneshot;
|
||||
@ -58,7 +59,7 @@ pub trait AppStateInfo {
|
||||
fn event_handler(&self) -> &Self::Event;
|
||||
#[cfg(feature = "email")]
|
||||
fn email_client(&self) -> Arc<dyn EmailClient>;
|
||||
fn add_request_id(&mut self, request_id: Option<String>);
|
||||
fn add_request_id(&mut self, request_id: RequestId);
|
||||
fn add_merchant_id(&mut self, merchant_id: Option<String>);
|
||||
fn add_flow_name(&mut self, flow_name: String);
|
||||
fn get_request_id(&self) -> Option<String>;
|
||||
@ -79,7 +80,7 @@ impl AppStateInfo for AppState {
|
||||
fn event_handler(&self) -> &Self::Event {
|
||||
&self.event_handler
|
||||
}
|
||||
fn add_request_id(&mut self, request_id: Option<String>) {
|
||||
fn add_request_id(&mut self, request_id: RequestId) {
|
||||
self.api_client.add_request_id(request_id);
|
||||
}
|
||||
fn add_merchant_id(&mut self, merchant_id: Option<String>) {
|
||||
|
||||
@ -13,7 +13,10 @@ use actix_web::{body, web, FromRequest, HttpRequest, HttpResponse, Responder, Re
|
||||
use api_models::enums::CaptureMethod;
|
||||
pub use client::{proxy_bypass_urls, ApiClient, MockApiClient, ProxyClient};
|
||||
pub use common_utils::request::{ContentType, Method, Request, RequestBuilder};
|
||||
use common_utils::{consts::X_HS_LATENCY, errors::ReportSwitchExt};
|
||||
use common_utils::{
|
||||
consts::X_HS_LATENCY,
|
||||
errors::{ErrorSwitch, ReportSwitchExt},
|
||||
};
|
||||
use error_stack::{report, IntoReport, Report, ResultExt};
|
||||
use masking::{ExposeOptionInterface, PeekInterface};
|
||||
use router_env::{instrument, tracing, tracing_actix_web::RequestId, Tag};
|
||||
@ -30,6 +33,7 @@ use crate::{
|
||||
errors::{self, CustomResult},
|
||||
payments,
|
||||
},
|
||||
events::{api_logs::ApiEvent, EventHandler},
|
||||
logger,
|
||||
routes::{
|
||||
app::AppStateInfo,
|
||||
@ -750,19 +754,20 @@ where
|
||||
T: Debug,
|
||||
A: AppStateInfo + Clone,
|
||||
U: auth::AuthInfo,
|
||||
CustomResult<ApplicationResponse<Q>, E>: ReportSwitchExt<ApplicationResponse<Q>, OErr>,
|
||||
CustomResult<U, errors::ApiErrorResponse>: ReportSwitchExt<U, OErr>,
|
||||
CustomResult<(), errors::ApiErrorResponse>: ReportSwitchExt<(), OErr>,
|
||||
OErr: ResponseError + Sync + Send + 'static,
|
||||
E: ErrorSwitch<OErr> + error_stack::Context,
|
||||
OErr: ResponseError + error_stack::Context,
|
||||
errors::ApiErrorResponse: ErrorSwitch<OErr>,
|
||||
{
|
||||
let request_id = RequestId::extract(request)
|
||||
.await
|
||||
.ok()
|
||||
.map(|id| id.as_hyphenated().to_string());
|
||||
.into_report()
|
||||
.attach_printable("Unable to extract request id from request")
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError.switch())?;
|
||||
|
||||
let mut request_state = state.get_ref().clone();
|
||||
|
||||
request_state.add_request_id(request_id);
|
||||
let start_instant = Instant::now();
|
||||
|
||||
let auth_out = api_auth
|
||||
.authenticate_and_fetch(request.headers(), &request_state)
|
||||
@ -795,11 +800,20 @@ where
|
||||
.switch()?;
|
||||
res
|
||||
};
|
||||
let request_duration = Instant::now()
|
||||
.saturating_duration_since(start_instant)
|
||||
.as_millis();
|
||||
|
||||
let status_code = match output.as_ref() {
|
||||
Ok(res) => metrics::request::track_response_status_code(res),
|
||||
Err(err) => err.current_context().status_code().as_u16().into(),
|
||||
};
|
||||
state.event_handler().log_event(ApiEvent::new(
|
||||
flow,
|
||||
&request_id,
|
||||
request_duration,
|
||||
status_code,
|
||||
));
|
||||
|
||||
metrics::request::status_code_metrics(status_code, flow.to_string(), merchant_id.to_string());
|
||||
|
||||
@ -827,8 +841,7 @@ where
|
||||
U: auth::AuthInfo,
|
||||
A: AppStateInfo + Clone,
|
||||
ApplicationResponse<Q>: Debug,
|
||||
CustomResult<ApplicationResponse<Q>, E>:
|
||||
ReportSwitchExt<ApplicationResponse<Q>, api_models::errors::types::ApiErrorResponse>,
|
||||
E: ErrorSwitch<api_models::errors::types::ApiErrorResponse> + error_stack::Context,
|
||||
{
|
||||
let request_method = request.method().as_str();
|
||||
let url_path = request.path();
|
||||
|
||||
@ -5,6 +5,7 @@ use http::{HeaderValue, Method};
|
||||
use masking::PeekInterface;
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::multipart::Form;
|
||||
use router_env::tracing_actix_web::RequestId;
|
||||
|
||||
use super::{request::Maskable, Request};
|
||||
use crate::{
|
||||
@ -167,10 +168,10 @@ where
|
||||
forward_to_kafka: bool,
|
||||
) -> CustomResult<reqwest::Response, ApiClientError>;
|
||||
|
||||
fn add_request_id(&mut self, _request_id: Option<String>);
|
||||
fn add_request_id(&mut self, request_id: RequestId);
|
||||
fn get_request_id(&self) -> Option<String>;
|
||||
fn add_merchant_id(&mut self, _merchant_id: Option<String>);
|
||||
fn add_flow_name(&mut self, _flow_name: String);
|
||||
fn add_flow_name(&mut self, flow_name: String);
|
||||
}
|
||||
|
||||
dyn_clone::clone_trait_object!(ApiClient);
|
||||
@ -350,8 +351,9 @@ impl ApiClient for ProxyClient {
|
||||
crate::services::send_request(state, request, option_timeout_secs).await
|
||||
}
|
||||
|
||||
fn add_request_id(&mut self, _request_id: Option<String>) {
|
||||
self.request_id = _request_id
|
||||
fn add_request_id(&mut self, request_id: RequestId) {
|
||||
self.request_id
|
||||
.replace(request_id.as_hyphenated().to_string());
|
||||
}
|
||||
|
||||
fn get_request_id(&self) -> Option<String> {
|
||||
@ -402,7 +404,7 @@ impl ApiClient for MockApiClient {
|
||||
Err(ApiClientError::UnexpectedState.into())
|
||||
}
|
||||
|
||||
fn add_request_id(&mut self, _request_id: Option<String>) {
|
||||
fn add_request_id(&mut self, _request_id: RequestId) {
|
||||
// [#2066]: Add Mock implementation for ApiClient
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user