feat(currency_conversion): add currency conversion feature (#2948)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Prajjwal Kumar
2023-11-28 16:05:04 +05:30
committed by GitHub
parent 0e66b1b5dc
commit c0116db271
27 changed files with 1501 additions and 10 deletions

View File

@ -76,6 +76,7 @@ regex = "1.8.4"
reqwest = { version = "0.11.18", features = ["json", "native-tls", "gzip", "multipart"] }
ring = "0.16.20"
roxmltree = "0.18.0"
rust_decimal = { version = "1.30.0", features = ["serde-with-float", "serde-with-str"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1.0.96"
@ -85,7 +86,7 @@ serde_urlencoded = "0.7.1"
serde_with = "3.0.0"
sha-1 = { version = "0.9" }
sqlx = { version = "0.6.3", features = ["postgres", "runtime-actix", "runtime-actix-native-tls", "time", "bigdecimal"] }
strum = { version = "0.24.1", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
tera = "1.19.1"
thiserror = "1.0.40"
time = { version = "0.3.21", features = ["serde", "serde-well-known", "std"] }
@ -104,6 +105,7 @@ api_models = { version = "0.1.0", path = "../api_models", features = ["errors"]
cards = { version = "0.1.0", path = "../cards" }
common_enums = { version = "0.1.0", path = "../common_enums" }
common_utils = { version = "0.1.0", path = "../common_utils", features = ["signals", "async_ext", "logs"] }
currency_conversion = { version = "0.1.0", path = "../currency_conversion" }
data_models = { version = "0.1.0", path = "../data_models", default-features = false }
diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] }
euclid = { version = "0.1.0", path = "../euclid", features = ["valued_jit"] }

View File

@ -13,6 +13,7 @@ use external_services::email::EmailSettings;
use external_services::kms;
use redis_interface::RedisSettings;
pub use router_env::config::{Log, LogConsole, LogFile, LogTelemetry};
use rust_decimal::Decimal;
use scheduler::SchedulerSettings;
use serde::{de::Error, Deserialize, Deserializer};
@ -70,6 +71,7 @@ pub struct Settings {
pub secrets: Secrets,
pub locker: Locker,
pub connectors: Connectors,
pub forex_api: ForexApi,
pub refund: Refund,
pub eph_key: EphemeralConfig,
pub scheduler: Option<SchedulerSettings>,
@ -119,6 +121,37 @@ pub struct PaymentLink {
pub sdk_url: String,
}
#[derive(Debug, Deserialize, Clone, Default)]
#[serde(default)]
pub struct ForexApi {
pub local_fetch_retry_count: u64,
pub api_key: masking::Secret<String>,
pub fallback_api_key: masking::Secret<String>,
/// in ms
pub call_delay: i64,
/// in ms
pub local_fetch_retry_delay: u64,
/// in ms
pub api_timeout: u64,
/// in ms
pub redis_lock_timeout: u64,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct DefaultExchangeRates {
pub base_currency: String,
pub conversion: HashMap<String, Conversion>,
pub timestamp: i64,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Conversion {
#[serde(with = "rust_decimal::serde::str")]
pub to_factor: Decimal,
#[serde(with = "rust_decimal::serde::str")]
pub from_factor: Decimal,
}
#[derive(Debug, Deserialize, Clone, Default)]
#[serde(default)]
pub struct ApplepayMerchantConfigs {

View File

@ -5,6 +5,8 @@ pub mod cache;
pub mod cards_info;
pub mod conditional_config;
pub mod configs;
#[cfg(any(feature = "olap", feature = "oltp"))]
pub mod currency;
pub mod customers;
pub mod disputes;
pub mod errors;

View File

@ -0,0 +1,51 @@
use common_utils::errors::CustomResult;
use error_stack::ResultExt;
use crate::{
core::errors::ApiErrorResponse,
services::ApplicationResponse,
utils::currency::{self, convert_currency, get_forex_rates},
AppState,
};
pub async fn retrieve_forex(
state: AppState,
) -> CustomResult<ApplicationResponse<currency::FxExchangeRatesCacheEntry>, ApiErrorResponse> {
Ok(ApplicationResponse::Json(
get_forex_rates(
&state,
state.conf.forex_api.call_delay,
state.conf.forex_api.local_fetch_retry_delay,
state.conf.forex_api.local_fetch_retry_count,
#[cfg(feature = "kms")]
&state.conf.kms,
)
.await
.change_context(ApiErrorResponse::GenericNotFoundError {
message: "Unable to fetch forex rates".to_string(),
})?,
))
}
pub async fn convert_forex(
state: AppState,
amount: i64,
to_currency: String,
from_currency: String,
) -> CustomResult<
ApplicationResponse<api_models::currency::CurrencyConversionResponse>,
ApiErrorResponse,
> {
Ok(ApplicationResponse::Json(
Box::pin(convert_currency(
state.clone(),
amount,
to_currency,
from_currency,
#[cfg(feature = "kms")]
&state.conf.kms,
))
.await
.change_context(ApiErrorResponse::InternalServerError)?,
))
}

View File

@ -122,6 +122,7 @@ pub fn mk_app(
.service(routes::Payments::server(state.clone()))
.service(routes::Customers::server(state.clone()))
.service(routes::Configs::server(state.clone()))
.service(routes::Forex::server(state.clone()))
.service(routes::Refunds::server(state.clone()))
.service(routes::MerchantConnectorAccount::server(state.clone()))
.service(routes::Mandates::server(state.clone()))

View File

@ -4,6 +4,8 @@ pub mod app;
pub mod cache;
pub mod cards_info;
pub mod configs;
#[cfg(any(feature = "olap", feature = "oltp"))]
pub mod currency;
pub mod customers;
pub mod disputes;
#[cfg(feature = "dummy_connector")]
@ -32,6 +34,8 @@ pub mod webhooks;
pub mod locker_migration;
#[cfg(feature = "dummy_connector")]
pub use self::app::DummyConnector;
#[cfg(any(feature = "olap", feature = "oltp"))]
pub use self::app::Forex;
#[cfg(feature = "payouts")]
pub use self::app::Payouts;
#[cfg(feature = "olap")]

View File

@ -10,6 +10,8 @@ use scheduler::SchedulerInterface;
use storage_impl::MockDb;
use tokio::sync::oneshot;
#[cfg(any(feature = "olap", feature = "oltp"))]
use super::currency;
#[cfg(feature = "dummy_connector")]
use super::dummy_connector::*;
#[cfg(feature = "payouts")]
@ -28,7 +30,7 @@ use super::{cache::*, health::*};
use super::{configs::*, customers::*, mandates::*, payments::*, refunds::*};
#[cfg(feature = "oltp")]
use super::{ephemeral_key::*, payment_methods::*, webhooks::*};
use crate::{
pub use crate::{
configs::settings,
db::{StorageImpl, StorageInterface},
events::{event_logger::EventLogger, EventHandler},
@ -302,6 +304,22 @@ impl Payments {
}
}
#[cfg(any(feature = "olap", feature = "oltp"))]
pub struct Forex;
#[cfg(any(feature = "olap", feature = "oltp"))]
impl Forex {
pub fn server(state: AppState) -> Scope {
web::scope("/forex")
.app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(state.clone()))
.service(web::resource("/rates").route(web::get().to(currency::retrieve_forex)))
.service(
web::resource("/convert_from_minor").route(web::get().to(currency::convert_forex)),
)
}
}
#[cfg(feature = "olap")]
pub struct Routing;

View File

@ -0,0 +1,58 @@
use actix_web::{web, HttpRequest, HttpResponse};
use router_env::Flow;
use crate::{
core::{api_locking, currency},
routes::AppState,
services::{api, authentication as auth, authorization::permissions::Permission},
};
pub async fn retrieve_forex(state: web::Data<AppState>, req: HttpRequest) -> HttpResponse {
let flow = Flow::RetrieveForexFlow;
Box::pin(api::server_wrap(
flow,
state,
&req,
(),
|state, _auth: auth::AuthenticationData, _| currency::retrieve_forex(state),
auth::auth_type(
&auth::ApiKeyAuth,
&auth::JWTAuth(Permission::ForexRead),
req.headers(),
),
api_locking::LockAction::NotApplicable,
))
.await
}
pub async fn convert_forex(
state: web::Data<AppState>,
req: HttpRequest,
params: web::Query<api_models::currency::CurrencyConversionParams>,
) -> HttpResponse {
let flow = Flow::RetrieveForexFlow;
let amount = &params.amount;
let to_currency = &params.to_currency;
let from_currency = &params.from_currency;
Box::pin(api::server_wrap(
flow,
state.clone(),
&req,
(),
|state, _, _| {
currency::convert_forex(
state,
*amount,
to_currency.to_string(),
from_currency.to_string(),
)
},
auth::auth_type(
&auth::ApiKeyAuth,
&auth::JWTAuth(Permission::ForexRead),
req.headers(),
),
api_locking::LockAction::NotApplicable,
))
.await
}

View File

@ -23,6 +23,7 @@ pub enum ApiIdentifier {
ApiKeys,
PaymentLink,
Routing,
Forex,
RustLockerMigration,
Gsm,
User,
@ -51,6 +52,8 @@ impl From<Flow> for ApiIdentifier {
| Flow::DecisionManagerRetrieveConfig
| Flow::DecisionManagerUpsertConfig => Self::Routing,
Flow::RetrieveForexFlow => Self::Forex,
Flow::MerchantConnectorsCreate
| Flow::MerchantConnectorsRetrieve
| Flow::MerchantConnectorsUpdate

View File

@ -1,11 +1,11 @@
pub mod currency;
pub mod custom_serde;
pub mod db_utils;
pub mod ext_traits;
#[cfg(feature = "olap")]
pub mod user;
#[cfg(feature = "kv_store")]
pub mod storage_partitioning;
#[cfg(feature = "olap")]
pub mod user;
use std::fmt::Debug;

View File

@ -0,0 +1,641 @@
use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc, time::Duration};
use api_models::enums;
use common_utils::{date_time, errors::CustomResult, events::ApiEventMetric, ext_traits::AsyncExt};
use currency_conversion::types::{CurrencyFactors, ExchangeRates};
use error_stack::{IntoReport, ResultExt};
#[cfg(feature = "kms")]
use external_services::kms;
use masking::PeekInterface;
use once_cell::sync::Lazy;
use redis_interface::DelReply;
use rust_decimal::Decimal;
use strum::IntoEnumIterator;
use tokio::{sync::RwLock, time::sleep};
use crate::{
logger,
routes::app::settings::{Conversion, DefaultExchangeRates},
services, AppState,
};
const REDIX_FOREX_CACHE_KEY: &str = "{forex_cache}_lock";
const REDIX_FOREX_CACHE_DATA: &str = "{forex_cache}_data";
const FOREX_API_TIMEOUT: u64 = 5;
const FOREX_BASE_URL: &str = "https://openexchangerates.org/api/latest.json?app_id=";
const FOREX_BASE_CURRENCY: &str = "&base=USD";
const FALLBACK_FOREX_BASE_URL: &str = "http://apilayer.net/api/live?access_key=";
const FALLBACK_FOREX_API_CURRENCY_PREFIX: &str = "USD";
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FxExchangeRatesCacheEntry {
data: Arc<ExchangeRates>,
timestamp: i64,
}
static FX_EXCHANGE_RATES_CACHE: Lazy<RwLock<Option<FxExchangeRatesCacheEntry>>> =
Lazy::new(|| RwLock::new(None));
impl ApiEventMetric for FxExchangeRatesCacheEntry {}
#[derive(Debug, Clone, thiserror::Error)]
pub enum ForexCacheError {
#[error("API error")]
ApiError,
#[error("API timeout")]
ApiTimeout,
#[error("API unresponsive")]
ApiUnresponsive,
#[error("Conversion error")]
ConversionError,
#[error("Could not acquire the lock for cache entry")]
CouldNotAcquireLock,
#[error("Provided currency not acceptable")]
CurrencyNotAcceptable,
#[error("Incorrect entries in default Currency response")]
DefaultCurrencyParsingError,
#[error("Entry not found in cache")]
EntryNotFound,
#[error("Expiration time invalid")]
InvalidLogExpiry,
#[error("Error reading local")]
LocalReadError,
#[error("Error writing to local cache")]
LocalWriteError,
#[error("Json Parsing error")]
ParsingError,
#[error("Kms decryption error")]
KmsDecryptionFailed,
#[error("Error connecting to redis")]
RedisConnectionError,
#[error("Not able to release write lock")]
RedisLockReleaseFailed,
#[error("Error writing to redis")]
RedisWriteError,
#[error("Not able to acquire write lock")]
WriteLockNotAcquired,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct ForexResponse {
pub rates: HashMap<String, FloatDecimal>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FallbackForexResponse {
pub quotes: HashMap<String, FloatDecimal>,
}
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
struct FloatDecimal(#[serde(with = "rust_decimal::serde::float")] Decimal);
impl Deref for FloatDecimal {
type Target = Decimal;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FxExchangeRatesCacheEntry {
fn new(exchange_rate: ExchangeRates) -> Self {
Self {
data: Arc::new(exchange_rate),
timestamp: date_time::now_unix_timestamp(),
}
}
fn is_expired(&self, call_delay: i64) -> bool {
self.timestamp + call_delay < date_time::now_unix_timestamp()
}
}
async fn retrieve_forex_from_local() -> Option<FxExchangeRatesCacheEntry> {
FX_EXCHANGE_RATES_CACHE.read().await.clone()
}
async fn save_forex_to_local(
exchange_rates_cache_entry: FxExchangeRatesCacheEntry,
) -> CustomResult<(), ForexCacheError> {
let mut local = FX_EXCHANGE_RATES_CACHE.write().await;
*local = Some(exchange_rates_cache_entry);
Ok(())
}
// Alternative handler for handling the case, When no data in local as well as redis
#[allow(dead_code)]
async fn waited_fetch_and_update_caches(
state: &AppState,
local_fetch_retry_delay: u64,
local_fetch_retry_count: u64,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
for _n in 1..local_fetch_retry_count {
sleep(Duration::from_millis(local_fetch_retry_delay)).await;
//read from redis and update local plus break the loop and return
match retrieve_forex_from_redis(state).await {
Ok(Some(rates)) => {
save_forex_to_local(rates.clone()).await?;
return Ok(rates.clone());
}
Ok(None) => continue,
Err(e) => {
logger::error!(?e);
continue;
}
}
}
//acquire lock one last time and try to fetch and update local & redis
successive_fetch_and_save_forex(
state,
None,
#[cfg(feature = "kms")]
kms_config,
)
.await
}
impl TryFrom<DefaultExchangeRates> for ExchangeRates {
type Error = error_stack::Report<ForexCacheError>;
fn try_from(value: DefaultExchangeRates) -> Result<Self, Self::Error> {
let mut conversion_usable: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
for (curr, conversion) in value.conversion {
let enum_curr = enums::Currency::from_str(curr.as_str())
.into_report()
.change_context(ForexCacheError::ConversionError)?;
conversion_usable.insert(enum_curr, CurrencyFactors::from(conversion));
}
let base_curr = enums::Currency::from_str(value.base_currency.as_str())
.into_report()
.change_context(ForexCacheError::ConversionError)?;
Ok(Self {
base_currency: base_curr,
conversion: conversion_usable,
})
}
}
impl From<Conversion> for CurrencyFactors {
fn from(value: Conversion) -> Self {
Self {
to_factor: value.to_factor,
from_factor: value.from_factor,
}
}
}
pub async fn get_forex_rates(
state: &AppState,
call_delay: i64,
local_fetch_retry_delay: u64,
local_fetch_retry_count: u64,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
if let Some(local_rates) = retrieve_forex_from_local().await {
if local_rates.is_expired(call_delay) {
// expired local data
handler_local_expired(
state,
call_delay,
local_rates,
#[cfg(feature = "kms")]
kms_config,
)
.await
} else {
// Valid data present in local
Ok(local_rates)
}
} else {
// No data in local
handler_local_no_data(
state,
call_delay,
local_fetch_retry_delay,
local_fetch_retry_count,
#[cfg(feature = "kms")]
kms_config,
)
.await
}
}
async fn handler_local_no_data(
state: &AppState,
call_delay: i64,
_local_fetch_retry_delay: u64,
_local_fetch_retry_count: u64,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match retrieve_forex_from_redis(state).await {
Ok(Some(data)) => {
fallback_forex_redis_check(
state,
data,
call_delay,
#[cfg(feature = "kms")]
kms_config,
)
.await
}
Ok(None) => {
// No data in local as well as redis
Ok(successive_fetch_and_save_forex(
state,
None,
#[cfg(feature = "kms")]
kms_config,
)
.await?)
}
Err(err) => {
logger::error!(?err);
Ok(successive_fetch_and_save_forex(
state,
None,
#[cfg(feature = "kms")]
kms_config,
)
.await?)
}
}
}
async fn successive_fetch_and_save_forex(
state: &AppState,
stale_redis_data: Option<FxExchangeRatesCacheEntry>,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match acquire_redis_lock(state).await {
Ok(lock_acquired) => {
if !lock_acquired {
return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into());
}
let api_rates = fetch_forex_rates(
state,
#[cfg(feature = "kms")]
kms_config,
)
.await;
match api_rates {
Ok(rates) => successive_save_data_to_redis_local(state, rates).await,
Err(err) => {
// API not able to fetch data call secondary service
logger::error!(?err);
let secondary_api_rates = fallback_fetch_forex_rates(
state,
#[cfg(feature = "kms")]
kms_config,
)
.await;
match secondary_api_rates {
Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?),
Err(err) => stale_redis_data.ok_or({
logger::error!(?err);
ForexCacheError::ApiUnresponsive.into()
}),
}
}
}
}
Err(e) => stale_redis_data.ok_or({
logger::error!(?e);
ForexCacheError::ApiUnresponsive.into()
}),
}
}
async fn successive_save_data_to_redis_local(
state: &AppState,
forex: FxExchangeRatesCacheEntry,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
Ok(save_forex_to_redis(state, &forex)
.await
.async_and_then(|_rates| async { release_redis_lock(state).await })
.await
.async_and_then(|_val| async { Ok(save_forex_to_local(forex.clone()).await) })
.await
.map_or_else(
|e| {
logger::error!(?e);
forex.clone()
},
|_| forex.clone(),
))
}
async fn fallback_forex_redis_check(
state: &AppState,
redis_data: FxExchangeRatesCacheEntry,
call_delay: i64,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match is_redis_expired(Some(redis_data.clone()).as_ref(), call_delay).await {
Some(redis_forex) => {
// Valid data present in redis
let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone());
save_forex_to_local(exchange_rates.clone()).await?;
Ok(exchange_rates)
}
None => {
// redis expired
successive_fetch_and_save_forex(
state,
Some(redis_data),
#[cfg(feature = "kms")]
kms_config,
)
.await
}
}
}
async fn handler_local_expired(
state: &AppState,
call_delay: i64,
local_rates: FxExchangeRatesCacheEntry,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
match retrieve_forex_from_redis(state).await {
Ok(redis_data) => {
match is_redis_expired(redis_data.as_ref(), call_delay).await {
Some(redis_forex) => {
// Valid data present in redis
let exchange_rates =
FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone());
save_forex_to_local(exchange_rates.clone()).await?;
Ok(exchange_rates)
}
None => {
// Redis is expired going for API request
successive_fetch_and_save_forex(
state,
Some(local_rates),
#[cfg(feature = "kms")]
kms_config,
)
.await
}
}
}
Err(e) => {
// data not present in redis waited fetch
logger::error!(?e);
successive_fetch_and_save_forex(
state,
Some(local_rates),
#[cfg(feature = "kms")]
kms_config,
)
.await
}
}
}
async fn fetch_forex_rates(
state: &AppState,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> Result<FxExchangeRatesCacheEntry, error_stack::Report<ForexCacheError>> {
#[cfg(feature = "kms")]
let forex_api_key = kms::get_kms_client(kms_config)
.await
.decrypt(state.conf.forex_api.api_key.peek())
.await
.change_context(ForexCacheError::KmsDecryptionFailed)?;
#[cfg(not(feature = "kms"))]
let forex_api_key = state.conf.forex_api.api_key.peek();
let forex_url: String = format!("{}{}{}", FOREX_BASE_URL, forex_api_key, FOREX_BASE_CURRENCY);
let forex_request = services::RequestBuilder::new()
.method(services::Method::Get)
.url(&forex_url)
.build();
logger::info!(?forex_request);
let response = state
.api_client
.send_request(
&state.clone(),
forex_request,
Some(FOREX_API_TIMEOUT),
false,
)
.await
.change_context(ForexCacheError::ApiUnresponsive)?;
let forex_response = response
.json::<ForexResponse>()
.await
.into_report()
.change_context(ForexCacheError::ParsingError)?;
logger::info!("{:?}", forex_response);
let mut conversions: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
for enum_curr in enums::Currency::iter() {
match forex_response.rates.get(&enum_curr.to_string()) {
Some(rate) => {
let from_factor = match Decimal::new(1, 0).checked_div(**rate) {
Some(rate) => rate,
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
continue;
}
};
let currency_factors = CurrencyFactors::new(**rate, from_factor);
conversions.insert(enum_curr, currency_factors);
}
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
}
};
}
Ok(FxExchangeRatesCacheEntry::new(ExchangeRates::new(
enums::Currency::USD,
conversions,
)))
}
pub async fn fallback_fetch_forex_rates(
state: &AppState,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<FxExchangeRatesCacheEntry, ForexCacheError> {
#[cfg(feature = "kms")]
let fallback_forex_api_key = kms::get_kms_client(kms_config)
.await
.decrypt(state.conf.forex_api.fallback_api_key.peek())
.await
.change_context(ForexCacheError::KmsDecryptionFailed)?;
#[cfg(not(feature = "kms"))]
let fallback_forex_api_key = state.conf.forex_api.fallback_api_key.peek();
let fallback_forex_url: String =
format!("{}{}", FALLBACK_FOREX_BASE_URL, fallback_forex_api_key,);
let fallback_forex_request = services::RequestBuilder::new()
.method(services::Method::Get)
.url(&fallback_forex_url)
.build();
logger::info!(?fallback_forex_request);
let response = state
.api_client
.send_request(
&state.clone(),
fallback_forex_request,
Some(FOREX_API_TIMEOUT),
false,
)
.await
.change_context(ForexCacheError::ApiUnresponsive)?;
let fallback_forex_response = response
.json::<FallbackForexResponse>()
.await
.into_report()
.change_context(ForexCacheError::ParsingError)?;
logger::info!("{:?}", fallback_forex_response);
let mut conversions: HashMap<enums::Currency, CurrencyFactors> = HashMap::new();
for enum_curr in enums::Currency::iter() {
match fallback_forex_response.quotes.get(
format!(
"{}{}",
FALLBACK_FOREX_API_CURRENCY_PREFIX,
&enum_curr.to_string()
)
.as_str(),
) {
Some(rate) => {
let from_factor = match Decimal::new(1, 0).checked_div(**rate) {
Some(rate) => rate,
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
continue;
}
};
let currency_factors = CurrencyFactors::new(**rate, from_factor);
conversions.insert(enum_curr, currency_factors);
}
None => {
logger::error!("Rates for {} not received from API", &enum_curr);
}
};
}
let rates =
FxExchangeRatesCacheEntry::new(ExchangeRates::new(enums::Currency::USD, conversions));
match acquire_redis_lock(state).await {
Ok(_) => Ok(successive_save_data_to_redis_local(state, rates).await?),
Err(e) => {
logger::error!(?e);
Ok(rates)
}
}
}
async fn release_redis_lock(
state: &AppState,
) -> Result<DelReply, error_stack::Report<ForexCacheError>> {
state
.store
.get_redis_conn()
.change_context(ForexCacheError::RedisConnectionError)?
.delete_key(REDIX_FOREX_CACHE_KEY)
.await
.change_context(ForexCacheError::RedisLockReleaseFailed)
}
async fn acquire_redis_lock(app_state: &AppState) -> CustomResult<bool, ForexCacheError> {
app_state
.store
.get_redis_conn()
.change_context(ForexCacheError::RedisConnectionError)?
.set_key_if_not_exists_with_expiry(
REDIX_FOREX_CACHE_KEY,
"",
Some(
(app_state.conf.forex_api.local_fetch_retry_count
* app_state.conf.forex_api.local_fetch_retry_delay
+ app_state.conf.forex_api.api_timeout)
.try_into()
.into_report()
.change_context(ForexCacheError::ConversionError)?,
),
)
.await
.map(|val| matches!(val, redis_interface::SetnxReply::KeySet))
.change_context(ForexCacheError::CouldNotAcquireLock)
}
async fn save_forex_to_redis(
app_state: &AppState,
forex_exchange_cache_entry: &FxExchangeRatesCacheEntry,
) -> CustomResult<(), ForexCacheError> {
app_state
.store
.get_redis_conn()
.change_context(ForexCacheError::RedisConnectionError)?
.serialize_and_set_key(REDIX_FOREX_CACHE_DATA, forex_exchange_cache_entry)
.await
.change_context(ForexCacheError::RedisWriteError)
}
async fn retrieve_forex_from_redis(
app_state: &AppState,
) -> CustomResult<Option<FxExchangeRatesCacheEntry>, ForexCacheError> {
app_state
.store
.get_redis_conn()
.change_context(ForexCacheError::RedisConnectionError)?
.get_and_deserialize_key(REDIX_FOREX_CACHE_DATA, "FxExchangeRatesCache")
.await
.change_context(ForexCacheError::EntryNotFound)
}
async fn is_redis_expired(
redis_cache: Option<&FxExchangeRatesCacheEntry>,
call_delay: i64,
) -> Option<Arc<ExchangeRates>> {
redis_cache.and_then(|cache| {
if cache.timestamp + call_delay > date_time::now_unix_timestamp() {
Some(cache.data.clone())
} else {
None
}
})
}
pub async fn convert_currency(
state: AppState,
amount: i64,
to_currency: String,
from_currency: String,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
) -> CustomResult<api_models::currency::CurrencyConversionResponse, ForexCacheError> {
let rates = get_forex_rates(
&state,
state.conf.forex_api.call_delay,
state.conf.forex_api.local_fetch_retry_delay,
state.conf.forex_api.local_fetch_retry_count,
#[cfg(feature = "kms")]
kms_config,
)
.await
.change_context(ForexCacheError::ApiError)?;
let to_currency = api_models::enums::Currency::from_str(to_currency.as_str())
.into_report()
.change_context(ForexCacheError::CurrencyNotAcceptable)?;
let from_currency = api_models::enums::Currency::from_str(from_currency.as_str())
.into_report()
.change_context(ForexCacheError::CurrencyNotAcceptable)?;
let converted_amount =
currency_conversion::conversion::convert(&rates.data, from_currency, to_currency, amount)
.into_report()
.change_context(ForexCacheError::ConversionError)?;
Ok(api_models::currency::CurrencyConversionResponse {
converted_amount: converted_amount.to_string(),
currency: to_currency.to_string(),
})
}