feat: encryption service integration to support batch encryption and decryption (#5164)

Co-authored-by: dracarys18 <karthikey.hegde@juspay.in>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Arjun Karthik
2024-07-19 13:08:58 +05:30
committed by GitHub
parent c698921c41
commit 33298b3808
127 changed files with 4239 additions and 1378 deletions

View File

@ -9,6 +9,7 @@ license.workspace = true
[features]
default = ["olap", "payouts", "frm"]
encryption_service =[]
olap = []
payouts = ["api_models/payouts"]
frm = ["api_models/frm"]
@ -18,7 +19,7 @@ frm = ["api_models/frm"]
api_models = { version = "0.1.0", path = "../api_models", features = ["errors"] }
cards = { version = "0.1.0", path = "../cards" }
common_enums = { version = "0.1.0", path = "../common_enums" }
common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext", "metrics"] }
common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext", "metrics", "encryption_service", "keymanager"] }
diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] }
masking = { version = "0.1.0", path = "../masking" }
router_derive = { version = "0.1.0", path = "../router_derive" }
@ -31,6 +32,7 @@ error-stack = "0.4.1"
futures = "0.3.30"
http = "0.2.12"
mime = "0.3.17"
rustc-hash = "1.1.0"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
serde_with = "3.7.0"

View File

