mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-11-04 05:59:48 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			70 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			70 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
use bb8::PooledConnection;
 | 
						|
use diesel::PgConnection;
 | 
						|
use error_stack::ResultExt;
 | 
						|
use hyperswitch_domain_models::errors::StorageError;
 | 
						|
 | 
						|
use crate::{errors::RedisErrorExt, metrics, DatabaseStore};
 | 
						|
 | 
						|
pub async fn pg_connection_read<T: DatabaseStore>(
 | 
						|
    store: &T,
 | 
						|
) -> error_stack::Result<
 | 
						|
    PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
 | 
						|
    StorageError,
 | 
						|
> {
 | 
						|
    // If only OLAP is enabled get replica pool.
 | 
						|
    #[cfg(all(feature = "olap", not(feature = "oltp")))]
 | 
						|
    let pool = store.get_replica_pool();
 | 
						|
 | 
						|
    // If either one of these are true we need to get master pool.
 | 
						|
    //  1. Only OLTP is enabled.
 | 
						|
    //  2. Both OLAP and OLTP is enabled.
 | 
						|
    //  3. Both OLAP and OLTP is disabled.
 | 
						|
    #[cfg(any(
 | 
						|
        all(not(feature = "olap"), feature = "oltp"),
 | 
						|
        all(feature = "olap", feature = "oltp"),
 | 
						|
        all(not(feature = "olap"), not(feature = "oltp"))
 | 
						|
    ))]
 | 
						|
    let pool = store.get_master_pool();
 | 
						|
 | 
						|
    pool.get()
 | 
						|
        .await
 | 
						|
        .change_context(StorageError::DatabaseConnectionError)
 | 
						|
}
 | 
						|
 | 
						|
pub async fn pg_connection_write<T: DatabaseStore>(
 | 
						|
    store: &T,
 | 
						|
) -> error_stack::Result<
 | 
						|
    PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
 | 
						|
    StorageError,
 | 
						|
> {
 | 
						|
    // Since all writes should happen to master DB only choose master DB.
 | 
						|
    let pool = store.get_master_pool();
 | 
						|
 | 
						|
    pool.get()
 | 
						|
        .await
 | 
						|
        .change_context(StorageError::DatabaseConnectionError)
 | 
						|
}
 | 
						|
 | 
						|
pub async fn try_redis_get_else_try_database_get<F, RFut, DFut, T>(
 | 
						|
    redis_fut: RFut,
 | 
						|
    database_call_closure: F,
 | 
						|
) -> error_stack::Result<T, StorageError>
 | 
						|
where
 | 
						|
    F: FnOnce() -> DFut,
 | 
						|
    RFut: futures::Future<Output = error_stack::Result<T, redis_interface::errors::RedisError>>,
 | 
						|
    DFut: futures::Future<Output = error_stack::Result<T, StorageError>>,
 | 
						|
{
 | 
						|
    let redis_output = redis_fut.await;
 | 
						|
    match redis_output {
 | 
						|
        Ok(output) => Ok(output),
 | 
						|
        Err(redis_error) => match redis_error.current_context() {
 | 
						|
            redis_interface::errors::RedisError::NotFound => {
 | 
						|
                metrics::KV_MISS.add(&metrics::CONTEXT, 1, &[]);
 | 
						|
                database_call_closure().await
 | 
						|
            }
 | 
						|
            // Keeping the key empty here since the error would never go here.
 | 
						|
            _ => Err(redis_error.to_redis_failed_response("")),
 | 
						|
        },
 | 
						|
    }
 | 
						|
}
 |