mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-03 13:30:39 +08:00
refactor(middleware): add middleware to record metrics for request count and duration (#7803)
This commit is contained in:
@ -1,5 +1,3 @@
|
|||||||
use std::time;
|
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn record_operation_time<F, R, T>(
|
pub async fn record_operation_time<F, R, T>(
|
||||||
future: F,
|
future: F,
|
||||||
@ -11,7 +9,7 @@ where
|
|||||||
F: futures::Future<Output = R>,
|
F: futures::Future<Output = R>,
|
||||||
T: ToString,
|
T: ToString,
|
||||||
{
|
{
|
||||||
let (result, time) = time_future(future).await;
|
let (result, time) = common_utils::metrics::utils::time_future(future).await;
|
||||||
let attributes = router_env::metric_attributes!(
|
let attributes = router_env::metric_attributes!(
|
||||||
("metric_name", metric_name.to_string()),
|
("metric_name", metric_name.to_string()),
|
||||||
("source", source.to_string()),
|
("source", source.to_string()),
|
||||||
@ -22,14 +20,3 @@ where
|
|||||||
router_env::logger::debug!("Attributes: {:?}, Time: {}", attributes, value);
|
router_env::logger::debug!("Attributes: {:?}, Time: {}", attributes, value);
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub async fn time_future<F, R>(future: F) -> (R, time::Duration)
|
|
||||||
where
|
|
||||||
F: futures::Future<Output = R>,
|
|
||||||
{
|
|
||||||
let start = time::Instant::now();
|
|
||||||
let result = future.await;
|
|
||||||
let time_spent = start.elapsed();
|
|
||||||
(result, time_spent)
|
|
||||||
}
|
|
||||||
|
|||||||
@ -10,7 +10,7 @@ use crate::{
|
|||||||
events::api_logs::ApiEventMetric,
|
events::api_logs::ApiEventMetric,
|
||||||
routes::{
|
routes::{
|
||||||
app::{AppStateInfo, ReqState},
|
app::{AppStateInfo, ReqState},
|
||||||
metrics, AppState, SessionState,
|
AppState, SessionState,
|
||||||
},
|
},
|
||||||
services::{self, api, authentication as auth, logger},
|
services::{self, api, authentication as auth, logger},
|
||||||
};
|
};
|
||||||
@ -44,8 +44,7 @@ where
|
|||||||
let start_instant = Instant::now();
|
let start_instant = Instant::now();
|
||||||
logger::info!(tag = ?Tag::BeginRequest, payload = ?payload);
|
logger::info!(tag = ?Tag::BeginRequest, payload = ?payload);
|
||||||
|
|
||||||
let server_wrap_util_res = metrics::request::record_request_time_metric(
|
let server_wrap_util_res = api::server_wrap_util(
|
||||||
api::server_wrap_util(
|
|
||||||
&flow,
|
&flow,
|
||||||
state.clone().into(),
|
state.clone().into(),
|
||||||
request.headers(),
|
request.headers(),
|
||||||
@ -54,8 +53,6 @@ where
|
|||||||
func,
|
func,
|
||||||
api_authentication,
|
api_authentication,
|
||||||
lock_action,
|
lock_action,
|
||||||
),
|
|
||||||
&flow,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|response| {
|
.map(|response| {
|
||||||
|
|||||||
@ -379,6 +379,7 @@ pub fn get_application_builder(
|
|||||||
// this middleware works only for Http1.1 requests
|
// this middleware works only for Http1.1 requests
|
||||||
.wrap(middleware::Http400RequestDetailsLogger)
|
.wrap(middleware::Http400RequestDetailsLogger)
|
||||||
.wrap(middleware::AddAcceptLanguageHeader)
|
.wrap(middleware::AddAcceptLanguageHeader)
|
||||||
|
.wrap(middleware::RequestResponseMetrics)
|
||||||
.wrap(middleware::LogSpanInitializer)
|
.wrap(middleware::LogSpanInitializer)
|
||||||
.wrap(router_env::tracing_actix_web::TracingLogger::default())
|
.wrap(router_env::tracing_actix_web::TracingLogger::default())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,7 +5,8 @@ use router_env::{
|
|||||||
tracing::{field::Empty, Instrument},
|
tracing::{field::Empty, Instrument},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::headers;
|
use crate::{headers, routes::metrics};
|
||||||
|
|
||||||
/// Middleware to include request ID in response header.
|
/// Middleware to include request ID in response header.
|
||||||
pub struct RequestId;
|
pub struct RequestId;
|
||||||
|
|
||||||
@ -363,6 +364,7 @@ where
|
|||||||
type Future = futures::future::LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
type Future = futures::future::LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
actix_web::dev::forward_ready!(service);
|
actix_web::dev::forward_ready!(service);
|
||||||
|
|
||||||
fn call(&self, mut req: actix_web::dev::ServiceRequest) -> Self::Future {
|
fn call(&self, mut req: actix_web::dev::ServiceRequest) -> Self::Future {
|
||||||
let svc = self.service.clone();
|
let svc = self.service.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
@ -396,3 +398,87 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Middleware for recording request-response metrics
|
||||||
|
pub struct RequestResponseMetrics;
|
||||||
|
|
||||||
|
impl<S: 'static, B> actix_web::dev::Transform<S, actix_web::dev::ServiceRequest>
|
||||||
|
for RequestResponseMetrics
|
||||||
|
where
|
||||||
|
S: actix_web::dev::Service<
|
||||||
|
actix_web::dev::ServiceRequest,
|
||||||
|
Response = actix_web::dev::ServiceResponse<B>,
|
||||||
|
Error = actix_web::Error,
|
||||||
|
>,
|
||||||
|
S::Future: 'static,
|
||||||
|
B: 'static,
|
||||||
|
{
|
||||||
|
type Response = actix_web::dev::ServiceResponse<B>;
|
||||||
|
type Error = actix_web::Error;
|
||||||
|
type Transform = RequestResponseMetricsMiddleware<S>;
|
||||||
|
type InitError = ();
|
||||||
|
type Future = std::future::Ready<Result<Self::Transform, Self::InitError>>;
|
||||||
|
|
||||||
|
fn new_transform(&self, service: S) -> Self::Future {
|
||||||
|
std::future::ready(Ok(RequestResponseMetricsMiddleware {
|
||||||
|
service: std::rc::Rc::new(service),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RequestResponseMetricsMiddleware<S> {
|
||||||
|
service: std::rc::Rc<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, B> actix_web::dev::Service<actix_web::dev::ServiceRequest>
|
||||||
|
for RequestResponseMetricsMiddleware<S>
|
||||||
|
where
|
||||||
|
S: actix_web::dev::Service<
|
||||||
|
actix_web::dev::ServiceRequest,
|
||||||
|
Response = actix_web::dev::ServiceResponse<B>,
|
||||||
|
Error = actix_web::Error,
|
||||||
|
> + 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
B: 'static,
|
||||||
|
{
|
||||||
|
type Response = actix_web::dev::ServiceResponse<B>;
|
||||||
|
type Error = actix_web::Error;
|
||||||
|
type Future = futures::future::LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
actix_web::dev::forward_ready!(service);
|
||||||
|
|
||||||
|
fn call(&self, req: actix_web::dev::ServiceRequest) -> Self::Future {
|
||||||
|
use std::borrow::Cow;
|
||||||
|
|
||||||
|
let svc = self.service.clone();
|
||||||
|
|
||||||
|
let request_path = req
|
||||||
|
.match_pattern()
|
||||||
|
.map(Cow::<'static, str>::from)
|
||||||
|
.unwrap_or_else(|| "UNKNOWN".into());
|
||||||
|
let request_method = Cow::<'static, str>::from(req.method().as_str().to_owned());
|
||||||
|
|
||||||
|
Box::pin(async move {
|
||||||
|
let mut attributes =
|
||||||
|
router_env::metric_attributes!(("path", request_path), ("method", request_method))
|
||||||
|
.to_vec();
|
||||||
|
|
||||||
|
let response_fut = svc.call(req);
|
||||||
|
|
||||||
|
metrics::REQUESTS_RECEIVED.add(1, &attributes);
|
||||||
|
|
||||||
|
let (response_result, request_duration) =
|
||||||
|
common_utils::metrics::utils::time_future(response_fut).await;
|
||||||
|
let response = response_result?;
|
||||||
|
|
||||||
|
attributes.extend_from_slice(router_env::metric_attributes!((
|
||||||
|
"status_code",
|
||||||
|
i64::from(response.status().as_u16())
|
||||||
|
)));
|
||||||
|
|
||||||
|
metrics::REQUEST_TIME.record(request_duration.as_secs_f64(), &attributes);
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
pub mod bg_metrics_collector;
|
pub mod bg_metrics_collector;
|
||||||
pub mod request;
|
pub mod request;
|
||||||
pub mod utils;
|
|
||||||
|
|
||||||
use router_env::{counter_metric, global_meter, histogram_metric_f64};
|
use router_env::{counter_metric, global_meter, histogram_metric_f64};
|
||||||
|
|
||||||
@ -11,7 +10,6 @@ counter_metric!(KV_MISS, GLOBAL_METER); // No. of KV misses
|
|||||||
|
|
||||||
// API Level Metrics
|
// API Level Metrics
|
||||||
counter_metric!(REQUESTS_RECEIVED, GLOBAL_METER);
|
counter_metric!(REQUESTS_RECEIVED, GLOBAL_METER);
|
||||||
counter_metric!(REQUEST_STATUS, GLOBAL_METER);
|
|
||||||
histogram_metric_f64!(REQUEST_TIME, GLOBAL_METER);
|
histogram_metric_f64!(REQUEST_TIME, GLOBAL_METER);
|
||||||
histogram_metric_f64!(EXTERNAL_REQUEST_TIME, GLOBAL_METER);
|
histogram_metric_f64!(EXTERNAL_REQUEST_TIME, GLOBAL_METER);
|
||||||
|
|
||||||
|
|||||||
@ -1,38 +1,5 @@
|
|||||||
use super::utils as metric_utils;
|
|
||||||
use crate::services::ApplicationResponse;
|
use crate::services::ApplicationResponse;
|
||||||
|
|
||||||
pub async fn record_request_time_metric<F, R>(
|
|
||||||
future: F,
|
|
||||||
flow: &impl router_env::types::FlowMetric,
|
|
||||||
) -> R
|
|
||||||
where
|
|
||||||
F: futures::Future<Output = R>,
|
|
||||||
{
|
|
||||||
let key = "request_type";
|
|
||||||
super::REQUESTS_RECEIVED.add(1, router_env::metric_attributes!((key, flow.to_string())));
|
|
||||||
let (result, time) = metric_utils::time_future(future).await;
|
|
||||||
super::REQUEST_TIME.record(
|
|
||||||
time.as_secs_f64(),
|
|
||||||
router_env::metric_attributes!((key, flow.to_string())),
|
|
||||||
);
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn status_code_metrics(
|
|
||||||
status_code: String,
|
|
||||||
flow: String,
|
|
||||||
merchant_id: common_utils::id_type::MerchantId,
|
|
||||||
) {
|
|
||||||
super::REQUEST_STATUS.add(
|
|
||||||
1,
|
|
||||||
router_env::metric_attributes!(
|
|
||||||
("status_code", status_code),
|
|
||||||
("flow", flow),
|
|
||||||
("merchant_id", merchant_id.clone()),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn track_response_status_code<Q>(response: &ApplicationResponse<Q>) -> i64 {
|
pub fn track_response_status_code<Q>(response: &ApplicationResponse<Q>) -> i64 {
|
||||||
match response {
|
match response {
|
||||||
ApplicationResponse::Json(_)
|
ApplicationResponse::Json(_)
|
||||||
|
|||||||
@ -1,12 +0,0 @@
|
|||||||
use std::time;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub async fn time_future<F, R>(future: F) -> (R, time::Duration)
|
|
||||||
where
|
|
||||||
F: futures::Future<Output = R>,
|
|
||||||
{
|
|
||||||
let start = time::Instant::now();
|
|
||||||
let result = future.await;
|
|
||||||
let time_spent = start.elapsed();
|
|
||||||
(result, time_spent)
|
|
||||||
}
|
|
||||||
@ -873,12 +873,6 @@ where
|
|||||||
);
|
);
|
||||||
state.event_handler().log_event(&api_event);
|
state.event_handler().log_event(&api_event);
|
||||||
|
|
||||||
metrics::request::status_code_metrics(
|
|
||||||
status_code.to_string(),
|
|
||||||
flow.to_string(),
|
|
||||||
merchant_id.to_owned(),
|
|
||||||
);
|
|
||||||
|
|
||||||
output
|
output
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -932,8 +926,7 @@ where
|
|||||||
tag = ?Tag::BeginRequest, payload = ?payload,
|
tag = ?Tag::BeginRequest, payload = ?payload,
|
||||||
headers = ?incoming_header_to_log);
|
headers = ?incoming_header_to_log);
|
||||||
|
|
||||||
let server_wrap_util_res = metrics::request::record_request_time_metric(
|
let server_wrap_util_res = server_wrap_util(
|
||||||
server_wrap_util(
|
|
||||||
&flow,
|
&flow,
|
||||||
state.clone(),
|
state.clone(),
|
||||||
incoming_request_header,
|
incoming_request_header,
|
||||||
@ -942,8 +935,6 @@ where
|
|||||||
func,
|
func,
|
||||||
api_auth,
|
api_auth,
|
||||||
lock_action,
|
lock_action,
|
||||||
),
|
|
||||||
&flow,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|response| {
|
.map(|response| {
|
||||||
|
|||||||
Reference in New Issue
Block a user