feat(email): implement process_tracker for scheduling email when api_key is about to expire (#1233)

Co-authored-by: Abhishek Marrivagu <abhi.codes10@gmail.com>
This commit is contained in:
Chethan Rao
2023-07-04 12:01:22 +05:30
committed by GitHub
parent 4808af3750
commit ee7cdef107
10 changed files with 457 additions and 17 deletions

View File

@ -8,7 +8,10 @@ use masking::{ExposeInterface, Secret, Strategy};
use quick_xml::de;
use serde::{Deserialize, Serialize};
use crate::errors::{self, CustomResult};
use crate::{
crypto,
errors::{self, CustomResult},
};
///
/// Encode interface
@ -254,6 +257,15 @@ where
}
}
impl<E: ValueExt + Clone> ValueExt for crypto::Encryptable<E> {
fn parse_value<T>(self, type_name: &'static str) -> CustomResult<T, errors::ParsingError>
where
T: serde::de::DeserializeOwned,
{
self.into_inner().parse_value(type_name)
}
}
///
/// Extending functionalities of `String` for performing parsing
///

View File

@ -136,3 +136,22 @@ impl Default for super::settings::DrainerSettings {
}
}
}
#[allow(clippy::derivable_impls)]
impl Default for super::settings::ApiKeys {
fn default() -> Self {
Self {
#[cfg(feature = "kms")]
kms_encrypted_hash_key: String::new(),
/// Hex-encoded 32-byte long (64 characters long when hex-encoded) key used for calculating
/// hashes of API keys
#[cfg(not(feature = "kms"))]
hash_key: String::new(),
// Specifies the number of days before API key expiry when email reminders should be sent
#[cfg(feature = "email")]
expiry_reminder_days: vec![7, 3, 1],
}
}
}

View File

@ -488,7 +488,7 @@ pub struct WebhooksSettings {
pub outgoing_enabled: bool,
}
#[derive(Debug, Deserialize, Clone, Default)]
#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct ApiKeys {
/// Base64-encoded (KMS encrypted) ciphertext of the key used for calculating hashes of API
@ -500,6 +500,10 @@ pub struct ApiKeys {
/// hashes of API keys
#[cfg(not(feature = "kms"))]
pub hash_key: String,
// Specifies the number of days before API key expiry when email reminders should be sent
#[cfg(feature = "email")]
pub expiry_reminder_days: Vec<u8>,
}
#[cfg(feature = "s3")]

View File

