|
|
|
|
@ -38,7 +38,7 @@ static FX_EXCHANGE_RATES_CACHE: Lazy<RwLock<Option<FxExchangeRatesCacheEntry>>>
|
|
|
|
|
impl ApiEventMetric for FxExchangeRatesCacheEntry {}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, thiserror::Error)]
|
|
|
|
|
pub enum ForexCacheError {
|
|
|
|
|
pub enum ForexError {
|
|
|
|
|
#[error("API error")]
|
|
|
|
|
ApiError,
|
|
|
|
|
#[error("API timeout")]
|
|
|
|
|
@ -107,8 +107,8 @@ impl FxExchangeRatesCacheEntry {
|
|
|
|
|
timestamp: date_time::now_unix_timestamp(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn is_expired(&self, call_delay: i64) -> bool {
|
|
|
|
|
self.timestamp + call_delay < date_time::now_unix_timestamp()
|
|
|
|
|
fn is_expired(&self, data_expiration_delay: u32) -> bool {
|
|
|
|
|
self.timestamp + i64::from(data_expiration_delay) < date_time::now_unix_timestamp()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -118,7 +118,7 @@ async fn retrieve_forex_from_local_cache() -> Option<FxExchangeRatesCacheEntry>
|
|
|
|
|
|
|
|
|
|
async fn save_forex_data_to_local_cache(
|
|
|
|
|
exchange_rates_cache_entry: FxExchangeRatesCacheEntry,
|
|
|
|
|
) -> CustomResult<(), ForexCacheError> {
|
|
|
|
|
) -> CustomResult<(), ForexError> {
|
|
|
|
|
let mut local = FX_EXCHANGE_RATES_CACHE.write().await;
|
|
|
|
|
*local = Some(exchange_rates_cache_entry);
|
|
|
|
|
logger::debug!("forex_log: forex saved in cache");
|
|
|
|
|
@ -126,17 +126,17 @@ async fn save_forex_data_to_local_cache(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl TryFrom<DefaultExchangeRates> for ExchangeRates {
|
|
|
|
|
type Error = error_stack::Report<ForexCacheError>;
|
|
|
|
|
type Error = error_stack::Report<ForexError>;
|
|
|
|
|
fn try_from(value: DefaultExchangeRates) -> Result<Self, Self::Error> {
|
|
|
|
|
let mut conversion_usable: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
|
|
|
|
|
for (curr, conversion) in value.conversion {
|
|
|
|
|
let enum_curr = enums::Currency::from_str(curr.as_str())
|
|
|
|
|
.change_context(ForexCacheError::ConversionError)
|
|
|
|
|
.change_context(ForexError::ConversionError)
|
|
|
|
|
.attach_printable("Unable to Convert currency received")?;
|
|
|
|
|
conversion_usable.insert(enum_curr, CurrencyFactors::from(conversion));
|
|
|
|
|
}
|
|
|
|
|
let base_curr = enums::Currency::from_str(value.base_currency.as_str())
|
|
|
|
|
.change_context(ForexCacheError::ConversionError)
|
|
|
|
|
.change_context(ForexError::ConversionError)
|
|
|
|
|
.attach_printable("Unable to convert base currency")?;
|
|
|
|
|
Ok(Self {
|
|
|
|
|
base_currency: base_curr,
|
|
|
|
|
@ -157,10 +157,10 @@ impl From<Conversion> for CurrencyFactors {
|
|
|
|
|
#[instrument(skip_all)]
|
|
|
|
|
pub async fn get_forex_rates(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
call_delay: i64,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
|
|
|
|
data_expiration_delay: u32,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexError> {
|
|
|
|
|
if let Some(local_rates) = retrieve_forex_from_local_cache().await {
|
|
|
|
|
if local_rates.is_expired(call_delay) {
|
|
|
|
|
if local_rates.is_expired(data_expiration_delay) {
|
|
|
|
|
// expired local data
|
|
|
|
|
logger::debug!("forex_log: Forex stored in cache is expired");
|
|
|
|
|
call_forex_api_and_save_data_to_cache_and_redis(state, Some(local_rates)).await
|
|
|
|
|
@ -171,26 +171,28 @@ pub async fn get_forex_rates(
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// No data in local
|
|
|
|
|
call_api_if_redis_forex_data_expired(state, call_delay).await
|
|
|
|
|
call_api_if_redis_forex_data_expired(state, data_expiration_delay).await
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn call_api_if_redis_forex_data_expired(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
call_delay: i64,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
|
|
|
|
data_expiration_delay: u32,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexError> {
|
|
|
|
|
match retrieve_forex_data_from_redis(state).await {
|
|
|
|
|
Ok(Some(data)) => call_forex_api_if_redis_data_expired(state, data, call_delay).await,
|
|
|
|
|
Ok(Some(data)) => {
|
|
|
|
|
call_forex_api_if_redis_data_expired(state, data, data_expiration_delay).await
|
|
|
|
|
}
|
|
|
|
|
Ok(None) => {
|
|
|
|
|
// No data in local as well as redis
|
|
|
|
|
call_forex_api_and_save_data_to_cache_and_redis(state, None).await?;
|
|
|
|
|
Err(ForexCacheError::ForexDataUnavailable.into())
|
|
|
|
|
Err(ForexError::ForexDataUnavailable.into())
|
|
|
|
|
}
|
|
|
|
|
Err(error) => {
|
|
|
|
|
// Error in deriving forex rates from redis
|
|
|
|
|
logger::error!("forex_error: {:?}", error);
|
|
|
|
|
call_forex_api_and_save_data_to_cache_and_redis(state, None).await?;
|
|
|
|
|
Err(ForexCacheError::ForexDataUnavailable.into())
|
|
|
|
|
Err(ForexError::ForexDataUnavailable.into())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -198,11 +200,11 @@ async fn call_api_if_redis_forex_data_expired(
|
|
|
|
|
async fn call_forex_api_and_save_data_to_cache_and_redis(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
stale_redis_data: Option<FxExchangeRatesCacheEntry>,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexError> {
|
|
|
|
|
// spawn a new thread and do the api fetch and write operations on redis.
|
|
|
|
|
let forex_api_key = state.conf.forex_api.get_inner().api_key.peek();
|
|
|
|
|
if forex_api_key.is_empty() {
|
|
|
|
|
Err(ForexCacheError::ConfigurationError("api_keys not provided".into()).into())
|
|
|
|
|
Err(ForexError::ConfigurationError("api_keys not provided".into()).into())
|
|
|
|
|
} else {
|
|
|
|
|
let state = state.clone();
|
|
|
|
|
tokio::spawn(
|
|
|
|
|
@ -216,16 +218,16 @@ async fn call_forex_api_and_save_data_to_cache_and_redis(
|
|
|
|
|
}
|
|
|
|
|
.in_current_span(),
|
|
|
|
|
);
|
|
|
|
|
stale_redis_data.ok_or(ForexCacheError::EntryNotFound.into())
|
|
|
|
|
stale_redis_data.ok_or(ForexError::EntryNotFound.into())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn acquire_redis_lock_and_call_forex_api(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
) -> CustomResult<(), ForexCacheError> {
|
|
|
|
|
) -> CustomResult<(), ForexError> {
|
|
|
|
|
let lock_acquired = acquire_redis_lock(state).await?;
|
|
|
|
|
if !lock_acquired {
|
|
|
|
|
Err(ForexCacheError::CouldNotAcquireLock.into())
|
|
|
|
|
Err(ForexError::CouldNotAcquireLock.into())
|
|
|
|
|
} else {
|
|
|
|
|
logger::debug!("forex_log: redis lock acquired");
|
|
|
|
|
let api_rates = fetch_forex_rates_from_primary_api(state).await;
|
|
|
|
|
@ -250,7 +252,7 @@ async fn acquire_redis_lock_and_call_forex_api(
|
|
|
|
|
async fn save_forex_data_to_cache_and_redis(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
forex: FxExchangeRatesCacheEntry,
|
|
|
|
|
) -> CustomResult<(), ForexCacheError> {
|
|
|
|
|
) -> CustomResult<(), ForexError> {
|
|
|
|
|
save_forex_data_to_redis(state, &forex)
|
|
|
|
|
.await
|
|
|
|
|
.async_and_then(|_rates| release_redis_lock(state))
|
|
|
|
|
@ -262,9 +264,9 @@ async fn save_forex_data_to_cache_and_redis(
|
|
|
|
|
async fn call_forex_api_if_redis_data_expired(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
redis_data: FxExchangeRatesCacheEntry,
|
|
|
|
|
call_delay: i64,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
|
|
|
|
match is_redis_expired(Some(redis_data.clone()).as_ref(), call_delay).await {
|
|
|
|
|
data_expiration_delay: u32,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexError> {
|
|
|
|
|
match is_redis_expired(Some(redis_data.clone()).as_ref(), data_expiration_delay).await {
|
|
|
|
|
Some(redis_forex) => {
|
|
|
|
|
// Valid data present in redis
|
|
|
|
|
let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone());
|
|
|
|
|
@ -281,7 +283,7 @@ async fn call_forex_api_if_redis_data_expired(
|
|
|
|
|
|
|
|
|
|
async fn fetch_forex_rates_from_primary_api(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
) -> Result<FxExchangeRatesCacheEntry, error_stack::Report<ForexCacheError>> {
|
|
|
|
|
) -> Result<FxExchangeRatesCacheEntry, error_stack::Report<ForexError>> {
|
|
|
|
|
let forex_api_key = state.conf.forex_api.get_inner().api_key.peek();
|
|
|
|
|
|
|
|
|
|
logger::debug!("forex_log: Primary api call for forex fetch");
|
|
|
|
|
@ -301,12 +303,12 @@ async fn fetch_forex_rates_from_primary_api(
|
|
|
|
|
false,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::ApiUnresponsive)
|
|
|
|
|
.change_context(ForexError::ApiUnresponsive)
|
|
|
|
|
.attach_printable("Primary forex fetch api unresponsive")?;
|
|
|
|
|
let forex_response = response
|
|
|
|
|
.json::<ForexResponse>()
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::ParsingError)
|
|
|
|
|
.change_context(ForexError::ParsingError)
|
|
|
|
|
.attach_printable(
|
|
|
|
|
"Unable to parse response received from primary api into ForexResponse",
|
|
|
|
|
)?;
|
|
|
|
|
@ -347,7 +349,7 @@ async fn fetch_forex_rates_from_primary_api(
|
|
|
|
|
|
|
|
|
|
pub async fn fetch_forex_rates_from_fallback_api(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
|
|
|
|
|
) -> CustomResult<FxExchangeRatesCacheEntry, ForexError> {
|
|
|
|
|
let fallback_forex_api_key = state.conf.forex_api.get_inner().fallback_api_key.peek();
|
|
|
|
|
|
|
|
|
|
let fallback_forex_url: String =
|
|
|
|
|
@ -367,13 +369,13 @@ pub async fn fetch_forex_rates_from_fallback_api(
|
|
|
|
|
false,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::ApiUnresponsive)
|
|
|
|
|
.change_context(ForexError::ApiUnresponsive)
|
|
|
|
|
.attach_printable("Fallback forex fetch api unresponsive")?;
|
|
|
|
|
|
|
|
|
|
let fallback_forex_response = response
|
|
|
|
|
.json::<FallbackForexResponse>()
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::ParsingError)
|
|
|
|
|
.change_context(ForexError::ParsingError)
|
|
|
|
|
.attach_printable(
|
|
|
|
|
"Unable to parse response received from falback api into ForexResponse",
|
|
|
|
|
)?;
|
|
|
|
|
@ -432,74 +434,76 @@ pub async fn fetch_forex_rates_from_fallback_api(
|
|
|
|
|
|
|
|
|
|
async fn release_redis_lock(
|
|
|
|
|
state: &SessionState,
|
|
|
|
|
) -> Result<DelReply, error_stack::Report<ForexCacheError>> {
|
|
|
|
|
) -> Result<DelReply, error_stack::Report<ForexError>> {
|
|
|
|
|
logger::debug!("forex_log: Releasing redis lock");
|
|
|
|
|
state
|
|
|
|
|
.store
|
|
|
|
|
.get_redis_conn()
|
|
|
|
|
.change_context(ForexCacheError::RedisConnectionError)?
|
|
|
|
|
.change_context(ForexError::RedisConnectionError)?
|
|
|
|
|
.delete_key(&REDIX_FOREX_CACHE_KEY.into())
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::RedisLockReleaseFailed)
|
|
|
|
|
.change_context(ForexError::RedisLockReleaseFailed)
|
|
|
|
|
.attach_printable("Unable to release redis lock")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexCacheError> {
|
|
|
|
|
async fn acquire_redis_lock(state: &SessionState) -> CustomResult<bool, ForexError> {
|
|
|
|
|
let forex_api = state.conf.forex_api.get_inner();
|
|
|
|
|
logger::debug!("forex_log: Acquiring redis lock");
|
|
|
|
|
state
|
|
|
|
|
.store
|
|
|
|
|
.get_redis_conn()
|
|
|
|
|
.change_context(ForexCacheError::RedisConnectionError)?
|
|
|
|
|
.change_context(ForexError::RedisConnectionError)?
|
|
|
|
|
.set_key_if_not_exists_with_expiry(
|
|
|
|
|
&REDIX_FOREX_CACHE_KEY.into(),
|
|
|
|
|
"",
|
|
|
|
|
Some(
|
|
|
|
|
i64::try_from(forex_api.redis_lock_timeout)
|
|
|
|
|
.change_context(ForexCacheError::ConversionError)?,
|
|
|
|
|
),
|
|
|
|
|
Some(i64::from(forex_api.redis_lock_timeout_in_seconds)),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
.map(|val| matches!(val, redis_interface::SetnxReply::KeySet))
|
|
|
|
|
.change_context(ForexCacheError::CouldNotAcquireLock)
|
|
|
|
|
.change_context(ForexError::CouldNotAcquireLock)
|
|
|
|
|
.attach_printable("Unable to acquire redis lock")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn save_forex_data_to_redis(
|
|
|
|
|
app_state: &SessionState,
|
|
|
|
|
forex_exchange_cache_entry: &FxExchangeRatesCacheEntry,
|
|
|
|
|
) -> CustomResult<(), ForexCacheError> {
|
|
|
|
|
) -> CustomResult<(), ForexError> {
|
|
|
|
|
let forex_api = app_state.conf.forex_api.get_inner();
|
|
|
|
|
logger::debug!("forex_log: Saving forex to redis");
|
|
|
|
|
app_state
|
|
|
|
|
.store
|
|
|
|
|
.get_redis_conn()
|
|
|
|
|
.change_context(ForexCacheError::RedisConnectionError)?
|
|
|
|
|
.serialize_and_set_key(&REDIX_FOREX_CACHE_DATA.into(), forex_exchange_cache_entry)
|
|
|
|
|
.change_context(ForexError::RedisConnectionError)?
|
|
|
|
|
.serialize_and_set_key_with_expiry(
|
|
|
|
|
&REDIX_FOREX_CACHE_DATA.into(),
|
|
|
|
|
forex_exchange_cache_entry,
|
|
|
|
|
i64::from(forex_api.redis_ttl_in_seconds),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::RedisWriteError)
|
|
|
|
|
.change_context(ForexError::RedisWriteError)
|
|
|
|
|
.attach_printable("Unable to save forex data to redis")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn retrieve_forex_data_from_redis(
|
|
|
|
|
app_state: &SessionState,
|
|
|
|
|
) -> CustomResult<Option<FxExchangeRatesCacheEntry>, ForexCacheError> {
|
|
|
|
|
) -> CustomResult<Option<FxExchangeRatesCacheEntry>, ForexError> {
|
|
|
|
|
logger::debug!("forex_log: Retrieving forex from redis");
|
|
|
|
|
app_state
|
|
|
|
|
.store
|
|
|
|
|
.get_redis_conn()
|
|
|
|
|
.change_context(ForexCacheError::RedisConnectionError)?
|
|
|
|
|
.change_context(ForexError::RedisConnectionError)?
|
|
|
|
|
.get_and_deserialize_key(&REDIX_FOREX_CACHE_DATA.into(), "FxExchangeRatesCache")
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::EntryNotFound)
|
|
|
|
|
.change_context(ForexError::EntryNotFound)
|
|
|
|
|
.attach_printable("Forex entry not found in redis")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn is_redis_expired(
|
|
|
|
|
redis_cache: Option<&FxExchangeRatesCacheEntry>,
|
|
|
|
|
call_delay: i64,
|
|
|
|
|
data_expiration_delay: u32,
|
|
|
|
|
) -> Option<Arc<ExchangeRates>> {
|
|
|
|
|
redis_cache.and_then(|cache| {
|
|
|
|
|
if cache.timestamp + call_delay > date_time::now_unix_timestamp() {
|
|
|
|
|
if cache.timestamp + i64::from(data_expiration_delay) > date_time::now_unix_timestamp() {
|
|
|
|
|
Some(cache.data.clone())
|
|
|
|
|
} else {
|
|
|
|
|
logger::debug!("forex_log: Forex stored in redis is expired");
|
|
|
|
|
@ -514,23 +518,23 @@ pub async fn convert_currency(
|
|
|
|
|
amount: i64,
|
|
|
|
|
to_currency: String,
|
|
|
|
|
from_currency: String,
|
|
|
|
|
) -> CustomResult<api_models::currency::CurrencyConversionResponse, ForexCacheError> {
|
|
|
|
|
) -> CustomResult<api_models::currency::CurrencyConversionResponse, ForexError> {
|
|
|
|
|
let forex_api = state.conf.forex_api.get_inner();
|
|
|
|
|
let rates = get_forex_rates(&state, forex_api.call_delay)
|
|
|
|
|
let rates = get_forex_rates(&state, forex_api.data_expiration_delay_in_seconds)
|
|
|
|
|
.await
|
|
|
|
|
.change_context(ForexCacheError::ApiError)?;
|
|
|
|
|
.change_context(ForexError::ApiError)?;
|
|
|
|
|
|
|
|
|
|
let to_currency = enums::Currency::from_str(to_currency.as_str())
|
|
|
|
|
.change_context(ForexCacheError::CurrencyNotAcceptable)
|
|
|
|
|
.change_context(ForexError::CurrencyNotAcceptable)
|
|
|
|
|
.attach_printable("The provided currency is not acceptable")?;
|
|
|
|
|
|
|
|
|
|
let from_currency = enums::Currency::from_str(from_currency.as_str())
|
|
|
|
|
.change_context(ForexCacheError::CurrencyNotAcceptable)
|
|
|
|
|
.change_context(ForexError::CurrencyNotAcceptable)
|
|
|
|
|
.attach_printable("The provided currency is not acceptable")?;
|
|
|
|
|
|
|
|
|
|
let converted_amount =
|
|
|
|
|
currency_conversion::conversion::convert(&rates.data, from_currency, to_currency, amount)
|
|
|
|
|
.change_context(ForexCacheError::ConversionError)
|
|
|
|
|
.change_context(ForexError::ConversionError)
|
|
|
|
|
.attach_printable("Unable to perform currency conversion")?;
|
|
|
|
|
|
|
|
|
|
Ok(api_models::currency::CurrencyConversionResponse {
|
|
|
|
|
|