mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-27 19:46:48 +08:00
refactor(currency_conversion): release redis lock if api call fails (#6671)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -7,6 +7,7 @@ use error_stack::ResultExt;
|
|||||||
use masking::PeekInterface;
|
use masking::PeekInterface;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use redis_interface::DelReply;
|
use redis_interface::DelReply;
|
||||||
|
use router_env::{instrument, tracing};
|
||||||
use rust_decimal::Decimal;
|
use rust_decimal::Decimal;
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
use tokio::{sync::RwLock, time::sleep};
|
use tokio::{sync::RwLock, time::sleep};
|
||||||
@ -150,11 +151,13 @@ impl TryFrom<DefaultExchangeRates> for ExchangeRates {
|
|||||||
let mut conversion_usable: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
|
let mut conversion_usable: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
|
||||||
for (curr, conversion) in value.conversion {
|
for (curr, conversion) in value.conversion {
|
||||||
let enum_curr = enums::Currency::from_str(curr.as_str())
|
let enum_curr = enums::Currency::from_str(curr.as_str())
|
||||||
.change_context(ForexCacheError::ConversionError)?;
|
.change_context(ForexCacheError::ConversionError)
|
||||||
|
.attach_printable("Unable to Convert currency received")?;
|
||||||
conversion_usable.insert(enum_curr, CurrencyFactors::from(conversion));
|
conversion_usable.insert(enum_curr, CurrencyFactors::from(conversion));
|
||||||
}
|
}
|
||||||
let base_curr = enums::Currency::from_str(value.base_currency.as_str())
|
let base_curr = enums::Currency::from_str(value.base_currency.as_str())
|
||||||
.change_context(ForexCacheError::ConversionError)?;
|
.change_context(ForexCacheError::ConversionError)
|
||||||
|
.attach_printable("Unable to convert base currency")?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
base_currency: base_curr,
|
base_currency: base_curr,
|
||||||
conversion: conversion_usable,
|
conversion: conversion_usable,
|
||||||
@ -170,6 +173,8 @@ impl From<Conversion> for CurrencyFactors {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub async fn get_forex_rates(
|
pub async fn get_forex_rates(
|
||||||
state: &SessionState,
|
state: &SessionState,
|
||||||
call_delay: i64,
|
call_delay: i64,
|
||||||
@ -235,6 +240,7 @@ async fn successive_fetch_and_save_forex(
|
|||||||
Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?),
|
Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?),
|
||||||
Err(error) => stale_redis_data.ok_or({
|
Err(error) => stale_redis_data.ok_or({
|
||||||
logger::error!(?error);
|
logger::error!(?error);
|
||||||
|
release_redis_lock(state).await?;
|
||||||
ForexCacheError::ApiUnresponsive.into()
|
ForexCacheError::ApiUnresponsive.into()
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
@ -254,9 +260,9 @@ async fn successive_save_data_to_redis_local(
|
|||||||
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
||||||
Ok(save_forex_to_redis(state, &forex)
|
Ok(save_forex_to_redis(state, &forex)
|
||||||
.await
|
.await
|
||||||
.async_and_then(|_rates| async { release_redis_lock(state).await })
|
.async_and_then(|_rates| release_redis_lock(state))
|
||||||
.await
|
.await
|
||||||
.async_and_then(|_val| async { Ok(save_forex_to_local(forex.clone()).await) })
|
.async_and_then(|_val| save_forex_to_local(forex.clone()))
|
||||||
.await
|
.await
|
||||||
.map_or_else(
|
.map_or_else(
|
||||||
|error| {
|
|error| {
|
||||||
@ -336,11 +342,15 @@ async fn fetch_forex_rates(
|
|||||||
false,
|
false,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::ApiUnresponsive)?;
|
.change_context(ForexCacheError::ApiUnresponsive)
|
||||||
|
.attach_printable("Primary forex fetch api unresponsive")?;
|
||||||
let forex_response = response
|
let forex_response = response
|
||||||
.json::<ForexResponse>()
|
.json::<ForexResponse>()
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::ParsingError)?;
|
.change_context(ForexCacheError::ParsingError)
|
||||||
|
.attach_printable(
|
||||||
|
"Unable to parse response received from primary api into ForexResponse",
|
||||||
|
)?;
|
||||||
|
|
||||||
logger::info!("{:?}", forex_response);
|
logger::info!("{:?}", forex_response);
|
||||||
|
|
||||||
@ -392,11 +402,16 @@ pub async fn fallback_fetch_forex_rates(
|
|||||||
false,
|
false,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::ApiUnresponsive)?;
|
.change_context(ForexCacheError::ApiUnresponsive)
|
||||||
|
.attach_printable("Fallback forex fetch api unresponsive")?;
|
||||||
|
|
||||||
let fallback_forex_response = response
|
let fallback_forex_response = response
|
||||||
.json::<FallbackForexResponse>()
|
.json::<FallbackForexResponse>()
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::ParsingError)?;
|
.change_context(ForexCacheError::ParsingError)
|
||||||
|
.attach_printable(
|
||||||
|
"Unable to parse response received from falback api into ForexResponse",
|
||||||
|
)?;
|
||||||
|
|
||||||
logger::info!("{:?}", fallback_forex_response);
|
logger::info!("{:?}", fallback_forex_response);
|
||||||
let mut conversions: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
|
let mut conversions: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
|
||||||
@ -453,6 +468,7 @@ async fn release_redis_lock(
|
|||||||
.delete_key(REDIX_FOREX_CACHE_KEY)
|
.delete_key(REDIX_FOREX_CACHE_KEY)
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::RedisLockReleaseFailed)
|
.change_context(ForexCacheError::RedisLockReleaseFailed)
|
||||||
|
.attach_printable("Unable to release redis lock")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCacheError> {
|
async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCacheError> {
|
||||||
@ -475,6 +491,7 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCac
|
|||||||
.await
|
.await
|
||||||
.map(|val| matches!(val, redis_interface::SetnxReply::KeySet))
|
.map(|val| matches!(val, redis_interface::SetnxReply::KeySet))
|
||||||
.change_context(ForexCacheError::CouldNotAcquireLock)
|
.change_context(ForexCacheError::CouldNotAcquireLock)
|
||||||
|
.attach_printable("Unable to acquire redis lock")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_forex_to_redis(
|
async fn save_forex_to_redis(
|
||||||
@ -488,6 +505,7 @@ async fn save_forex_to_redis(
|
|||||||
.serialize_and_set_key(REDIX_FOREX_CACHE_DATA, forex_exchange_cache_entry)
|
.serialize_and_set_key(REDIX_FOREX_CACHE_DATA, forex_exchange_cache_entry)
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::RedisWriteError)
|
.change_context(ForexCacheError::RedisWriteError)
|
||||||
|
.attach_printable("Unable to save forex data to redis")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn retrieve_forex_from_redis(
|
async fn retrieve_forex_from_redis(
|
||||||
@ -500,6 +518,7 @@ async fn retrieve_forex_from_redis(
|
|||||||
.get_and_deserialize_key(REDIX_FOREX_CACHE_DATA, "FxExchangeRatesCache")
|
.get_and_deserialize_key(REDIX_FOREX_CACHE_DATA, "FxExchangeRatesCache")
|
||||||
.await
|
.await
|
||||||
.change_context(ForexCacheError::EntryNotFound)
|
.change_context(ForexCacheError::EntryNotFound)
|
||||||
|
.attach_printable("Forex entry not found in redis")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn is_redis_expired(
|
async fn is_redis_expired(
|
||||||
@ -515,6 +534,7 @@ async fn is_redis_expired(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub async fn convert_currency(
|
pub async fn convert_currency(
|
||||||
state: SessionState,
|
state: SessionState,
|
||||||
amount: i64,
|
amount: i64,
|
||||||
@ -532,14 +552,17 @@ pub async fn convert_currency(
|
|||||||
.change_context(ForexCacheError::ApiError)?;
|
.change_context(ForexCacheError::ApiError)?;
|
||||||
|
|
||||||
let to_currency = enums::Currency::from_str(to_currency.as_str())
|
let to_currency = enums::Currency::from_str(to_currency.as_str())
|
||||||
.change_context(ForexCacheError::CurrencyNotAcceptable)?;
|
.change_context(ForexCacheError::CurrencyNotAcceptable)
|
||||||
|
.attach_printable("The provided currency is not acceptable")?;
|
||||||
|
|
||||||
let from_currency = enums::Currency::from_str(from_currency.as_str())
|
let from_currency = enums::Currency::from_str(from_currency.as_str())
|
||||||
.change_context(ForexCacheError::CurrencyNotAcceptable)?;
|
.change_context(ForexCacheError::CurrencyNotAcceptable)
|
||||||
|
.attach_printable("The provided currency is not acceptable")?;
|
||||||
|
|
||||||
let converted_amount =
|
let converted_amount =
|
||||||
currency_conversion::conversion::convert(&rates.data, from_currency, to_currency, amount)
|
currency_conversion::conversion::convert(&rates.data, from_currency, to_currency, amount)
|
||||||
.change_context(ForexCacheError::ConversionError)?;
|
.change_context(ForexCacheError::ConversionError)
|
||||||
|
.attach_printable("Unable to perform currency conversion")?;
|
||||||
|
|
||||||
Ok(api_models::currency::CurrencyConversionResponse {
|
Ok(api_models::currency::CurrencyConversionResponse {
|
||||||
converted_amount: converted_amount.to_string(),
|
converted_amount: converted_amount.to_string(),
|
||||||
|
|||||||
Reference in New Issue
Block a user