refactor(currency_conversion): re frame the currency_conversion crate to make api calls on background thread (#6906)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Prajjwal Kumar
2025-01-28 23:27:23 +05:30
committed by GitHub
parent ecab2b1f51
commit 858866f9f3
9 changed files with 142 additions and 203 deletions

View File

@ -303,16 +303,11 @@ pub struct PaymentLink {
#[derive(Debug, Deserialize, Clone, Default)]
#[serde(default)]
pub struct ForexApi {
pub local_fetch_retry_count: u64,
pub api_key: Secret<String>,
pub fallback_api_key: Secret<String>,
/// in ms
/// in s
pub call_delay: i64,
/// in ms
pub local_fetch_retry_delay: u64,
/// in ms
pub api_timeout: u64,
/// in ms
/// in s
pub redis_lock_timeout: u64,
}

View File

@ -15,16 +15,11 @@ pub async fn retrieve_forex(
) -> CustomResult<ApplicationResponse<currency::FxExchangeRatesCacheEntry>, ApiErrorResponse> {
let forex_api = state.conf.forex_api.get_inner();
Ok(ApplicationResponse::Json(
get_forex_rates(
&state,
forex_api.call_delay,
forex_api.local_fetch_retry_delay,
forex_api.local_fetch_retry_count,
)
.await
.change_context(ApiErrorResponse::GenericNotFoundError {
message: "Unable to fetch forex rates".to_string(),
})?,
get_forex_rates(&state, forex_api.call_delay)
.await
.change_context(ApiErrorResponse::GenericNotFoundError {
message: "Unable to fetch forex rates".to_string(),
})?,
))
}
@ -53,14 +48,9 @@ pub async fn get_forex_exchange_rates(
state: SessionState,
) -> CustomResult<ExchangeRates, AnalyticsError> {
let forex_api = state.conf.forex_api.get_inner();
let rates = get_forex_rates(
&state,
forex_api.call_delay,
forex_api.local_fetch_retry_delay,
forex_api.local_fetch_retry_count,
)
.await
.change_context(AnalyticsError::ForexFetchFailed)?;
let rates = get_forex_rates(&state, forex_api.call_delay)
.await
.change_context(AnalyticsError::ForexFetchFailed)?;
Ok((*rates.data).clone())
}

View File

@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc, time::Duration};
use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc};
use api_models::enums;
use common_utils::{date_time, errors::CustomResult, events::ApiEventMetric, ext_traits::AsyncExt};
@ -10,7 +10,8 @@ use redis_interface::DelReply;
use router_env::{instrument, tracing};
use rust_decimal::Decimal;
use strum::IntoEnumIterator;
use tokio::{sync::RwLock, time::sleep};
use tokio::sync::RwLock;
use tracing_futures::Instrument;
use crate::{
logger,
@ -50,10 +51,14 @@ pub enum ForexCacheError {
CouldNotAcquireLock,
#[error("Provided currency not acceptable")]
CurrencyNotAcceptable,
#[error("Forex configuration error: {0}")]
ConfigurationError(String),
#[error("Incorrect entries in default Currency response")]
DefaultCurrencyParsingError,
#[error("Entry not found in cache")]
EntryNotFound,
#[error("Forex data unavailable")]
ForexDataUnavailable,
#[error("Expiration time invalid")]
InvalidLogExpiry,
#[error("Error reading local")]
@ -107,44 +112,19 @@ impl FxExchangeRatesCacheEntry {
}
}
async fn retrieve_forex_from_local() -> Option<FxExchangeRatesCacheEntry> {
async fn retrieve_forex_from_local_cache() -> Option<FxExchangeRatesCacheEntry> {
FX_EXCHANGE_RATES_CACHE.read().await.clone()
}
async fn save_forex_to_local(
async fn save_forex_data_to_local_cache(
exchange_rates_cache_entry: FxExchangeRatesCacheEntry,
) -> CustomResult<(), ForexCacheError> {
let mut local = FX_EXCHANGE_RATES_CACHE.write().await;
*local = Some(exchange_rates_cache_entry);
logger::debug!("forex_log: forex saved in cache");
Ok(())
}
// Alternative handler for handling the case, When no data in local as well as redis
#[allow(dead_code)]
async fn waited_fetch_and_update_caches(
state: &SessionState,
local_fetch_retry_delay: u64,
local_fetch_retry_count: u64,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
for _n in 1..local_fetch_retry_count {
sleep(Duration::from_millis(local_fetch_retry_delay)).await;
//read from redis and update local plus break the loop and return
match retrieve_forex_from_redis(state).await {
Ok(Some(rates)) => {
save_forex_to_local(rates.clone()).await?;
return Ok(rates.clone());
}
Ok(None) => continue,
Err(error) => {
logger::error!(?error);
continue;
}
}
}
//acquire lock one last time and try to fetch and update local & redis
successive_fetch_and_save_forex(state, None).await
}
impl TryFrom<DefaultExchangeRates> for ExchangeRates {
type Error = error_stack::Report<ForexCacheError>;
fn try_from(value: DefaultExchangeRates) -> Result<Self, Self::Error> {
@ -178,102 +158,108 @@ impl From<Conversion> for CurrencyFactors {
pub async fn get_forex_rates(
state: &SessionState,
call_delay: i64,
local_fetch_retry_delay: u64,
local_fetch_retry_count: u64,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
if let Some(local_rates) = retrieve_forex_from_local().await {
if let Some(local_rates) = retrieve_forex_from_local_cache().await {
if local_rates.is_expired(call_delay) {
// expired local data
handler_local_expired(state, call_delay, local_rates).await
logger::debug!("forex_log: Forex stored in cache is expired");
call_forex_api_and_save_data_to_cache_and_redis(state, Some(local_rates)).await
} else {
// Valid data present in local
logger::debug!("forex_log: forex found in cache");
Ok(local_rates)
}
} else {
// No data in local
handler_local_no_data(
state,
call_delay,
local_fetch_retry_delay,
local_fetch_retry_count,
)
.await
call_api_if_redis_forex_data_expired(state, call_delay).await
}
}
async fn handler_local_no_data(
async fn call_api_if_redis_forex_data_expired(
state: &SessionState,
call_delay: i64,
_local_fetch_retry_delay: u64,
_local_fetch_retry_count: u64,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match retrieve_forex_from_redis(state).await {
Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await,
match retrieve_forex_data_from_redis(state).await {
Ok(Some(data)) => call_forex_api_if_redis_data_expired(state, data, call_delay).await,
Ok(None) => {
// No data in local as well as redis
Ok(successive_fetch_and_save_forex(state, None).await?)
call_forex_api_and_save_data_to_cache_and_redis(state, None).await?;
Err(ForexCacheError::ForexDataUnavailable.into())
}
Err(error) => {
logger::error!(?error);
Ok(successive_fetch_and_save_forex(state, None).await?)
// Error in deriving forex rates from redis
logger::error!("forex_error: {:?}", error);
call_forex_api_and_save_data_to_cache_and_redis(state, None).await?;
Err(ForexCacheError::ForexDataUnavailable.into())
}
}
}
async fn successive_fetch_and_save_forex(
async fn call_forex_api_and_save_data_to_cache_and_redis(
state: &SessionState,
stale_redis_data: Option<FxExchangeRatesCacheEntry>,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match acquire_redis_lock(state).await {
Ok(lock_acquired) => {
if !lock_acquired {
return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into());
// spawn a new thread and do the api fetch and write operations on redis.
let forex_api_key = state.conf.forex_api.get_inner().api_key.peek();
if forex_api_key.is_empty() {
Err(ForexCacheError::ConfigurationError("api_keys not provided".into()).into())
} else {
let state = state.clone();
tokio::spawn(
async move {
acquire_redis_lock_and_call_forex_api(&state)
.await
.map_err(|err| {
logger::error!(forex_error=?err);
})
.ok();
}
let api_rates = fetch_forex_rates(state).await;
match api_rates {
Ok(rates) => successive_save_data_to_redis_local(state, rates).await,
Err(error) => {
// API not able to fetch data call secondary service
logger::error!(?error);
let secondary_api_rates = fallback_fetch_forex_rates(state).await;
match secondary_api_rates {
Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?),
Err(error) => stale_redis_data.ok_or({
logger::error!(?error);
release_redis_lock(state).await?;
ForexCacheError::ApiUnresponsive.into()
}),
.in_current_span(),
);
stale_redis_data.ok_or(ForexCacheError::EntryNotFound.into())
}
}
async fn acquire_redis_lock_and_call_forex_api(
state: &SessionState,
) -> CustomResult<(), ForexCacheError> {
let lock_acquired = acquire_redis_lock(state).await?;
if !lock_acquired {
Err(ForexCacheError::CouldNotAcquireLock.into())
} else {
logger::debug!("forex_log: redis lock acquired");
let api_rates = fetch_forex_rates_from_primary_api(state).await;
match api_rates {
Ok(rates) => save_forex_data_to_cache_and_redis(state, rates).await,
Err(error) => {
logger::error!(forex_error=?error,"primary_forex_error");
// API not able to fetch data call secondary service
let secondary_api_rates = fetch_forex_rates_from_fallback_api(state).await;
match secondary_api_rates {
Ok(rates) => save_forex_data_to_cache_and_redis(state, rates).await,
Err(error) => {
release_redis_lock(state).await?;
Err(error)
}
}
}
}
Err(error) => stale_redis_data.ok_or({
logger::error!(?error);
ForexCacheError::ApiUnresponsive.into()
}),
}
}
async fn successive_save_data_to_redis_local(
async fn save_forex_data_to_cache_and_redis(
state: &SessionState,
forex: FxExchangeRatesCacheEntry,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
Ok(save_forex_to_redis(state, &forex)
) -> CustomResult<(), ForexCacheError> {
save_forex_data_to_redis(state, &forex)
.await
.async_and_then(|_rates| release_redis_lock(state))
.await
.async_and_then(|_val| save_forex_to_local(forex.clone()))
.async_and_then(|_val| save_forex_data_to_local_cache(forex.clone()))
.await
.map_or_else(
|error| {
logger::error!(?error);
forex.clone()
},
|_| forex.clone(),
))
}
async fn fallback_forex_redis_check(
async fn call_forex_api_if_redis_data_expired(
state: &SessionState,
redis_data: FxExchangeRatesCacheEntry,
call_delay: i64,
@ -282,57 +268,30 @@ async fn fallback_forex_redis_check(
Some(redis_forex) => {
// Valid data present in redis
let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone());
save_forex_to_local(exchange_rates.clone()).await?;
logger::debug!("forex_log: forex response found in redis");
save_forex_data_to_local_cache(exchange_rates.clone()).await?;
Ok(exchange_rates)
}
None => {
// redis expired
successive_fetch_and_save_forex(state, Some(redis_data)).await
call_forex_api_and_save_data_to_cache_and_redis(state, Some(redis_data)).await
}
}
}
async fn handler_local_expired(
state: &SessionState,
call_delay: i64,
local_rates: FxExchangeRatesCacheEntry,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match retrieve_forex_from_redis(state).await {
Ok(redis_data) => {
match is_redis_expired(redis_data.as_ref(), call_delay).await {
Some(redis_forex) => {
// Valid data present in redis
let exchange_rates =
FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone());
save_forex_to_local(exchange_rates.clone()).await?;
Ok(exchange_rates)
}
None => {
// Redis is expired going for API request
successive_fetch_and_save_forex(state, Some(local_rates)).await
}
}
}
Err(error) => {
// data not present in redis waited fetch
logger::error!(?error);
successive_fetch_and_save_forex(state, Some(local_rates)).await
}
}
}
async fn fetch_forex_rates(
async fn fetch_forex_rates_from_primary_api(
state: &SessionState,
) -> Result<FxExchangeRatesCacheEntry, error_stack::Report<ForexCacheError>> {
let forex_api_key = state.conf.forex_api.get_inner().api_key.peek();
logger::debug!("forex_log: Primary api call for forex fetch");
let forex_url: String = format!("{}{}{}", FOREX_BASE_URL, forex_api_key, FOREX_BASE_CURRENCY);
let forex_request = services::RequestBuilder::new()
.method(services::Method::Get)
.url(&forex_url)
.build();
logger::info!(?forex_request);
logger::info!(primary_forex_request=?forex_request,"forex_log: Primary api call for forex fetch");
let response = state
.api_client
.send_request(
@ -352,7 +311,7 @@ async fn fetch_forex_rates(
"Unable to parse response received from primary api into ForexResponse",
)?;
logger::info!("{:?}", forex_response);
logger::info!(primary_forex_response=?forex_response,"forex_log");
let mut conversions: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
for enum_curr in enums::Currency::iter() {
@ -361,7 +320,10 @@ async fn fetch_forex_rates(
let from_factor = match Decimal::new(1, 0).checked_div(**rate) {
Some(rate) => rate,
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
logger::error!(
"forex_error: Rates for {} not received from API",
&enum_curr
);
continue;
}
};
@ -369,7 +331,10 @@ async fn fetch_forex_rates(
conversions.insert(enum_curr, currency_factors);
}
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
logger::error!(
"forex_error: Rates for {} not received from API",
&enum_curr
);
}
};
}
@ -380,7 +345,7 @@ async fn fetch_forex_rates(
)))
}
pub async fn fallback_fetch_forex_rates(
pub async fn fetch_forex_rates_from_fallback_api(
state: &SessionState,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
let fallback_forex_api_key = state.conf.forex_api.get_inner().fallback_api_key.peek();
@ -392,7 +357,7 @@ pub async fn fallback_fetch_forex_rates(
.url(&fallback_forex_url)
.build();
logger::info!(?fallback_forex_request);
logger::info!(fallback_forex_request=?fallback_forex_request,"forex_log: Fallback api call for forex fetch");
let response = state
.api_client
.send_request(
@ -413,7 +378,8 @@ pub async fn fallback_fetch_forex_rates(
"Unable to parse response received from falback api into ForexResponse",
)?;
logger::info!("{:?}", fallback_forex_response);
logger::info!(fallback_forex_response=?fallback_forex_response,"forex_log");
let mut conversions: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
for enum_curr in enums::Currency::iter() {
match fallback_forex_response.quotes.get(
@ -428,7 +394,10 @@ pub async fn fallback_fetch_forex_rates(
let from_factor = match Decimal::new(1, 0).checked_div(**rate) {
Some(rate) => rate,
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
logger::error!(
"forex_error: Rates for {} not received from API",
&enum_curr
);
continue;
}
};
@ -441,7 +410,10 @@ pub async fn fallback_fetch_forex_rates(
CurrencyFactors::new(Decimal::new(1, 0), Decimal::new(1, 0));
conversions.insert(enum_curr, currency_factors);
} else {
logger::error!("Rates for {} not received from API", &enum_curr);
logger::error!(
"forex_error: Rates for {} not received from API",
&enum_curr
);
}
}
};
@ -450,17 +422,18 @@ pub async fn fallback_fetch_forex_rates(
let rates =
FxExchangeRatesCacheEntry::new(ExchangeRates::new(enums::Currency::USD, conversions));
match acquire_redis_lock(state).await {
Ok(_) => Ok(successive_save_data_to_redis_local(state, rates).await?),
Err(e) => {
logger::error!(?e);
Ok(_) => {
save_forex_data_to_cache_and_redis(state, rates.clone()).await?;
Ok(rates)
}
Err(e) => Err(e),
}
}
async fn release_redis_lock(
state: &SessionState,
) -> Result<DelReply, error_stack::Report<ForexCacheError>> {
logger::debug!("forex_log: Releasing redis lock");
state
.store
.get_redis_conn()
@ -473,6 +446,7 @@ async fn release_redis_lock(
async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCacheError> {
let forex_api = state.conf.forex_api.get_inner();
logger::debug!("forex_log: Acquiring redis lock");
state
.store
.get_redis_conn()
@ -481,11 +455,8 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCac
REDIX_FOREX_CACHE_KEY,
"",
Some(
i64::try_from(
forex_api.local_fetch_retry_count * forex_api.local_fetch_retry_delay
+ forex_api.api_timeout,
)
.change_context(ForexCacheError::ConversionError)?,
i64::try_from(forex_api.redis_lock_timeout)
.change_context(ForexCacheError::ConversionError)?,
),
)
.await
@ -494,10 +465,11 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCac
.attach_printable("Unable to acquire redis lock")
}
async fn save_forex_to_redis(
async fn save_forex_data_to_redis(
app_state: &SessionState,
forex_exchange_cache_entry: &FxExchangeRatesCacheEntry,
) -> CustomResult<(), ForexCacheError> {
logger::debug!("forex_log: Saving forex to redis");
app_state
.store
.get_redis_conn()
@ -508,9 +480,10 @@ async fn save_forex_to_redis(
.attach_printable("Unable to save forex data to redis")
}
async fn retrieve_forex_from_redis(
async fn retrieve_forex_data_from_redis(
app_state: &SessionState,
) -> CustomResult<Option<FxExchangeRatesCacheEntry>, ForexCacheError> {
logger::debug!("forex_log: Retrieving forex from redis");
app_state
.store
.get_redis_conn()
@ -529,6 +502,7 @@ async fn is_redis_expired(
if cache.timestamp + call_delay > date_time::now_unix_timestamp() {
Some(cache.data.clone())
} else {
logger::debug!("forex_log: Forex stored in redis is expired");
None
}
})
@ -542,14 +516,9 @@ pub async fn convert_currency(
from_currency: String,
) -> CustomResult<api_models::currency::CurrencyConversionResponse, ForexCacheError> {
let forex_api = state.conf.forex_api.get_inner();
let rates = get_forex_rates(
&state,
forex_api.call_delay,
forex_api.local_fetch_retry_delay,
forex_api.local_fetch_retry_count,
)
.await
.change_context(ForexCacheError::ApiError)?;
let rates = get_forex_rates(&state, forex_api.call_delay)
.await
.change_context(ForexCacheError::ApiError)?;
let to_currency = enums::Currency::from_str(to_currency.as_str())
.change_context(ForexCacheError::CurrencyNotAcceptable)