feat: add cache for api_key and mca tables (#1212)

This commit is contained in:
Kartikeya Hegde
2023-07-07 12:10:49 +05:30
committed by GitHub
parent 06f92c2c4c
commit fc9057ef2c
3 changed files with 373 additions and 54 deletions

View File

@ -1,6 +1,8 @@
use error_stack::IntoReport;
use super::{MockDb, Store};
#[cfg(feature = "accounts_cache")]
use crate::cache::{self, ACCOUNTS_CACHE};
use crate::{
connection,
core::errors::{self, CustomResult},
@ -67,10 +69,46 @@ impl ApiKeyInterface for Store {
api_key: storage::ApiKeyUpdate,
) -> CustomResult<storage::ApiKey, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
let _merchant_id = merchant_id.clone();
let _key_id = key_id.clone();
let update_call = || async {
storage::ApiKey::update_by_merchant_id_key_id(&conn, merchant_id, key_id, api_key)
.await
.map_err(Into::into)
.into_report()
};
#[cfg(not(feature = "accounts_cache"))]
{
update_call().await
}
#[cfg(feature = "accounts_cache")]
{
use error_stack::report;
// We need to fetch api_key here because the key that's saved in cache in HashedApiKey.
// Used function from storage model to reuse the connection that made here instead of
// creating new.
let api_key = storage::ApiKey::find_optional_by_merchant_id_key_id(
&conn,
&_merchant_id,
&_key_id,
)
.await
.map_err(Into::into)
.into_report()?
.ok_or(report!(errors::StorageError::ValueNotFound(format!(
"ApiKey of {_key_id} not found"
))))?;
super::cache::publish_and_redact(
self,
cache::CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()),
update_call,
)
.await
}
}
async fn revoke_api_key(
@ -79,10 +117,41 @@ impl ApiKeyInterface for Store {
key_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
let delete_call = || async {
storage::ApiKey::revoke_by_merchant_id_key_id(&conn, merchant_id, key_id)
.await
.map_err(Into::into)
.into_report()
};
#[cfg(not(feature = "accounts_cache"))]
{
delete_call().await
}
#[cfg(feature = "accounts_cache")]
{
use error_stack::report;
// We need to fetch api_key here because the key that's saved in cache in HashedApiKey.
// Used function from storage model to reuse the connection that made here instead of
// creating new.
let api_key =
storage::ApiKey::find_optional_by_merchant_id_key_id(&conn, merchant_id, key_id)
.await
.map_err(Into::into)
.into_report()?
.ok_or(report!(errors::StorageError::ValueNotFound(format!(
"ApiKey of {key_id} not found"
))))?;
super::cache::publish_and_redact(
self,
cache::CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()),
delete_call,
)
.await
}
}
async fn find_api_key_by_merchant_id_key_id_optional(
@ -101,11 +170,30 @@ impl ApiKeyInterface for Store {
&self,
hashed_api_key: storage::HashedApiKey,
) -> CustomResult<Option<storage::ApiKey>, errors::StorageError> {
let _hashed_api_key = hashed_api_key.clone();
let find_call = || async {
let conn = connection::pg_connection_read(self).await?;
storage::ApiKey::find_optional_by_hashed_api_key(&conn, hashed_api_key)
.await
.map_err(Into::into)
.into_report()
};
#[cfg(not(feature = "accounts_cache"))]
{
find_call().await
}
#[cfg(feature = "accounts_cache")]
{
super::cache::get_or_populate_in_memory(
self,
&_hashed_api_key.into_inner(),
find_call,
&ACCOUNTS_CACHE,
)
.await
}
}
async fn list_api_keys_by_merchant_id(
@ -288,7 +376,9 @@ mod tests {
use time::macros::datetime;
use crate::{
db::{api_keys::ApiKeyInterface, MockDb},
cache::{CacheKind, ACCOUNTS_CACHE},
db::{api_keys::ApiKeyInterface, cache, MockDb},
services::{PubSubInterface, RedisConnInterface},
types::storage,
};
@ -374,4 +464,67 @@ mod tests {
1
);
}
#[allow(clippy::unwrap_used)]
#[tokio::test]
async fn test_api_keys_cache() {
let db = MockDb::new(&Default::default()).await;
let redis_conn = db.get_redis_conn();
redis_conn
.subscribe("hyperswitch_invalidate")
.await
.unwrap();
let merchant_id = "test_merchant";
let api = storage::ApiKeyNew {
key_id: "test_key".into(),
merchant_id: merchant_id.into(),
name: "My test key".into(),
description: None,
hashed_api_key: "a_hashed_key".to_string().into(),
prefix: "pre".into(),
created_at: datetime!(2023-06-01 0:00),
expires_at: None,
last_used: None,
};
let api = db.insert_api_key(api).await.unwrap();
let hashed_api_key = api.hashed_api_key.clone();
let find_call = || async {
db.find_api_key_by_hash_optional(hashed_api_key.clone())
.await
};
let _: Option<storage::ApiKey> = cache::get_or_populate_in_memory(
&db,
&format!("{}_{}", merchant_id, hashed_api_key.clone().into_inner()),
find_call,
&ACCOUNTS_CACHE,
)
.await
.unwrap();
let delete_call = || async { db.revoke_api_key(merchant_id, &api.key_id).await };
cache::publish_and_redact(
&db,
CacheKind::Accounts(
format!("{}_{}", merchant_id, hashed_api_key.clone().into_inner()).into(),
),
delete_call,
)
.await
.unwrap();
assert!(
ACCOUNTS_CACHE
.get_val::<storage::ApiKey>(&format!(
"{}_{}",
merchant_id,
hashed_api_key.into_inner()
),)
.is_none()
)
}
}

View File

@ -1,9 +1,9 @@
use common_utils::ext_traits::{AsyncExt, ByteSliceExt, Encode};
use error_stack::{IntoReport, ResultExt};
#[cfg(feature = "accounts_cache")]
use super::cache;
use super::{MockDb, Store};
#[cfg(feature = "accounts_cache")]
use crate::cache::{self, ACCOUNTS_CACHE};
use crate::{
connection,
core::errors::{self, CustomResult},
@ -161,6 +161,7 @@ impl MerchantConnectorAccountInterface for Store {
connector_label: &str,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::MerchantConnectorAccount, errors::StorageError> {
let find_call = || async {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_connector(
&conn,
@ -170,30 +171,6 @@ impl MerchantConnectorAccountInterface for Store {
.await
.map_err(Into::into)
.into_report()
.async_and_then(|item| async {
item.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
})
.await
}
async fn find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&self,
merchant_id: &str,
merchant_connector_id: &str,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::MerchantConnectorAccount, errors::StorageError> {
let find_call = || async {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()
};
#[cfg(not(feature = "accounts_cache"))]
@ -207,14 +184,45 @@ impl MerchantConnectorAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
cache::get_or_populate_redis(self, merchant_connector_id, find_call)
.await?
.convert(key_store.key.get_inner())
super::cache::get_or_populate_in_memory(
self,
&format!("{}_{}", merchant_id, connector_label),
find_call,
&ACCOUNTS_CACHE,
)
.await
.async_and_then(|item| async {
item.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
})
.await
.change_context(errors::StorageError::DeserializationFailed)
}
}
async fn find_by_merchant_connector_account_merchant_id_merchant_connector_id(
&self,
merchant_id: &str,
merchant_connector_id: &str,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::MerchantConnectorAccount, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()
.async_and_then(|item| async {
item.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
})
.await
}
async fn insert_merchant_connector_account(
&self,
t: domain::MerchantConnectorAccount,
@ -267,7 +275,9 @@ impl MerchantConnectorAccountInterface for Store {
merchant_connector_account: storage::MerchantConnectorAccountUpdateInternal,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::MerchantConnectorAccount, errors::StorageError> {
let _merchant_connector_id = this.merchant_connector_id.clone();
let _merchant_id = this.merchant_id.clone();
let _merchant_connector_label = this.connector_label.clone();
let update_call = || async {
let conn = connection::pg_connection_write(self).await?;
Conversion::convert(this)
@ -287,7 +297,14 @@ impl MerchantConnectorAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
cache::redact_cache(self, &_merchant_connector_id, update_call, None).await
super::cache::publish_and_redact(
self,
cache::CacheKind::Accounts(
format!("{}_{}", _merchant_id, _merchant_connector_label).into(),
),
update_call,
)
.await
}
#[cfg(not(feature = "accounts_cache"))]
@ -302,6 +319,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_connector_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
let delete_call = || async {
storage::MerchantConnectorAccount::delete_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
@ -310,6 +328,38 @@ impl MerchantConnectorAccountInterface for Store {
.await
.map_err(Into::into)
.into_report()
};
#[cfg(feature = "accounts_cache")]
{
// We need to fetch mca here because the key that's saved in cache in
// {merchant_id}_{connector_label}.
// Used function from storage model to reuse the connection that made here instead of
// creating new.
let mca = storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()?;
super::cache::publish_and_redact(
self,
cache::CacheKind::Accounts(
format!("{}_{}", mca.merchant_id, mca.connector_label).into(),
),
delete_call,
)
.await
}
#[cfg(not(feature = "accounts_cache"))]
{
delete_call().await
}
}
}
@ -507,3 +557,119 @@ impl MerchantConnectorAccountInterface for MockDb {
}
}
}
#[cfg(test)]
mod merchant_connector_account_cache_tests {
use api_models::enums::CountryAlpha2;
use common_utils::date_time;
use error_stack::ResultExt;
use storage_models::enums::ConnectorType;
use crate::{
cache::{CacheKind, ACCOUNTS_CACHE},
core::errors,
db::{
cache, merchant_connector_account::MerchantConnectorAccountInterface,
merchant_key_store::MerchantKeyStoreInterface, MasterKeyInterface, MockDb,
},
services::{PubSubInterface, RedisConnInterface},
types::{
domain::{self, behaviour::Conversion, types as domain_types},
storage,
},
};
#[allow(clippy::unwrap_used)]
#[tokio::test]
async fn test_connector_label_cache() {
let db = MockDb::new(&Default::default()).await;
let redis_conn = db.get_redis_conn();
let key = db.get_master_key();
redis_conn
.subscribe("hyperswitch_invalidate")
.await
.unwrap();
let merchant_id = "test_merchant";
let connector_label = "stripe_USA";
let merchant_connector_id = "simple_merchant_connector_id";
let mca = domain::MerchantConnectorAccount {
id: Some(1),
merchant_id: merchant_id.to_string(),
connector_name: "stripe".to_string(),
connector_account_details: domain_types::encrypt(
serde_json::Value::default().into(),
key,
)
.await
.unwrap(),
test_mode: None,
disabled: None,
merchant_connector_id: merchant_connector_id.to_string(),
payment_methods_enabled: None,
connector_type: ConnectorType::FinOperations,
metadata: None,
frm_configs: None,
connector_label: connector_label.to_string(),
business_country: CountryAlpha2::US,
business_label: "cloth".to_string(),
business_sub_label: None,
created_at: date_time::now(),
modified_at: date_time::now(),
};
let key_store = db
.get_merchant_key_store_by_merchant_id(merchant_id, &key.to_vec().into())
.await
.unwrap();
db.insert_merchant_connector_account(mca, &key_store)
.await
.unwrap();
let find_call = || async {
db.find_merchant_connector_account_by_merchant_id_connector_label(
merchant_id,
connector_label,
&key_store,
)
.await
.unwrap()
.convert()
.await
.change_context(errors::StorageError::DecryptionError)
};
let _: storage::MerchantConnectorAccount = cache::get_or_populate_in_memory(
&db,
&format!("{}_{}", merchant_id, connector_label),
find_call,
&ACCOUNTS_CACHE,
)
.await
.unwrap();
let delete_call = || async {
db.delete_merchant_connector_account_by_merchant_id_merchant_connector_id(
merchant_id,
merchant_connector_id,
)
.await
};
cache::publish_and_redact(
&db,
CacheKind::Accounts(format!("{}_{}", merchant_id, connector_label).into()),
delete_call,
)
.await
.unwrap();
assert!(ACCOUNTS_CACHE
.get_val::<domain::MerchantConnectorAccount>(&format!(
"{}_{}",
merchant_id, connector_label
),)
.is_none())
}
}

View File

@ -4,7 +4,7 @@ use time::PrimitiveDateTime;
use crate::schema::api_keys;
#[derive(Debug, Clone, Identifiable, Queryable)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Identifiable, Queryable)]
#[diesel(table_name = api_keys, primary_key(key_id))]
pub struct ApiKey {
pub key_id: String,
@ -78,7 +78,7 @@ impl From<ApiKeyUpdate> for ApiKeyUpdateInternal {
}
}
#[derive(Debug, Clone, AsExpression, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, AsExpression, PartialEq)]
#[diesel(sql_type = diesel::sql_types::Text)]
pub struct HashedApiKey(String);