@ -1,4 +1,7 @@
use common_utils::errors::{CustomResult, ValidationError};
use common_utils::{
errors::{CustomResult, ValidationError},
types::keymanager::KeyManagerState,
};
use masking::Secret;
/// Trait for converting domain types to storage models
@ -9,8 +12,10 @@ pub trait Conversion {
async fn convert(self) -> CustomResult<Self::DstType, ValidationError>;
async fn convert_back(
state: &KeyManagerState,
item: Self::DstType,
key: &Secret<Vec<u8>>,
key_store_ref_id: String,
) -> CustomResult<Self, ValidationError>
where
Self: Sized;
@ -20,12 +25,22 @@ pub trait Conversion {
#[async_trait::async_trait]
pub trait ReverseConversion<SrcType: Conversion> {
async fn convert(self, key: &Secret<Vec<u8>>) -> CustomResult<SrcType, ValidationError>;
async fn convert(
self,
state: &KeyManagerState,
key: &Secret<Vec<u8>>,
key_store_ref_id: String,
) -> CustomResult<SrcType, ValidationError>;
}
#[async_trait::async_trait]
impl<T: Send, U: Conversion<DstType = T>> ReverseConversion<U> for T {
async fn convert(self, key: &Secret<Vec<u8>>) -> CustomResult<U, ValidationError> {
U::convert_back(self, key).await
async fn convert(
self,
state: &KeyManagerState,
key: &Secret<Vec<u8>>,
key_store_ref_id: String,
) -> CustomResult<U, ValidationError> {
U::convert_back(state, self, key, key_store_ref_id).await
}
}

View File

@ -1,13 +1,14 @@
use common_utils::{
crypto::{OptionalEncryptableName, OptionalEncryptableValue},
date_time,
encryption::Encryption,
errors::{CustomResult, ValidationError},
ext_traits::ValueExt,
pii,
types::keymanager,
};
use diesel_models::{
encryption::Encryption, enums::MerchantStorageScheme,
merchant_account::MerchantAccountUpdateInternal,
enums::MerchantStorageScheme, merchant_account::MerchantAccountUpdateInternal,
};
use error_stack::ResultExt;
use masking::{PeekInterface, Secret};
@ -195,8 +196,10 @@ impl super::behaviour::Conversion for MerchantAccount {
}
async fn convert_back(
state: &keymanager::KeyManagerState,
item: Self::DstType,
key: &Secret<Vec<u8>>,
key_store_ref_id: String,
) -> CustomResult<Self, ValidationError>
where
Self: Sized,
@ -206,7 +209,7 @@ impl super::behaviour::Conversion for MerchantAccount {
.ok_or(ValidationError::MissingRequiredField {
field_name: "publishable_key".to_string(),
})?;
let identifier = keymanager::Identifier::Merchant(key_store_ref_id.clone());
async {
Ok::<Self, error_stack::Report<common_utils::errors::CryptoError>>(Self {
id: Some(item.id),
@ -217,11 +220,11 @@ impl super::behaviour::Conversion for MerchantAccount {
redirect_to_merchant_with_http_post: item.redirect_to_merchant_with_http_post,
merchant_name: item
.merchant_name
.async_lift(|inner| decrypt(inner, key.peek()))
.async_lift(|inner| decrypt(state, inner, identifier.clone(), key.peek()))
.await?,
merchant_details: item
.merchant_details
.async_lift(|inner| decrypt(inner, key.peek()))
.async_lift(|inner| decrypt(state, inner, identifier.clone(), key.peek()))
.await?,
webhook_details: item.webhook_details,
sub_merchants_enabled: item.sub_merchants_enabled,

View File

@ -2,6 +2,7 @@ use common_utils::{
crypto::{Encryptable, GcmAes256},
custom_serde, date_time,
errors::{CustomResult, ValidationError},
types::keymanager::{Identifier, KeyManagerState},
};
use error_stack::ResultExt;
use masking::{PeekInterface, Secret};
@ -30,14 +31,17 @@ impl super::behaviour::Conversion for MerchantKeyStore {
}
async fn convert_back(
state: &KeyManagerState,
item: Self::DstType,
key: &Secret<Vec<u8>>,
_key_store_ref_id: String,
) -> CustomResult<Self, ValidationError>
where
Self: Sized,
{
let identifier = Identifier::Merchant(item.merchant_id.clone());
Ok(Self {
key: Encryptable::decrypt(item.key, key.peek(), GcmAes256)
key: Encryptable::decrypt_via_api(state, item.key, identifier, key.peek(), GcmAes256)
.await
.change_context(ValidationError::InvalidValue {
message: "Failed while decrypting customer data".to_string(),

View File

@ -1,9 +1,10 @@
use api_models::enums::Connector;
use common_enums as storage_enums;
use common_utils::{
encryption::Encryption,
errors::{CustomResult, ValidationError},
pii,
types::MinorUnit,
types::{keymanager::KeyManagerState, MinorUnit},
};
use error_stack::ResultExt;
use masking::PeekInterface;
@ -479,8 +480,7 @@ impl ForeignIDRef for PaymentAttempt {
}
use diesel_models::{
encryption::Encryption, PaymentIntent as DieselPaymentIntent,
PaymentIntentNew as DieselPaymentIntentNew,
PaymentIntent as DieselPaymentIntent, PaymentIntentNew as DieselPaymentIntentNew,
};
#[async_trait::async_trait]
@ -542,14 +542,23 @@ impl behaviour::Conversion for PaymentIntent {
}
async fn convert_back(
state: &KeyManagerState,
storage_model: Self::DstType,
key: &masking::Secret<Vec<u8>>,
key_store_ref_id: String,
) -> CustomResult<Self, ValidationError>
where
Self: Sized,
{
async {
let inner_decrypt = |inner| decrypt(inner, key.peek());
let inner_decrypt = |inner| {
decrypt(
state,
inner,
common_utils::types::keymanager::Identifier::Merchant(key_store_ref_id.clone()),
key.peek(),
)
};
Ok::<Self, error_stack::Report<common_utils::errors::CryptoError>>(Self {
payment_id: storage_model.payment_id,
merchant_id: storage_model.merchant_id,

View File

@ -2,9 +2,10 @@ use common_enums as storage_enums;
use common_utils::{
consts::{PAYMENTS_LIST_MAX_LIMIT_V1, PAYMENTS_LIST_MAX_LIMIT_V2},
crypto::Encryptable,
encryption::Encryption,
id_type,
pii::{self, Email},
types::MinorUnit,
types::{keymanager::KeyManagerState, MinorUnit},
};
use masking::{Deserialize, Secret};
use serde::Serialize;
@ -16,6 +17,7 @@ use crate::{errors, merchant_key_store::MerchantKeyStore, RemoteStorageObject};
pub trait PaymentIntentInterface {
async fn update_payment_intent(
&self,
state: &KeyManagerState,
this: PaymentIntent,
payment_intent: PaymentIntentUpdate,
merchant_key_store: &MerchantKeyStore,
@ -24,6 +26,7 @@ pub trait PaymentIntentInterface {
async fn insert_payment_intent(
&self,
state: &KeyManagerState,
new: PaymentIntent,
merchant_key_store: &MerchantKeyStore,
storage_scheme: storage_enums::MerchantStorageScheme,
@ -31,6 +34,7 @@ pub trait PaymentIntentInterface {
async fn find_payment_intent_by_payment_id_merchant_id(
&self,
state: &KeyManagerState,
payment_id: &str,
merchant_id: &str,
merchant_key_store: &MerchantKeyStore,
@ -46,6 +50,7 @@ pub trait PaymentIntentInterface {
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
state: &KeyManagerState,
merchant_id: &str,
filters: &PaymentIntentFetchConstraints,
merchant_key_store: &MerchantKeyStore,
@ -55,6 +60,7 @@ pub trait PaymentIntentInterface {
#[cfg(feature = "olap")]
async fn filter_payment_intents_by_time_range_constraints(
&self,
state: &KeyManagerState,
merchant_id: &str,
time_range: &api_models::payments::TimeRange,
merchant_key_store: &MerchantKeyStore,
@ -64,6 +70,7 @@ pub trait PaymentIntentInterface {
#[cfg(feature = "olap")]
async fn get_filtered_payment_intents_attempt(
&self,
state: &KeyManagerState,
merchant_id: &str,
constraints: &PaymentIntentFetchConstraints,
merchant_key_store: &MerchantKeyStore,
@ -467,7 +474,7 @@ impl From<PaymentIntentUpdate> for PaymentIntentUpdateInternal {
}
use diesel_models::{
encryption::Encryption, PaymentIntentUpdate as DieselPaymentIntentUpdate,
PaymentIntentUpdate as DieselPaymentIntentUpdate,
PaymentIntentUpdateFields as DieselPaymentIntentUpdateFields,
};

View File

@ -1,14 +1,30 @@
use async_trait::async_trait;
use common_utils::{
crypto,
encryption::Encryption,
errors::{self, CustomResult},
ext_traits::AsyncExt,
metrics::utils::record_operation_time,
types::keymanager::{Identifier, KeyManagerState},
};
use diesel_models::encryption::Encryption;
use error_stack::ResultExt;
use masking::{PeekInterface, Secret};
use router_env::{instrument, tracing};
use rustc_hash::FxHashMap;
#[cfg(feature = "encryption_service")]
use {
common_utils::{
keymanager::call_encryption_service,
transformers::{ForeignFrom, ForeignTryFrom},
types::keymanager::{
BatchDecryptDataResponse, BatchEncryptDataRequest, BatchEncryptDataResponse,
DecryptDataResponse, EncryptDataRequest, EncryptDataResponse,
TransientBatchDecryptDataRequest, TransientDecryptDataRequest,
},
},
http::Method,
router_env::logger,
};
#[async_trait]
pub trait TypeEncryption<
@ -17,6 +33,22 @@ pub trait TypeEncryption<
S: masking::Strategy<T>,
>: Sized
{
async fn encrypt_via_api(
state: &KeyManagerState,
masked_data: Secret<T, S>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError>;
async fn decrypt_via_api(
state: &KeyManagerState,
encrypted_data: Encryption,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError>;
async fn encrypt(
masked_data: Secret<T, S>,
key: &[u8],
@ -28,31 +60,141 @@ pub trait TypeEncryption<
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError>;
async fn batch_encrypt_via_api(
state: &KeyManagerState,
masked_data: FxHashMap<String, Secret<T, S>>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError>;
async fn batch_decrypt_via_api(
state: &KeyManagerState,
encrypted_data: FxHashMap<String, Encryption>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError>;
async fn batch_encrypt(
masked_data: FxHashMap<String, Secret<T, S>>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError>;
async fn batch_decrypt(
encrypted_data: FxHashMap<String, Encryption>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError>;
}
#[async_trait]
impl<
V: crypto::DecodeMessage + crypto::EncodeMessage + Send + 'static,
S: masking::Strategy<String> + Send,
S: masking::Strategy<String> + Send + Sync,
> TypeEncryption<String, V, S> for crypto::Encryptable<Secret<String, S>>
{
#[instrument(skip_all)]
#[allow(unused_variables)]
async fn encrypt_via_api(
state: &KeyManagerState,
masked_data: Secret<String, S>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::encrypt(masked_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
EncryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/encrypt",
EncryptDataRequest::from((masked_data.clone(), identifier)),
)
.await;
match result {
Ok(response) => Ok(ForeignFrom::foreign_from((masked_data.clone(), response))),
Err(err) => {
logger::error!("Encryption error {:?}", err);
metrics::ENCRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Encryption");
Self::encrypt(masked_data, key, crypt_algo).await
}
}
}
}
#[instrument(skip_all)]
#[allow(unused_variables)]
async fn decrypt_via_api(
state: &KeyManagerState,
encrypted_data: Encryption,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::decrypt(encrypted_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
DecryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/decrypt",
TransientDecryptDataRequest::from((encrypted_data.clone(), identifier)),
)
.await;
let decrypted = match result {
Ok(decrypted_data) => {
ForeignTryFrom::foreign_try_from((encrypted_data.clone(), decrypted_data))
}
Err(err) => {
logger::error!("Decryption error {:?}", err);
Err(err.change_context(errors::CryptoError::DecodingFailed))
}
};
match decrypted {
Ok(de) => Ok(de),
Err(_) => {
metrics::DECRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Decryption");
Self::decrypt(encrypted_data, key, crypt_algo).await
}
}
}
}
async fn encrypt(
masked_data: Secret<String, S>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
metrics::APPLICATION_ENCRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
let encrypted_data = crypt_algo.encode_message(key, masked_data.peek().as_bytes())?;
Ok(Self::new(masked_data, encrypted_data.into()))
}
#[instrument(skip_all)]
async fn decrypt(
encrypted_data: Encryption,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
metrics::APPLICATION_DECRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
let encrypted = encrypted_data.into_inner();
let data = crypt_algo.decode_message(key, encrypted.clone())?;
@ -62,25 +204,227 @@ impl<
Ok(Self::new(value.into(), encrypted))
}
#[allow(unused_variables)]
async fn batch_encrypt_via_api(
state: &KeyManagerState,
masked_data: FxHashMap<String, Secret<String, S>>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::batch_encrypt(masked_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
BatchEncryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/encrypt",
BatchEncryptDataRequest::from((masked_data.clone(), identifier)),
)
.await;
match result {
Ok(response) => Ok(ForeignFrom::foreign_from((masked_data, response))),
Err(err) => {
metrics::ENCRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::error!("Encryption error {:?}", err);
logger::info!("Fall back to Application Encryption");
Self::batch_encrypt(masked_data, key, crypt_algo).await
}
}
}
}
#[allow(unused_variables)]
async fn batch_decrypt_via_api(
state: &KeyManagerState,
encrypted_data: FxHashMap<String, Encryption>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::batch_decrypt(encrypted_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
BatchDecryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/decrypt",
TransientBatchDecryptDataRequest::from((encrypted_data.clone(), identifier)),
)
.await;
let decrypted = match result {
Ok(decrypted_data) => {
ForeignTryFrom::foreign_try_from((encrypted_data.clone(), decrypted_data))
}
Err(err) => {
logger::error!("Decryption error {:?}", err);
Err(err.change_context(errors::CryptoError::DecodingFailed))
}
};
match decrypted {
Ok(de) => Ok(de),
Err(_) => {
metrics::DECRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Decryption");
Self::batch_decrypt(encrypted_data, key, crypt_algo).await
}
}
}
}
async fn batch_encrypt(
masked_data: FxHashMap<String, Secret<String, S>>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
metrics::APPLICATION_ENCRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
masked_data
.into_iter()
.map(|(k, v)| {
Ok((
k,
Self::new(
v.clone(),
crypt_algo.encode_message(key, v.peek().as_bytes())?.into(),
),
))
})
.collect()
}
async fn batch_decrypt(
encrypted_data: FxHashMap<String, Encryption>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
metrics::APPLICATION_DECRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
encrypted_data
.into_iter()
.map(|(k, v)| {
let data = crypt_algo.decode_message(key, v.clone().into_inner())?;
let value: String = std::str::from_utf8(&data)
.change_context(errors::CryptoError::DecodingFailed)?
.to_string();
Ok((k, Self::new(value.into(), v.into_inner())))
})
.collect()
}
}
#[async_trait]
impl<
V: crypto::DecodeMessage + crypto::EncodeMessage + Send + 'static,
S: masking::Strategy<serde_json::Value> + Send,
S: masking::Strategy<serde_json::Value> + Send + Sync,
> TypeEncryption<serde_json::Value, V, S>
for crypto::Encryptable<Secret<serde_json::Value, S>>
{
#[instrument(skip_all)]
#[allow(unused_variables)]
async fn encrypt_via_api(
state: &KeyManagerState,
masked_data: Secret<serde_json::Value, S>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::encrypt(masked_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
EncryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/encrypt",
EncryptDataRequest::from((masked_data.clone(), identifier)),
)
.await;
match result {
Ok(response) => Ok(ForeignFrom::foreign_from((masked_data.clone(), response))),
Err(err) => {
logger::error!("Encryption error {:?}", err);
metrics::ENCRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Encryption");
Self::encrypt(masked_data, key, crypt_algo).await
}
}
}
}
#[instrument(skip_all)]
#[allow(unused_variables)]
async fn decrypt_via_api(
state: &KeyManagerState,
encrypted_data: Encryption,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::decrypt(encrypted_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
DecryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/decrypt",
TransientDecryptDataRequest::from((encrypted_data.clone(), identifier)),
)
.await;
let decrypted = match result {
Ok(decrypted_data) => {
ForeignTryFrom::foreign_try_from((encrypted_data.clone(), decrypted_data))
}
Err(err) => {
logger::error!("Decryption error {:?}", err);
Err(err.change_context(errors::CryptoError::EncodingFailed))
}
};
match decrypted {
Ok(de) => Ok(de),
Err(_) => {
metrics::DECRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Decryption");
Self::decrypt(encrypted_data, key, crypt_algo).await
}
}
}
}
#[instrument(skip_all)]
async fn encrypt(
masked_data: Secret<serde_json::Value, S>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
metrics::APPLICATION_ENCRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
let data = serde_json::to_vec(&masked_data.peek())
.change_context(errors::CryptoError::DecodingFailed)?;
let encrypted_data = crypt_algo.encode_message(key, &data)?;
Ok(Self::new(masked_data, encrypted_data.into()))
}
@ -90,30 +434,229 @@ impl<
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
metrics::APPLICATION_DECRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
let encrypted = encrypted_data.into_inner();
let data = crypt_algo.decode_message(key, encrypted.clone())?;
let value: serde_json::Value =
serde_json::from_slice(&data).change_context(errors::CryptoError::DecodingFailed)?;
Ok(Self::new(value.into(), encrypted))
}
#[allow(unused_variables)]
async fn batch_encrypt_via_api(
state: &KeyManagerState,
masked_data: FxHashMap<String, Secret<serde_json::Value, S>>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::batch_encrypt(masked_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
BatchEncryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/encrypt",
BatchEncryptDataRequest::from((masked_data.clone(), identifier)),
)
.await;
match result {
Ok(response) => Ok(ForeignFrom::foreign_from((masked_data, response))),
Err(err) => {
metrics::ENCRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::error!("Encryption error {:?}", err);
logger::info!("Fall back to Application Encryption");
Self::batch_encrypt(masked_data, key, crypt_algo).await
}
}
}
}
#[allow(unused_variables)]
async fn batch_decrypt_via_api(
state: &KeyManagerState,
encrypted_data: FxHashMap<String, Encryption>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::batch_decrypt(encrypted_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
BatchDecryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/decrypt",
TransientBatchDecryptDataRequest::from((encrypted_data.clone(), identifier)),
)
.await;
let decrypted = match result {
Ok(decrypted_data) => {
ForeignTryFrom::foreign_try_from((encrypted_data.clone(), decrypted_data))
}
Err(err) => {
logger::error!("Decryption error {:?}", err);
Err(err.change_context(errors::CryptoError::DecodingFailed))
}
};
match decrypted {
Ok(de) => Ok(de),
Err(_) => {
metrics::DECRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Decryption");
Self::batch_decrypt(encrypted_data, key, crypt_algo).await
}
}
}
}
async fn batch_encrypt(
masked_data: FxHashMap<String, Secret<serde_json::Value, S>>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
metrics::APPLICATION_ENCRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
masked_data
.into_iter()
.map(|(k, v)| {
let data = serde_json::to_vec(v.peek())
.change_context(errors::CryptoError::DecodingFailed)?;
Ok((
k,
Self::new(v, crypt_algo.encode_message(key, &data)?.into()),
))
})
.collect()
}
async fn batch_decrypt(
encrypted_data: FxHashMap<String, Encryption>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
metrics::APPLICATION_DECRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
encrypted_data
.into_iter()
.map(|(k, v)| {
let data = crypt_algo.decode_message(key, v.clone().into_inner().clone())?;
let value: serde_json::Value = serde_json::from_slice(&data)
.change_context(errors::CryptoError::DecodingFailed)?;
Ok((k, Self::new(value.into(), v.into_inner())))
})
.collect()
}
}
#[async_trait]
impl<
V: crypto::DecodeMessage + crypto::EncodeMessage + Send + 'static,
S: masking::Strategy<Vec<u8>> + Send,
S: masking::Strategy<Vec<u8>> + Send + Sync,
> TypeEncryption<Vec<u8>, V, S> for crypto::Encryptable<Secret<Vec<u8>, S>>
{
#[instrument(skip_all)]
#[allow(unused_variables)]
async fn encrypt_via_api(
state: &KeyManagerState,
masked_data: Secret<Vec<u8>, S>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::encrypt(masked_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
EncryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/encrypt",
EncryptDataRequest::from((masked_data.clone(), identifier)),
)
.await;
match result {
Ok(response) => Ok(ForeignFrom::foreign_from((masked_data.clone(), response))),
Err(err) => {
logger::error!("Encryption error {:?}", err);
metrics::ENCRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Encryption");
Self::encrypt(masked_data, key, crypt_algo).await
}
}
}
}
#[instrument(skip_all)]
#[allow(unused_variables)]
async fn decrypt_via_api(
state: &KeyManagerState,
encrypted_data: Encryption,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::decrypt(encrypted_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
DecryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/decrypt",
TransientDecryptDataRequest::from((encrypted_data.clone(), identifier)),
)
.await;
let decrypted = match result {
Ok(decrypted_data) => {
ForeignTryFrom::foreign_try_from((encrypted_data.clone(), decrypted_data))
}
Err(err) => {
logger::error!("Decryption error {:?}", err);
Err(err.change_context(errors::CryptoError::DecodingFailed))
}
};
match decrypted {
Ok(de) => Ok(de),
Err(_) => {
metrics::DECRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Decryption");
Self::decrypt(encrypted_data, key, crypt_algo).await
}
}
}
}
#[instrument(skip_all)]
async fn encrypt(
masked_data: Secret<Vec<u8>, S>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
metrics::APPLICATION_ENCRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
let encrypted_data = crypt_algo.encode_message(key, masked_data.peek())?;
Ok(Self::new(masked_data, encrypted_data.into()))
}
@ -123,11 +666,131 @@ impl<
key: &[u8],
crypt_algo: V,
) -> CustomResult<Self, errors::CryptoError> {
metrics::APPLICATION_DECRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
let encrypted = encrypted_data.into_inner();
let data = crypt_algo.decode_message(key, encrypted.clone())?;
Ok(Self::new(data.into(), encrypted))
}
#[allow(unused_variables)]
async fn batch_encrypt_via_api(
state: &KeyManagerState,
masked_data: FxHashMap<String, Secret<Vec<u8>, S>>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::batch_encrypt(masked_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
BatchEncryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/encrypt",
BatchEncryptDataRequest::from((masked_data.clone(), identifier)),
)
.await;
match result {
Ok(response) => Ok(ForeignFrom::foreign_from((masked_data, response))),
Err(err) => {
metrics::ENCRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::error!("Encryption error {:?}", err);
logger::info!("Fall back to Application Encryption");
Self::batch_encrypt(masked_data, key, crypt_algo).await
}
}
}
}
#[allow(unused_variables)]
async fn batch_decrypt_via_api(
state: &KeyManagerState,
encrypted_data: FxHashMap<String, Encryption>,
identifier: Identifier,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
#[cfg(not(feature = "encryption_service"))]
{
Self::batch_decrypt(encrypted_data, key, crypt_algo).await
}
#[cfg(feature = "encryption_service")]
{
let result: Result<
BatchDecryptDataResponse,
error_stack::Report<errors::KeyManagerClientError>,
> = call_encryption_service(
state,
Method::POST,
"data/decrypt",
TransientBatchDecryptDataRequest::from((encrypted_data.clone(), identifier)),
)
.await;
let decrypted = match result {
Ok(response) => {
ForeignTryFrom::foreign_try_from((encrypted_data.clone(), response))
}
Err(err) => {
logger::error!("Decryption error {:?}", err);
Err(err.change_context(errors::CryptoError::DecodingFailed))
}
};
match decrypted {
Ok(de) => Ok(de),
Err(_) => {
metrics::DECRYPTION_API_FAILURES.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Fall back to Application Decryption");
Self::batch_decrypt(encrypted_data, key, crypt_algo).await
}
}
}
}
async fn batch_encrypt(
masked_data: FxHashMap<String, Secret<Vec<u8>, S>>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
metrics::APPLICATION_ENCRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
masked_data
.into_iter()
.map(|(k, v)| {
Ok((
k,
Self::new(v.clone(), crypt_algo.encode_message(key, v.peek())?.into()),
))
})
.collect()
}
async fn batch_decrypt(
encrypted_data: FxHashMap<String, Encryption>,
key: &[u8],
crypt_algo: V,
) -> CustomResult<FxHashMap<String, Self>, errors::CryptoError> {
metrics::APPLICATION_DECRYPTION_COUNT.add(&metrics::CONTEXT, 1, &[]);
encrypted_data
.into_iter()
.map(|(k, v)| {
Ok((
k,
Self::new(
crypt_algo
.decode_message(key, v.clone().into_inner().clone())?
.into(),
v.into_inner(),
),
))
})
.collect()
}
}
pub trait Lift<U> {
@ -178,7 +841,9 @@ impl<U, V: Lift<U> + Lift<U, SelfWrapper<U> = V> + Send> AsyncLift<U> for V {
#[inline]
pub async fn encrypt<E: Clone, S>(
state: &KeyManagerState,
inner: Secret<E, S>,
identifier: Identifier,
key: &[u8],
) -> CustomResult<crypto::Encryptable<Secret<E, S>>, errors::CryptoError>
where
@ -186,7 +851,7 @@ where
crypto::Encryptable<Secret<E, S>>: TypeEncryption<E, crypto::GcmAes256, S>,
{
record_operation_time(
crypto::Encryptable::encrypt(inner, key, crypto::GcmAes256),
crypto::Encryptable::encrypt_via_api(state, inner, identifier, key, crypto::GcmAes256),
&metrics::ENCRYPTION_TIME,
&metrics::CONTEXT,
&[],
@ -194,9 +859,41 @@ where
.await
}
#[inline]
pub async fn batch_encrypt<E: Clone, S>(
state: &KeyManagerState,
inner: FxHashMap<String, Secret<E, S>>,
identifier: Identifier,
key: &[u8],
) -> CustomResult<FxHashMap<String, crypto::Encryptable<Secret<E, S>>>, errors::CryptoError>
where
S: masking::Strategy<E>,
crypto::Encryptable<Secret<E, S>>: TypeEncryption<E, crypto::GcmAes256, S>,
{
if !inner.is_empty() {
record_operation_time(
crypto::Encryptable::batch_encrypt_via_api(
state,
inner,
identifier,
key,
crypto::GcmAes256,
),
&metrics::ENCRYPTION_TIME,
&metrics::CONTEXT,
&[],
)
.await
} else {
Ok(FxHashMap::default())
}
}
#[inline]
pub async fn encrypt_optional<E: Clone, S>(
state: &KeyManagerState,
inner: Option<Secret<E, S>>,
identifier: Identifier,
key: &[u8],
) -> CustomResult<Option<crypto::Encryptable<Secret<E, S>>>, errors::CryptoError>
where
@ -204,19 +901,26 @@ where
S: masking::Strategy<E>,
crypto::Encryptable<Secret<E, S>>: TypeEncryption<E, crypto::GcmAes256, S>,
{
inner.async_map(|f| encrypt(f, key)).await.transpose()
inner
.async_map(|f| encrypt(state, f, identifier, key))
.await
.transpose()
}
#[inline]
pub async fn decrypt<T: Clone, S: masking::Strategy<T>>(
state: &KeyManagerState,
inner: Option<Encryption>,
identifier: Identifier,
key: &[u8],
) -> CustomResult<Option<crypto::Encryptable<Secret<T, S>>>, errors::CryptoError>
where
crypto::Encryptable<Secret<T, S>>: TypeEncryption<T, crypto::GcmAes256, S>,
{
record_operation_time(
inner.async_map(|item| crypto::Encryptable::decrypt(item, key, crypto::GcmAes256)),
inner.async_map(|item| {
crypto::Encryptable::decrypt_via_api(state, item, identifier, key, crypto::GcmAes256)
}),
&metrics::DECRYPTION_TIME,
&metrics::CONTEXT,
&[],
@ -225,8 +929,38 @@ where
.transpose()
}
#[inline]
pub async fn batch_decrypt<E: Clone, S>(
state: &KeyManagerState,
inner: FxHashMap<String, Encryption>,
identifier: Identifier,
key: &[u8],
) -> CustomResult<FxHashMap<String, crypto::Encryptable<Secret<E, S>>>, errors::CryptoError>
where
S: masking::Strategy<E>,
crypto::Encryptable<Secret<E, S>>: TypeEncryption<E, crypto::GcmAes256, S>,
{
if !inner.is_empty() {
record_operation_time(
crypto::Encryptable::batch_decrypt_via_api(
state,
inner,
identifier,
key,
crypto::GcmAes256,
),
&metrics::ENCRYPTION_TIME,
&metrics::CONTEXT,
&[],
)
.await
} else {
Ok(FxHashMap::default())
}
}
pub(crate) mod metrics {
use router_env::{global_meter, histogram_metric, metrics_context, once_cell};
use router_env::{counter_metric, global_meter, histogram_metric, metrics_context, once_cell};
metrics_context!(CONTEXT);
global_meter!(GLOBAL_METER, "ROUTER_API");
@ -234,4 +968,8 @@ pub(crate) mod metrics {
// Encryption and Decryption metrics
histogram_metric!(ENCRYPTION_TIME, GLOBAL_METER);
histogram_metric!(DECRYPTION_TIME, GLOBAL_METER);
counter_metric!(ENCRYPTION_API_FAILURES, GLOBAL_METER);
counter_metric!(DECRYPTION_API_FAILURES, GLOBAL_METER);
counter_metric!(APPLICATION_ENCRYPTION_COUNT, GLOBAL_METER);
counter_metric!(APPLICATION_DECRYPTION_COUNT, GLOBAL_METER);
}