mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-31 10:06:32 +08:00 
			
		
		
		
	feat(generics): add metrics for database calls (#1901)
This commit is contained in:
		| @ -98,3 +98,13 @@ pub(crate) mod diesel_impl { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub(crate) mod metrics { | ||||||
|  |     use router_env::{counter_metric, global_meter, histogram_metric, metrics_context, once_cell}; | ||||||
|  |  | ||||||
|  |     metrics_context!(CONTEXT); | ||||||
|  |     global_meter!(GLOBAL_METER, "ROUTER_API"); | ||||||
|  |  | ||||||
|  |     counter_metric!(DATABASE_CALLS_COUNT, GLOBAL_METER); | ||||||
|  |     histogram_metric!(DATABASE_CALL_TIME, GLOBAL_METER); | ||||||
|  | } | ||||||
|  | |||||||
| @ -22,14 +22,61 @@ use diesel::{ | |||||||
| use error_stack::{report, IntoReport, ResultExt}; | use error_stack::{report, IntoReport, ResultExt}; | ||||||
| use router_env::{instrument, logger, tracing}; | use router_env::{instrument, logger, tracing}; | ||||||
|  |  | ||||||
| use crate::{errors, PgPooledConn, StorageResult}; | use crate::{ | ||||||
|  |     errors::{self}, | ||||||
|  |     PgPooledConn, StorageResult, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | pub mod db_metrics { | ||||||
|  |     use router_env::opentelemetry::KeyValue; | ||||||
|  |  | ||||||
|  |     #[derive(Debug)] | ||||||
|  |     pub enum DatabaseOperation { | ||||||
|  |         FindOne, | ||||||
|  |         Filter, | ||||||
|  |         Update, | ||||||
|  |         Insert, | ||||||
|  |         Delete, | ||||||
|  |         DeleteWithResult, | ||||||
|  |         UpdateWithResults, | ||||||
|  |         UpdateOne, | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[inline] | ||||||
|  |     pub async fn track_database_call<T, Fut, U>(future: Fut, operation: DatabaseOperation) -> U | ||||||
|  |     where | ||||||
|  |         Fut: std::future::Future<Output = U>, | ||||||
|  |     { | ||||||
|  |         let start = std::time::Instant::now(); | ||||||
|  |         let output = future.await; | ||||||
|  |         let time_elapsed = start.elapsed(); | ||||||
|  |  | ||||||
|  |         let table_name = std::any::type_name::<T>().rsplit("::").nth(1); | ||||||
|  |  | ||||||
|  |         let attributes = [ | ||||||
|  |             KeyValue::new("table", table_name.unwrap_or("undefined")), | ||||||
|  |             KeyValue::new("operation", format!("{:?}", operation)), | ||||||
|  |         ]; | ||||||
|  |  | ||||||
|  |         crate::metrics::DATABASE_CALLS_COUNT.add(&crate::metrics::CONTEXT, 1, &attributes); | ||||||
|  |         crate::metrics::DATABASE_CALL_TIME.record( | ||||||
|  |             &crate::metrics::CONTEXT, | ||||||
|  |             time_elapsed.as_secs_f64(), | ||||||
|  |             &attributes, | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         output | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | use db_metrics::*; | ||||||
|  |  | ||||||
| #[instrument(level = "DEBUG", skip_all)] | #[instrument(level = "DEBUG", skip_all)] | ||||||
| pub async fn generic_insert<T, V, R>(conn: &PgPooledConn, values: V) -> StorageResult<R> | pub async fn generic_insert<T, V, R>(conn: &PgPooledConn, values: V) -> StorageResult<R> | ||||||
| where | where | ||||||
|     T: HasTable<Table = T> + Table + 'static, |     T: HasTable<Table = T> + Table + 'static + Debug, | ||||||
|     V: Debug + Insertable<T>, |     V: Debug + Insertable<T>, | ||||||
|     <T as QuerySource>::FromClause: QueryFragment<Pg>, |     <T as QuerySource>::FromClause: QueryFragment<Pg> + Debug, | ||||||
|     <V as Insertable<T>>::Values: CanInsertInSingleQuery<Pg> + QueryFragment<Pg> + 'static, |     <V as Insertable<T>>::Values: CanInsertInSingleQuery<Pg> + QueryFragment<Pg> + 'static, | ||||||
|     InsertStatement<T, <V as Insertable<T>>::Values>: |     InsertStatement<T, <V as Insertable<T>>::Values>: | ||||||
|         AsQuery + LoadQuery<'static, PgConnection, R> + Send, |         AsQuery + LoadQuery<'static, PgConnection, R> + Send, | ||||||
| @ -40,7 +87,10 @@ where | |||||||
|     let query = diesel::insert_into(<T as HasTable>::table()).values(values); |     let query = diesel::insert_into(<T as HasTable>::table()).values(values); | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     match query.get_result_async(conn).await.into_report() { |     match track_database_call::<T, _, _>(query.get_result_async(conn), DatabaseOperation::Insert) | ||||||
|  |         .await | ||||||
|  |         .into_report() | ||||||
|  |     { | ||||||
|         Ok(value) => Ok(value), |         Ok(value) => Ok(value), | ||||||
|         Err(err) => match err.current_context() { |         Err(err) => match err.current_context() { | ||||||
|             ConnectionError::Query(DieselError::DatabaseError( |             ConnectionError::Query(DieselError::DatabaseError( | ||||||
| @ -74,8 +124,7 @@ where | |||||||
|     let query = diesel::update(<T as HasTable>::table().filter(predicate)).set(values); |     let query = diesel::update(<T as HasTable>::table().filter(predicate)).set(values); | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     query |     track_database_call::<T, _, _>(query.execute_async(conn), DatabaseOperation::Update) | ||||||
|         .execute_async(conn) |  | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .change_context(errors::DatabaseError::Others) |         .change_context(errors::DatabaseError::Others) | ||||||
| @ -109,7 +158,12 @@ where | |||||||
|  |  | ||||||
|     let query = diesel::update(<T as HasTable>::table().filter(predicate)).set(values); |     let query = diesel::update(<T as HasTable>::table().filter(predicate)).set(values); | ||||||
|  |  | ||||||
|     match query.to_owned().get_results_async(conn).await { |     match track_database_call::<T, _, _>( | ||||||
|  |         query.to_owned().get_results_async(conn), | ||||||
|  |         DatabaseOperation::UpdateWithResults, | ||||||
|  |     ) | ||||||
|  |     .await | ||||||
|  |     { | ||||||
|         Ok(result) => { |         Ok(result) => { | ||||||
|             logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |             logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|             Ok(result) |             Ok(result) | ||||||
| @ -195,7 +249,12 @@ where | |||||||
|  |  | ||||||
|     let query = diesel::update(<T as HasTable>::table().find(id.to_owned())).set(values); |     let query = diesel::update(<T as HasTable>::table().find(id.to_owned())).set(values); | ||||||
|  |  | ||||||
|     match query.to_owned().get_result_async(conn).await { |     match track_database_call::<T, _, _>( | ||||||
|  |         query.to_owned().get_result_async(conn), | ||||||
|  |         DatabaseOperation::UpdateOne, | ||||||
|  |     ) | ||||||
|  |     .await | ||||||
|  |     { | ||||||
|         Ok(result) => { |         Ok(result) => { | ||||||
|             logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |             logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|             Ok(result) |             Ok(result) | ||||||
| @ -226,8 +285,7 @@ where | |||||||
|     let query = diesel::delete(<T as HasTable>::table().filter(predicate)); |     let query = diesel::delete(<T as HasTable>::table().filter(predicate)); | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     query |     track_database_call::<T, _, _>(query.execute_async(conn), DatabaseOperation::Delete) | ||||||
|         .execute_async(conn) |  | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .change_context(errors::DatabaseError::Others) |         .change_context(errors::DatabaseError::Others) | ||||||
| @ -261,8 +319,10 @@ where | |||||||
|     let query = diesel::delete(<T as HasTable>::table().filter(predicate)); |     let query = diesel::delete(<T as HasTable>::table().filter(predicate)); | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     query |     track_database_call::<T, _, _>( | ||||||
|         .get_results_async(conn) |         query.get_results_async(conn), | ||||||
|  |         DatabaseOperation::DeleteWithResult, | ||||||
|  |     ) | ||||||
|     .await |     .await | ||||||
|     .into_report() |     .into_report() | ||||||
|     .change_context(errors::DatabaseError::Others) |     .change_context(errors::DatabaseError::Others) | ||||||
| @ -287,7 +347,10 @@ where | |||||||
|     let query = <T as HasTable>::table().find(id.to_owned()); |     let query = <T as HasTable>::table().find(id.to_owned()); | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     match query.first_async(conn).await.into_report() { |     match track_database_call::<T, _, _>(query.first_async(conn), DatabaseOperation::FindOne) | ||||||
|  |         .await | ||||||
|  |         .into_report() | ||||||
|  |     { | ||||||
|         Ok(value) => Ok(value), |         Ok(value) => Ok(value), | ||||||
|         Err(err) => match err.current_context() { |         Err(err) => match err.current_context() { | ||||||
|             ConnectionError::Query(DieselError::NotFound) => { |             ConnectionError::Query(DieselError::NotFound) => { | ||||||
| @ -337,8 +400,7 @@ where | |||||||
|     let query = <T as HasTable>::table().filter(predicate); |     let query = <T as HasTable>::table().filter(predicate); | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     query |     track_database_call::<T, _, _>(query.get_result_async(conn), DatabaseOperation::FindOne) | ||||||
|         .get_result_async(conn) |  | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .map_err(|err| match err.current_context() { |         .map_err(|err| match err.current_context() { | ||||||
| @ -410,8 +472,7 @@ where | |||||||
|  |  | ||||||
|     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); |     logger::debug!(query = %debug_query::<Pg, _>(&query).to_string()); | ||||||
|  |  | ||||||
|     query |     track_database_call::<T, _, _>(query.get_results_async(conn), DatabaseOperation::Filter) | ||||||
|         .get_results_async(conn) |  | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .change_context(errors::DatabaseError::NotFound) |         .change_context(errors::DatabaseError::NotFound) | ||||||
|  | |||||||
| @ -2,7 +2,7 @@ use async_bb8_diesel::AsyncRunQueryDsl; | |||||||
| use common_utils::errors::CustomResult; | use common_utils::errors::CustomResult; | ||||||
| use diesel::{associations::HasTable, ExpressionMethods, QueryDsl}; | use diesel::{associations::HasTable, ExpressionMethods, QueryDsl}; | ||||||
| pub use diesel_models::dispute::{Dispute, DisputeNew, DisputeUpdate}; | pub use diesel_models::dispute::{Dispute, DisputeNew, DisputeUpdate}; | ||||||
| use diesel_models::{errors, schema::dispute::dsl}; | use diesel_models::{errors, query::generics::db_metrics, schema::dispute::dsl}; | ||||||
| use error_stack::{IntoReport, ResultExt}; | use error_stack::{IntoReport, ResultExt}; | ||||||
|  |  | ||||||
| use crate::{connection::PgPooledConn, logger}; | use crate::{connection::PgPooledConn, logger}; | ||||||
| @ -61,8 +61,10 @@ impl DisputeDbExt for Dispute { | |||||||
|  |  | ||||||
|         logger::debug!(query = %diesel::debug_query::<diesel::pg::Pg, _>(&filter).to_string()); |         logger::debug!(query = %diesel::debug_query::<diesel::pg::Pg, _>(&filter).to_string()); | ||||||
|  |  | ||||||
|         filter |         db_metrics::track_database_call::<<Self as HasTable>::Table, _, _>( | ||||||
|             .get_results_async(conn) |             filter.get_results_async(conn), | ||||||
|  |             db_metrics::DatabaseOperation::Filter, | ||||||
|  |         ) | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .change_context(errors::DatabaseError::NotFound) |         .change_context(errors::DatabaseError::NotFound) | ||||||
|  | |||||||
| @ -1,5 +1,6 @@ | |||||||
| use async_bb8_diesel::AsyncRunQueryDsl; | use async_bb8_diesel::AsyncRunQueryDsl; | ||||||
| use diesel::{associations::HasTable, debug_query, pg::Pg, ExpressionMethods, JoinOnDsl, QueryDsl}; | use diesel::{associations::HasTable, debug_query, pg::Pg, ExpressionMethods, JoinOnDsl, QueryDsl}; | ||||||
|  | use diesel_models::query::generics::db_metrics; | ||||||
| pub use diesel_models::{ | pub use diesel_models::{ | ||||||
|     errors, |     errors, | ||||||
|     payment_attempt::PaymentAttempt, |     payment_attempt::PaymentAttempt, | ||||||
| @ -96,8 +97,10 @@ impl PaymentIntentDbExt for PaymentIntent { | |||||||
|  |  | ||||||
|         crate::logger::debug!(query = %debug_query::<Pg, _>(&filter).to_string()); |         crate::logger::debug!(query = %debug_query::<Pg, _>(&filter).to_string()); | ||||||
|  |  | ||||||
|         filter |         db_metrics::track_database_call::<<Self as HasTable>::Table, _, _>( | ||||||
|             .get_results_async(conn) |             filter.get_results_async(conn), | ||||||
|  |             db_metrics::DatabaseOperation::Filter, | ||||||
|  |         ) | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .change_context(errors::DatabaseError::NotFound) |         .change_context(errors::DatabaseError::NotFound) | ||||||
|  | |||||||
| @ -7,6 +7,7 @@ pub use diesel_models::refund::{ | |||||||
| use diesel_models::{ | use diesel_models::{ | ||||||
|     enums::{Currency, RefundStatus}, |     enums::{Currency, RefundStatus}, | ||||||
|     errors, |     errors, | ||||||
|  |     query::generics::db_metrics, | ||||||
|     schema::refund::dsl, |     schema::refund::dsl, | ||||||
| }; | }; | ||||||
| use error_stack::{IntoReport, ResultExt}; | use error_stack::{IntoReport, ResultExt}; | ||||||
| @ -78,8 +79,10 @@ impl RefundDbExt for Refund { | |||||||
|  |  | ||||||
|         logger::debug!(query = %diesel::debug_query::<diesel::pg::Pg, _>(&filter).to_string()); |         logger::debug!(query = %diesel::debug_query::<diesel::pg::Pg, _>(&filter).to_string()); | ||||||
|  |  | ||||||
|         filter |         db_metrics::track_database_call::<<Self as HasTable>::Table, _, _>( | ||||||
|             .get_results_async(conn) |             filter.get_results_async(conn), | ||||||
|  |             db_metrics::DatabaseOperation::Filter, | ||||||
|  |         ) | ||||||
|         .await |         .await | ||||||
|         .into_report() |         .into_report() | ||||||
|         .change_context(errors::DatabaseError::NotFound) |         .change_context(errors::DatabaseError::NotFound) | ||||||
|  | |||||||
| @ -20,6 +20,7 @@ pub mod vergen; | |||||||
| // pub use literally; | // pub use literally; | ||||||
| #[doc(inline)] | #[doc(inline)] | ||||||
| pub use logger::*; | pub use logger::*; | ||||||
|  | pub use once_cell; | ||||||
| pub use opentelemetry; | pub use opentelemetry; | ||||||
| pub use tracing; | pub use tracing; | ||||||
| #[cfg(feature = "actix_web")] | #[cfg(feature = "actix_web")] | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Nishant Joshi
					Nishant Joshi