mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-31 10:06:32 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			70 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			70 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use bb8::PooledConnection;
 | |
| use diesel::PgConnection;
 | |
| use error_stack::ResultExt;
 | |
| use hyperswitch_domain_models::errors::StorageError;
 | |
| 
 | |
| use crate::{errors::RedisErrorExt, metrics, DatabaseStore};
 | |
| 
 | |
| pub async fn pg_connection_read<T: DatabaseStore>(
 | |
|     store: &T,
 | |
| ) -> error_stack::Result<
 | |
|     PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
 | |
|     StorageError,
 | |
| > {
 | |
|     // If only OLAP is enabled get replica pool.
 | |
|     #[cfg(all(feature = "olap", not(feature = "oltp")))]
 | |
|     let pool = store.get_replica_pool();
 | |
| 
 | |
|     // If either one of these are true we need to get master pool.
 | |
|     //  1. Only OLTP is enabled.
 | |
|     //  2. Both OLAP and OLTP is enabled.
 | |
|     //  3. Both OLAP and OLTP is disabled.
 | |
|     #[cfg(any(
 | |
|         all(not(feature = "olap"), feature = "oltp"),
 | |
|         all(feature = "olap", feature = "oltp"),
 | |
|         all(not(feature = "olap"), not(feature = "oltp"))
 | |
|     ))]
 | |
|     let pool = store.get_master_pool();
 | |
| 
 | |
|     pool.get()
 | |
|         .await
 | |
|         .change_context(StorageError::DatabaseConnectionError)
 | |
| }
 | |
| 
 | |
| pub async fn pg_connection_write<T: DatabaseStore>(
 | |
|     store: &T,
 | |
| ) -> error_stack::Result<
 | |
|     PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
 | |
|     StorageError,
 | |
| > {
 | |
|     // Since all writes should happen to master DB only choose master DB.
 | |
|     let pool = store.get_master_pool();
 | |
| 
 | |
|     pool.get()
 | |
|         .await
 | |
|         .change_context(StorageError::DatabaseConnectionError)
 | |
| }
 | |
| 
 | |
| pub async fn try_redis_get_else_try_database_get<F, RFut, DFut, T>(
 | |
|     redis_fut: RFut,
 | |
|     database_call_closure: F,
 | |
| ) -> error_stack::Result<T, StorageError>
 | |
| where
 | |
|     F: FnOnce() -> DFut,
 | |
|     RFut: futures::Future<Output = error_stack::Result<T, redis_interface::errors::RedisError>>,
 | |
|     DFut: futures::Future<Output = error_stack::Result<T, StorageError>>,
 | |
| {
 | |
|     let redis_output = redis_fut.await;
 | |
|     match redis_output {
 | |
|         Ok(output) => Ok(output),
 | |
|         Err(redis_error) => match redis_error.current_context() {
 | |
|             redis_interface::errors::RedisError::NotFound => {
 | |
|                 metrics::KV_MISS.add(&metrics::CONTEXT, 1, &[]);
 | |
|                 database_call_closure().await
 | |
|             }
 | |
|             // Keeping the key empty here since the error would never go here.
 | |
|             _ => Err(redis_error.to_redis_failed_response("")),
 | |
|         },
 | |
|     }
 | |
| }
 | 