@ -4,18 +4,29 @@ use error_stack::{report, IntoReport, ResultExt};
use external_services::kms;
use masking::{PeekInterface, StrongSecret};
use router_env::{instrument, tracing};
#[cfg(feature = "email")]
use storage_models::{api_keys::ApiKey, enums as storage_enums};
#[cfg(feature = "email")]
use crate::types::storage::enums;
use crate::{
configs::settings,
consts,
core::errors::{self, RouterResponse, StorageErrorExt},
db::StorageInterface,
routes::metrics,
routes::{metrics, AppState},
services::ApplicationResponse,
types::{api, storage, transformers::ForeignInto},
utils,
};
#[cfg(feature = "email")]
const API_KEY_EXPIRY_TAG: &str = "API_KEY";
#[cfg(feature = "email")]
const API_KEY_EXPIRY_NAME: &str = "API_KEY_EXPIRY";
#[cfg(feature = "email")]
const API_KEY_EXPIRY_RUNNER: &str = "API_KEY_EXPIRY_WORKFLOW";
static HASH_KEY: tokio::sync::OnceCell<StrongSecret<[u8; PlaintextApiKey::HASH_KEY_LEN]>> =
tokio::sync::OnceCell::const_new();
@ -117,12 +128,13 @@ impl PlaintextApiKey {
#[instrument(skip_all)]
pub async fn create_api_key(
store: &dyn StorageInterface,
state: &AppState,
api_key_config: &settings::ApiKeys,
#[cfg(feature = "kms")] kms_config: &kms::KmsConfig,
api_key: api::CreateApiKeyRequest,
merchant_id: String,
) -> RouterResponse<api::CreateApiKeyResponse> {
let store = &*state.store;
let hash_key = get_hash_key(
api_key_config,
#[cfg(feature = "kms")]
@ -154,11 +166,91 @@ pub async fn create_api_key(
&[metrics::request::add_attributes("merchant", merchant_id)],
);
// Add process to process_tracker for email reminder, only if expiry is set to future date
#[cfg(feature = "email")]
{
if api_key.expires_at.is_some() {
let expiry_reminder_days = state.conf.api_keys.expiry_reminder_days.clone();
add_api_key_expiry_task(store, &api_key, expiry_reminder_days)
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to insert API key expiry reminder to process tracker")?;
}
}
Ok(ApplicationResponse::Json(
(api_key, plaintext_api_key).foreign_into(),
))
}
// Add api_key_expiry task to the process_tracker table.
// Construct ProcessTrackerNew struct with all required fields, and schedule the first email.
// After first email has been sent, update the schedule_time based on retry_count in execute_workflow().
#[cfg(feature = "email")]
#[instrument(skip_all)]
pub async fn add_api_key_expiry_task(
store: &dyn StorageInterface,
api_key: &ApiKey,
expiry_reminder_days: Vec<u8>,
) -> Result<(), errors::ProcessTrackerError> {
let current_time = common_utils::date_time::now();
let api_key_expiry_tracker = &storage::ApiKeyExpiryWorkflow {
key_id: api_key.key_id.clone(),
merchant_id: api_key.merchant_id.clone(),
// We need API key expiry too, because we need to decide on the schedule_time in
// execute_workflow() where we won't be having access to the Api key object.
api_key_expiry: api_key.expires_at,
expiry_reminder_days: expiry_reminder_days.clone(),
};
let api_key_expiry_workflow_model = serde_json::to_value(api_key_expiry_tracker)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable_lazy(|| {
format!("unable to serialize API key expiry tracker: {api_key_expiry_tracker:?}")
})?;
let schedule_time = expiry_reminder_days
.first()
.and_then(|expiry_reminder_day| {
api_key.expires_at.map(|expires_at| {
expires_at.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day)))
})
});
let process_tracker_entry = storage::ProcessTrackerNew {
id: generate_task_id_for_api_key_expiry_workflow(api_key.key_id.as_str()),
name: Some(String::from(API_KEY_EXPIRY_NAME)),
tag: vec![String::from(API_KEY_EXPIRY_TAG)],
runner: Some(String::from(API_KEY_EXPIRY_RUNNER)),
// Retry count specifies, number of times the current process (email) has been retried.
// It also acts as an index of expiry_reminder_days vector
retry_count: 0,
schedule_time,
rule: String::new(),
tracking_data: api_key_expiry_workflow_model,
business_status: String::from("Pending"),
status: enums::ProcessTrackerStatus::New,
event: vec![],
created_at: current_time,
updated_at: current_time,
};
store
.insert_process(process_tracker_entry)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable_lazy(|| {
format!(
"Failed while inserting API key expiry reminder to process_tracker: api_key_id: {}",
api_key_expiry_tracker.key_id
)
})?;
Ok(())
}
#[instrument(skip_all)]
pub async fn retrieve_api_key(
store: &dyn StorageInterface,
@ -177,11 +269,13 @@ pub async fn retrieve_api_key(
#[instrument(skip_all)]
pub async fn update_api_key(
store: &dyn StorageInterface,
state: &AppState,
merchant_id: &str,
key_id: &str,
api_key: api::UpdateApiKeyRequest,
) -> RouterResponse<api::RetrieveApiKeyResponse> {
let store = &*state.store;
let api_key = store
.update_api_key(
merchant_id.to_owned(),
@ -191,15 +285,124 @@ pub async fn update_api_key(
.await
.to_not_found_response(errors::ApiErrorResponse::ApiKeyNotFound)?;
#[cfg(feature = "email")]
{
let expiry_reminder_days = state.conf.api_keys.expiry_reminder_days.clone();
let task_id = generate_task_id_for_api_key_expiry_workflow(key_id);
// In order to determine how to update the existing process in the process_tracker table,
// we need access to the current entry in the table.
let existing_process_tracker_task = store
.find_process_by_id(task_id.as_str())
.await
.change_context(errors::ApiErrorResponse::InternalServerError) // If retrieve failed
.attach_printable(
"Failed to retrieve API key expiry reminder task from process tracker",
)?;
// If process exist
if existing_process_tracker_task.is_some() {
if api_key.expires_at.is_some() {
// Process exist in process, update the process with new schedule_time
update_api_key_expiry_task(store, &api_key, expiry_reminder_days)
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"Failed to update API key expiry reminder task in process tracker",
)?;
}
// If an expiry is set to 'never'
else {
// Process exist in process, revoke it
revoke_api_key_expiry_task(store, key_id)
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"Failed to revoke API key expiry reminder task in process tracker",
)?;
}
}
// This case occurs if the expiry for an API key is set to 'never' during its creation. If so,
// process in tracker was not created.
else if api_key.expires_at.is_some() {
// Process doesn't exist in process_tracker table, so create new entry with
// schedule_time based on new expiry set.
add_api_key_expiry_task(store, &api_key, expiry_reminder_days)
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"Failed to insert API key expiry reminder task to process tracker",
)?;
}
}
Ok(ApplicationResponse::Json(api_key.foreign_into()))
}
// Update api_key_expiry task in the process_tracker table.
// Construct Update variant of ProcessTrackerUpdate with new tracking_data.
#[cfg(feature = "email")]
#[instrument(skip_all)]
pub async fn update_api_key_expiry_task(
store: &dyn StorageInterface,
api_key: &ApiKey,
expiry_reminder_days: Vec<u8>,
) -> Result<(), errors::ProcessTrackerError> {
let current_time = common_utils::date_time::now();
let task_id = generate_task_id_for_api_key_expiry_workflow(api_key.key_id.as_str());
let task_ids = vec![task_id.clone()];
let schedule_time = expiry_reminder_days
.first()
.and_then(|expiry_reminder_day| {
api_key.expires_at.map(|expires_at| {
expires_at.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day)))
})
});
let updated_tracking_data = &storage::ApiKeyExpiryWorkflow {
key_id: api_key.key_id.clone(),
merchant_id: api_key.merchant_id.clone(),
api_key_expiry: api_key.expires_at,
expiry_reminder_days,
};
let updated_api_key_expiry_workflow_model = serde_json::to_value(updated_tracking_data)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable_lazy(|| {
format!("unable to serialize API key expiry tracker: {updated_tracking_data:?}")
})?;
let updated_process_tracker_data = storage::ProcessTrackerUpdate::Update {
name: None,
retry_count: Some(0),
schedule_time,
tracking_data: Some(updated_api_key_expiry_workflow_model),
business_status: Some("Pending".to_string()),
status: Some(storage_enums::ProcessTrackerStatus::New),
updated_at: Some(current_time),
};
store
.process_tracker_update_process_status_by_ids(task_ids, updated_process_tracker_data)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok(())
}
#[instrument(skip_all)]
pub async fn revoke_api_key(
store: &dyn StorageInterface,
state: &AppState,
merchant_id: &str,
key_id: &str,
) -> RouterResponse<api::RevokeApiKeyResponse> {
let store = &*state.store;
let revoked = store
.revoke_api_key(merchant_id, key_id)
.await
@ -207,6 +410,31 @@ pub async fn revoke_api_key(
metrics::API_KEY_REVOKED.add(&metrics::CONTEXT, 1, &[]);
#[cfg(feature = "email")]
{
let task_id = generate_task_id_for_api_key_expiry_workflow(key_id);
// In order to determine how to update the existing process in the process_tracker table,
// we need access to the current entry in the table.
let existing_process_tracker_task = store
.find_process_by_id(task_id.as_str())
.await
.change_context(errors::ApiErrorResponse::InternalServerError) // If retrieve failed
.attach_printable(
"Failed to retrieve API key expiry reminder task from process tracker",
)?;
// If process exist, then revoke it
if existing_process_tracker_task.is_some() {
revoke_api_key_expiry_task(store, key_id)
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"Failed to revoke API key expiry reminder task in process tracker",
)?;
}
}
Ok(ApplicationResponse::Json(api::RevokeApiKeyResponse {
merchant_id: merchant_id.to_owned(),
key_id: key_id.to_owned(),
@ -214,6 +442,29 @@ pub async fn revoke_api_key(
}))
}
// Function to revoke api_key_expiry task in the process_tracker table when API key is revoked.
// Construct StatusUpdate variant of ProcessTrackerUpdate by setting status to 'finish'.
#[cfg(feature = "email")]
#[instrument(skip_all)]
pub async fn revoke_api_key_expiry_task(
store: &dyn StorageInterface,
key_id: &str,
) -> Result<(), errors::ProcessTrackerError> {
let task_id = generate_task_id_for_api_key_expiry_workflow(key_id);
let task_ids = vec![task_id];
let updated_process_tracker_data = storage::ProcessTrackerUpdate::StatusUpdate {
status: storage_enums::ProcessTrackerStatus::Finish,
business_status: Some("Revoked".to_string()),
};
store
.process_tracker_update_process_status_by_ids(task_ids, updated_process_tracker_data)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok(())
}
#[instrument(skip_all)]
pub async fn list_api_keys(
store: &dyn StorageInterface,
@ -234,6 +485,11 @@ pub async fn list_api_keys(
Ok(ApplicationResponse::Json(api_keys))
}
#[cfg(feature = "email")]
fn generate_task_id_for_api_key_expiry_workflow(key_id: &str) -> String {
format!("{API_KEY_EXPIRY_RUNNER}_{API_KEY_EXPIRY_NAME}_{key_id}")
}
impl From<&str> for PlaintextApiKey {
fn from(s: &str) -> Self {
Self(s.to_owned().into())

View File

@ -387,6 +387,8 @@ pub enum ProcessTrackerError {
EParsingError(error_stack::Report<ParsingError>),
#[error("Validation Error Received: {0}")]
EValidationError(error_stack::Report<ValidationError>),
#[error("Type Conversion error")]
TypeConversionError,
}
macro_rules! error_to_process_tracker_error {

View File

@ -43,7 +43,7 @@ pub async fn api_key_create(
payload,
|state, _, payload| async {
api_keys::create_api_key(
&*state.store,
state,
&state.conf.api_keys,
#[cfg(feature = "kms")]
&state.conf.kms,
@ -133,7 +133,7 @@ pub async fn api_key_update(
&req,
(&merchant_id, &key_id, payload),
|state, _, (merchant_id, key_id, payload)| {
api_keys::update_api_key(&*state.store, merchant_id, key_id, payload)
api_keys::update_api_key(state, merchant_id, key_id, payload)
},
&auth::AdminApiAuth,
)
@ -173,9 +173,7 @@ pub async fn api_key_revoke(
state.get_ref(),
&req,
(&merchant_id, &key_id),
|state, _, (merchant_id, key_id)| {
api_keys::revoke_api_key(&*state.store, merchant_id, key_id)
},
|state, _, (merchant_id, key_id)| api_keys::revoke_api_key(state, merchant_id, key_id),
&auth::AdminApiAuth,
)
.await

View File

@ -8,22 +8,26 @@ use crate::{
types::storage,
utils::{OptionExt, StringExt},
};
#[cfg(feature = "email")]
pub mod api_key_expiry;
pub mod payment_sync;
pub mod refund_router;
pub mod tokenized_data;
macro_rules! runners {
($($body:tt),*) => {
($(#[$attr:meta] $body:tt),*) => {
as_item! {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum PTRunner {
$($body),*
$(#[$attr] $body),*
}
}
$( as_item! {
#[$attr]
pub struct $body;
} )*
@ -32,7 +36,7 @@ macro_rules! runners {
let runner = task.runner.clone().get_required_value("runner")?;
let runner: Option<PTRunner> = runner.parse_enum("PTRunner").ok();
Ok(match runner {
$( Some( PTRunner::$body ) => {
$( #[$attr] Some( PTRunner::$body ) => {
Some(Box::new($body))
} ,)*
None => {
@ -50,9 +54,10 @@ macro_rules! as_item {
}
runners! {
PaymentsSyncWorkflow,
RefundWorkflowRouter,
DeleteTokenizeDataWorkflow
#[cfg(all())] PaymentsSyncWorkflow,
#[cfg(all())] RefundWorkflowRouter,
#[cfg(all())] DeleteTokenizeDataWorkflow,
#[cfg(feature = "email")] ApiKeyExpiryWorkflow
}
pub type WorkflowSelectorFn =

View File

@ -0,0 +1,131 @@
use common_utils::ext_traits::ValueExt;
use storage_models::enums::{self as storage_enums};
use super::{ApiKeyExpiryWorkflow, ProcessTrackerWorkflow};
use crate::{
errors,
logger::error,
routes::AppState,
types::{
api,
storage::{self, ProcessTrackerExt},
},
utils::OptionExt,
};
#[async_trait::async_trait]
impl ProcessTrackerWorkflow for ApiKeyExpiryWorkflow {
async fn execute_workflow<'a>(
&'a self,
state: &'a AppState,
process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
let tracking_data: storage::ApiKeyExpiryWorkflow = process
.tracking_data
.clone()
.parse_value("ApiKeyExpiryWorkflow")?;
let key_store = state
.store
.get_merchant_key_store_by_merchant_id(
tracking_data.merchant_id.as_str(),
&state.store.get_master_key().to_vec().into(),
)
.await?;
let merchant_account = db
.find_merchant_account_by_merchant_id(tracking_data.merchant_id.as_str(), &key_store)
.await?;
let email_id = merchant_account
.merchant_details
.parse_value::<api::MerchantDetails>("MerchantDetails")?
.primary_email;
let task_id = process.id.clone();
let retry_count = process.retry_count;
let expires_in = tracking_data
.expiry_reminder_days
.get(
usize::try_from(retry_count)
.map_err(|_| errors::ProcessTrackerError::TypeConversionError)?,
)
.ok_or(errors::ProcessTrackerError::EApiErrorResponse(
errors::ApiErrorResponse::InvalidDataValue {
field_name: "index",
}
.into(),
))?;
state
.email_client
.send_email(
email_id.ok_or_else(|| errors::ProcessTrackerError::MissingRequiredField)?,
"API Key Expiry Notice".to_string(),
format!("Dear Merchant,\n
It has come to our attention that your API key will expire in {expires_in} days. To ensure uninterrupted access to our platform and continued smooth operation of your services, we kindly request that you take the necessary actions as soon as possible.\n\n
Thanks,\n
Team Hyperswitch"),
)
.await
.map_err(|_| errors::ProcessTrackerError::FlowExecutionError {
flow: "ApiKeyExpiryWorkflow",
})?;
// If all the mails have been sent, then retry_count would be equal to length of the expiry_reminder_days vector
if retry_count
== i32::try_from(tracking_data.expiry_reminder_days.len() - 1)
.map_err(|_| errors::ProcessTrackerError::TypeConversionError)?
{
process
.finish_with_status(db, format!("COMPLETED_BY_PT_{task_id}"))
.await?
}
// If tasks are remaining that has to be scheduled
else {
let expiry_reminder_day = tracking_data
.expiry_reminder_days
.get(
usize::try_from(retry_count + 1)
.map_err(|_| errors::ProcessTrackerError::TypeConversionError)?,
)
.ok_or(errors::ProcessTrackerError::EApiErrorResponse(
errors::ApiErrorResponse::InvalidDataValue {
field_name: "index",
}
.into(),
))?;
let updated_schedule_time = tracking_data.api_key_expiry.map(|api_key_expiry| {
api_key_expiry.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day)))
});
let updated_process_tracker_data = storage::ProcessTrackerUpdate::Update {
name: None,
retry_count: Some(retry_count + 1),
schedule_time: updated_schedule_time,
tracking_data: None,
business_status: None,
status: Some(storage_enums::ProcessTrackerStatus::New),
updated_at: Some(common_utils::date_time::now()),
};
let task_ids = vec![task_id];
db.process_tracker_update_process_status_by_ids(task_ids, updated_process_tracker_data)
.await?;
}
Ok(())
}
async fn error_handler<'a>(
&'a self,
_state: &'a AppState,
process: storage::ProcessTracker,
_error: errors::ProcessTrackerError,
) -> errors::CustomResult<(), errors::ProcessTrackerError> {
error!(%process.id, "Failed while executing workflow");
Ok(())
}
}

View File

@ -1 +1,3 @@
#[cfg(feature = "email")]
pub use storage_models::api_keys::ApiKeyExpiryWorkflow;
pub use storage_models::api_keys::{ApiKey, ApiKeyNew, ApiKeyUpdate, HashedApiKey};

View File

@ -1,4 +1,5 @@
use diesel::{AsChangeset, AsExpression, Identifiable, Insertable, Queryable};
use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime;
use crate::schema::api_keys;
@ -134,3 +135,13 @@ mod diesel_impl {
}
}
}
// Tracking data by process_tracker
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct ApiKeyExpiryWorkflow {
pub key_id: String,
pub merchant_id: String,
pub api_key_expiry: Option<PrimitiveDateTime>,
// Days on which email reminder about api_key expiry has to be sent, prior to it's expiry.
pub expiry_reminder_days: Vec<u8>,
}