diff --git a/crates/common_utils/src/ext_traits.rs b/crates/common_utils/src/ext_traits.rs index 82553f41bc..fec0446b16 100644 --- a/crates/common_utils/src/ext_traits.rs +++ b/crates/common_utils/src/ext_traits.rs @@ -8,7 +8,10 @@ use masking::{ExposeInterface, Secret, Strategy}; use quick_xml::de; use serde::{Deserialize, Serialize}; -use crate::errors::{self, CustomResult}; +use crate::{ + crypto, + errors::{self, CustomResult}, +}; /// /// Encode interface @@ -254,6 +257,15 @@ where } } +impl ValueExt for crypto::Encryptable { + fn parse_value(self, type_name: &'static str) -> CustomResult + where + T: serde::de::DeserializeOwned, + { + self.into_inner().parse_value(type_name) + } +} + /// /// Extending functionalities of `String` for performing parsing /// diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index fc2cec0546..7fae1d65ac 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -136,3 +136,22 @@ impl Default for super::settings::DrainerSettings { } } } + +#[allow(clippy::derivable_impls)] +impl Default for super::settings::ApiKeys { + fn default() -> Self { + Self { + #[cfg(feature = "kms")] + kms_encrypted_hash_key: String::new(), + + /// Hex-encoded 32-byte long (64 characters long when hex-encoded) key used for calculating + /// hashes of API keys + #[cfg(not(feature = "kms"))] + hash_key: String::new(), + + // Specifies the number of days before API key expiry when email reminders should be sent + #[cfg(feature = "email")] + expiry_reminder_days: vec![7, 3, 1], + } + } +} diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 1d2ca3f501..c494ee0a7c 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -488,7 +488,7 @@ pub struct WebhooksSettings { pub outgoing_enabled: bool, } -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Clone)] #[serde(default)] pub struct ApiKeys { /// Base64-encoded (KMS encrypted) ciphertext of the key used for calculating hashes of API @@ -500,6 +500,10 @@ pub struct ApiKeys { /// hashes of API keys #[cfg(not(feature = "kms"))] pub hash_key: String, + + // Specifies the number of days before API key expiry when email reminders should be sent + #[cfg(feature = "email")] + pub expiry_reminder_days: Vec, } #[cfg(feature = "s3")] diff --git a/crates/router/src/core/api_keys.rs b/crates/router/src/core/api_keys.rs index 51805b95b4..b13aaea904 100644 --- a/crates/router/src/core/api_keys.rs +++ b/crates/router/src/core/api_keys.rs @@ -4,18 +4,29 @@ use error_stack::{report, IntoReport, ResultExt}; use external_services::kms; use masking::{PeekInterface, StrongSecret}; use router_env::{instrument, tracing}; +#[cfg(feature = "email")] +use storage_models::{api_keys::ApiKey, enums as storage_enums}; +#[cfg(feature = "email")] +use crate::types::storage::enums; use crate::{ configs::settings, consts, core::errors::{self, RouterResponse, StorageErrorExt}, db::StorageInterface, - routes::metrics, + routes::{metrics, AppState}, services::ApplicationResponse, types::{api, storage, transformers::ForeignInto}, utils, }; +#[cfg(feature = "email")] +const API_KEY_EXPIRY_TAG: &str = "API_KEY"; +#[cfg(feature = "email")] +const API_KEY_EXPIRY_NAME: &str = "API_KEY_EXPIRY"; +#[cfg(feature = "email")] +const API_KEY_EXPIRY_RUNNER: &str = "API_KEY_EXPIRY_WORKFLOW"; + static HASH_KEY: tokio::sync::OnceCell> = tokio::sync::OnceCell::const_new(); @@ -117,12 +128,13 @@ impl PlaintextApiKey { #[instrument(skip_all)] pub async fn create_api_key( - store: &dyn StorageInterface, + state: &AppState, api_key_config: &settings::ApiKeys, #[cfg(feature = "kms")] kms_config: &kms::KmsConfig, api_key: api::CreateApiKeyRequest, merchant_id: String, ) -> RouterResponse { + let store = &*state.store; let hash_key = get_hash_key( api_key_config, #[cfg(feature = "kms")] @@ -154,11 +166,91 @@ pub async fn create_api_key( &[metrics::request::add_attributes("merchant", merchant_id)], ); + // Add process to process_tracker for email reminder, only if expiry is set to future date + #[cfg(feature = "email")] + { + if api_key.expires_at.is_some() { + let expiry_reminder_days = state.conf.api_keys.expiry_reminder_days.clone(); + + add_api_key_expiry_task(store, &api_key, expiry_reminder_days) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to insert API key expiry reminder to process tracker")?; + } + } + Ok(ApplicationResponse::Json( (api_key, plaintext_api_key).foreign_into(), )) } +// Add api_key_expiry task to the process_tracker table. +// Construct ProcessTrackerNew struct with all required fields, and schedule the first email. +// After first email has been sent, update the schedule_time based on retry_count in execute_workflow(). +#[cfg(feature = "email")] +#[instrument(skip_all)] +pub async fn add_api_key_expiry_task( + store: &dyn StorageInterface, + api_key: &ApiKey, + expiry_reminder_days: Vec, +) -> Result<(), errors::ProcessTrackerError> { + let current_time = common_utils::date_time::now(); + let api_key_expiry_tracker = &storage::ApiKeyExpiryWorkflow { + key_id: api_key.key_id.clone(), + merchant_id: api_key.merchant_id.clone(), + // We need API key expiry too, because we need to decide on the schedule_time in + // execute_workflow() where we won't be having access to the Api key object. + api_key_expiry: api_key.expires_at, + expiry_reminder_days: expiry_reminder_days.clone(), + }; + let api_key_expiry_workflow_model = serde_json::to_value(api_key_expiry_tracker) + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable_lazy(|| { + format!("unable to serialize API key expiry tracker: {api_key_expiry_tracker:?}") + })?; + + let schedule_time = expiry_reminder_days + .first() + .and_then(|expiry_reminder_day| { + api_key.expires_at.map(|expires_at| { + expires_at.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day))) + }) + }); + + let process_tracker_entry = storage::ProcessTrackerNew { + id: generate_task_id_for_api_key_expiry_workflow(api_key.key_id.as_str()), + name: Some(String::from(API_KEY_EXPIRY_NAME)), + tag: vec![String::from(API_KEY_EXPIRY_TAG)], + runner: Some(String::from(API_KEY_EXPIRY_RUNNER)), + // Retry count specifies, number of times the current process (email) has been retried. + // It also acts as an index of expiry_reminder_days vector + retry_count: 0, + schedule_time, + rule: String::new(), + tracking_data: api_key_expiry_workflow_model, + business_status: String::from("Pending"), + status: enums::ProcessTrackerStatus::New, + event: vec![], + created_at: current_time, + updated_at: current_time, + }; + + store + .insert_process(process_tracker_entry) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable_lazy(|| { + format!( + "Failed while inserting API key expiry reminder to process_tracker: api_key_id: {}", + api_key_expiry_tracker.key_id + ) + })?; + + Ok(()) +} + #[instrument(skip_all)] pub async fn retrieve_api_key( store: &dyn StorageInterface, @@ -177,11 +269,13 @@ pub async fn retrieve_api_key( #[instrument(skip_all)] pub async fn update_api_key( - store: &dyn StorageInterface, + state: &AppState, merchant_id: &str, key_id: &str, api_key: api::UpdateApiKeyRequest, ) -> RouterResponse { + let store = &*state.store; + let api_key = store .update_api_key( merchant_id.to_owned(), @@ -191,15 +285,124 @@ pub async fn update_api_key( .await .to_not_found_response(errors::ApiErrorResponse::ApiKeyNotFound)?; + #[cfg(feature = "email")] + { + let expiry_reminder_days = state.conf.api_keys.expiry_reminder_days.clone(); + + let task_id = generate_task_id_for_api_key_expiry_workflow(key_id); + // In order to determine how to update the existing process in the process_tracker table, + // we need access to the current entry in the table. + let existing_process_tracker_task = store + .find_process_by_id(task_id.as_str()) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) // If retrieve failed + .attach_printable( + "Failed to retrieve API key expiry reminder task from process tracker", + )?; + + // If process exist + if existing_process_tracker_task.is_some() { + if api_key.expires_at.is_some() { + // Process exist in process, update the process with new schedule_time + update_api_key_expiry_task(store, &api_key, expiry_reminder_days) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "Failed to update API key expiry reminder task in process tracker", + )?; + } + // If an expiry is set to 'never' + else { + // Process exist in process, revoke it + revoke_api_key_expiry_task(store, key_id) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "Failed to revoke API key expiry reminder task in process tracker", + )?; + } + } + // This case occurs if the expiry for an API key is set to 'never' during its creation. If so, + // process in tracker was not created. + else if api_key.expires_at.is_some() { + // Process doesn't exist in process_tracker table, so create new entry with + // schedule_time based on new expiry set. + add_api_key_expiry_task(store, &api_key, expiry_reminder_days) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "Failed to insert API key expiry reminder task to process tracker", + )?; + } + } + Ok(ApplicationResponse::Json(api_key.foreign_into())) } +// Update api_key_expiry task in the process_tracker table. +// Construct Update variant of ProcessTrackerUpdate with new tracking_data. +#[cfg(feature = "email")] +#[instrument(skip_all)] +pub async fn update_api_key_expiry_task( + store: &dyn StorageInterface, + api_key: &ApiKey, + expiry_reminder_days: Vec, +) -> Result<(), errors::ProcessTrackerError> { + let current_time = common_utils::date_time::now(); + + let task_id = generate_task_id_for_api_key_expiry_workflow(api_key.key_id.as_str()); + + let task_ids = vec![task_id.clone()]; + + let schedule_time = expiry_reminder_days + .first() + .and_then(|expiry_reminder_day| { + api_key.expires_at.map(|expires_at| { + expires_at.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day))) + }) + }); + + let updated_tracking_data = &storage::ApiKeyExpiryWorkflow { + key_id: api_key.key_id.clone(), + merchant_id: api_key.merchant_id.clone(), + api_key_expiry: api_key.expires_at, + expiry_reminder_days, + }; + + let updated_api_key_expiry_workflow_model = serde_json::to_value(updated_tracking_data) + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable_lazy(|| { + format!("unable to serialize API key expiry tracker: {updated_tracking_data:?}") + })?; + + let updated_process_tracker_data = storage::ProcessTrackerUpdate::Update { + name: None, + retry_count: Some(0), + schedule_time, + tracking_data: Some(updated_api_key_expiry_workflow_model), + business_status: Some("Pending".to_string()), + status: Some(storage_enums::ProcessTrackerStatus::New), + updated_at: Some(current_time), + }; + store + .process_tracker_update_process_status_by_ids(task_ids, updated_process_tracker_data) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; + + Ok(()) +} + #[instrument(skip_all)] pub async fn revoke_api_key( - store: &dyn StorageInterface, + state: &AppState, merchant_id: &str, key_id: &str, ) -> RouterResponse { + let store = &*state.store; let revoked = store .revoke_api_key(merchant_id, key_id) .await @@ -207,6 +410,31 @@ pub async fn revoke_api_key( metrics::API_KEY_REVOKED.add(&metrics::CONTEXT, 1, &[]); + #[cfg(feature = "email")] + { + let task_id = generate_task_id_for_api_key_expiry_workflow(key_id); + // In order to determine how to update the existing process in the process_tracker table, + // we need access to the current entry in the table. + let existing_process_tracker_task = store + .find_process_by_id(task_id.as_str()) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) // If retrieve failed + .attach_printable( + "Failed to retrieve API key expiry reminder task from process tracker", + )?; + + // If process exist, then revoke it + if existing_process_tracker_task.is_some() { + revoke_api_key_expiry_task(store, key_id) + .await + .into_report() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "Failed to revoke API key expiry reminder task in process tracker", + )?; + } + } + Ok(ApplicationResponse::Json(api::RevokeApiKeyResponse { merchant_id: merchant_id.to_owned(), key_id: key_id.to_owned(), @@ -214,6 +442,29 @@ pub async fn revoke_api_key( })) } +// Function to revoke api_key_expiry task in the process_tracker table when API key is revoked. +// Construct StatusUpdate variant of ProcessTrackerUpdate by setting status to 'finish'. +#[cfg(feature = "email")] +#[instrument(skip_all)] +pub async fn revoke_api_key_expiry_task( + store: &dyn StorageInterface, + key_id: &str, +) -> Result<(), errors::ProcessTrackerError> { + let task_id = generate_task_id_for_api_key_expiry_workflow(key_id); + let task_ids = vec![task_id]; + let updated_process_tracker_data = storage::ProcessTrackerUpdate::StatusUpdate { + status: storage_enums::ProcessTrackerStatus::Finish, + business_status: Some("Revoked".to_string()), + }; + + store + .process_tracker_update_process_status_by_ids(task_ids, updated_process_tracker_data) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; + + Ok(()) +} + #[instrument(skip_all)] pub async fn list_api_keys( store: &dyn StorageInterface, @@ -234,6 +485,11 @@ pub async fn list_api_keys( Ok(ApplicationResponse::Json(api_keys)) } +#[cfg(feature = "email")] +fn generate_task_id_for_api_key_expiry_workflow(key_id: &str) -> String { + format!("{API_KEY_EXPIRY_RUNNER}_{API_KEY_EXPIRY_NAME}_{key_id}") +} + impl From<&str> for PlaintextApiKey { fn from(s: &str) -> Self { Self(s.to_owned().into()) diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index d41fe74a4b..e615ef7128 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -387,6 +387,8 @@ pub enum ProcessTrackerError { EParsingError(error_stack::Report), #[error("Validation Error Received: {0}")] EValidationError(error_stack::Report), + #[error("Type Conversion error")] + TypeConversionError, } macro_rules! error_to_process_tracker_error { diff --git a/crates/router/src/routes/api_keys.rs b/crates/router/src/routes/api_keys.rs index 787aa9357a..27fb6a55b6 100644 --- a/crates/router/src/routes/api_keys.rs +++ b/crates/router/src/routes/api_keys.rs @@ -43,7 +43,7 @@ pub async fn api_key_create( payload, |state, _, payload| async { api_keys::create_api_key( - &*state.store, + state, &state.conf.api_keys, #[cfg(feature = "kms")] &state.conf.kms, @@ -133,7 +133,7 @@ pub async fn api_key_update( &req, (&merchant_id, &key_id, payload), |state, _, (merchant_id, key_id, payload)| { - api_keys::update_api_key(&*state.store, merchant_id, key_id, payload) + api_keys::update_api_key(state, merchant_id, key_id, payload) }, &auth::AdminApiAuth, ) @@ -173,9 +173,7 @@ pub async fn api_key_revoke( state.get_ref(), &req, (&merchant_id, &key_id), - |state, _, (merchant_id, key_id)| { - api_keys::revoke_api_key(&*state.store, merchant_id, key_id) - }, + |state, _, (merchant_id, key_id)| api_keys::revoke_api_key(state, merchant_id, key_id), &auth::AdminApiAuth, ) .await diff --git a/crates/router/src/scheduler/workflows.rs b/crates/router/src/scheduler/workflows.rs index edf38c84d9..1b0cc66186 100644 --- a/crates/router/src/scheduler/workflows.rs +++ b/crates/router/src/scheduler/workflows.rs @@ -8,22 +8,26 @@ use crate::{ types::storage, utils::{OptionExt, StringExt}, }; +#[cfg(feature = "email")] +pub mod api_key_expiry; + pub mod payment_sync; pub mod refund_router; pub mod tokenized_data; macro_rules! runners { - ($($body:tt),*) => { + ($(#[$attr:meta] $body:tt),*) => { as_item! { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, EnumString)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] #[strum(serialize_all = "SCREAMING_SNAKE_CASE")] pub enum PTRunner { - $($body),* + $(#[$attr] $body),* } } $( as_item! { + #[$attr] pub struct $body; } )* @@ -32,7 +36,7 @@ macro_rules! runners { let runner = task.runner.clone().get_required_value("runner")?; let runner: Option = runner.parse_enum("PTRunner").ok(); Ok(match runner { - $( Some( PTRunner::$body ) => { + $( #[$attr] Some( PTRunner::$body ) => { Some(Box::new($body)) } ,)* None => { @@ -50,9 +54,10 @@ macro_rules! as_item { } runners! { - PaymentsSyncWorkflow, - RefundWorkflowRouter, - DeleteTokenizeDataWorkflow + #[cfg(all())] PaymentsSyncWorkflow, + #[cfg(all())] RefundWorkflowRouter, + #[cfg(all())] DeleteTokenizeDataWorkflow, + #[cfg(feature = "email")] ApiKeyExpiryWorkflow } pub type WorkflowSelectorFn = diff --git a/crates/router/src/scheduler/workflows/api_key_expiry.rs b/crates/router/src/scheduler/workflows/api_key_expiry.rs new file mode 100644 index 0000000000..f8d7f40129 --- /dev/null +++ b/crates/router/src/scheduler/workflows/api_key_expiry.rs @@ -0,0 +1,131 @@ +use common_utils::ext_traits::ValueExt; +use storage_models::enums::{self as storage_enums}; + +use super::{ApiKeyExpiryWorkflow, ProcessTrackerWorkflow}; +use crate::{ + errors, + logger::error, + routes::AppState, + types::{ + api, + storage::{self, ProcessTrackerExt}, + }, + utils::OptionExt, +}; + +#[async_trait::async_trait] +impl ProcessTrackerWorkflow for ApiKeyExpiryWorkflow { + async fn execute_workflow<'a>( + &'a self, + state: &'a AppState, + process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + let db = &*state.store; + let tracking_data: storage::ApiKeyExpiryWorkflow = process + .tracking_data + .clone() + .parse_value("ApiKeyExpiryWorkflow")?; + + let key_store = state + .store + .get_merchant_key_store_by_merchant_id( + tracking_data.merchant_id.as_str(), + &state.store.get_master_key().to_vec().into(), + ) + .await?; + + let merchant_account = db + .find_merchant_account_by_merchant_id(tracking_data.merchant_id.as_str(), &key_store) + .await?; + + let email_id = merchant_account + .merchant_details + .parse_value::("MerchantDetails")? + .primary_email; + + let task_id = process.id.clone(); + + let retry_count = process.retry_count; + + let expires_in = tracking_data + .expiry_reminder_days + .get( + usize::try_from(retry_count) + .map_err(|_| errors::ProcessTrackerError::TypeConversionError)?, + ) + .ok_or(errors::ProcessTrackerError::EApiErrorResponse( + errors::ApiErrorResponse::InvalidDataValue { + field_name: "index", + } + .into(), + ))?; + + state + .email_client + .send_email( + email_id.ok_or_else(|| errors::ProcessTrackerError::MissingRequiredField)?, + "API Key Expiry Notice".to_string(), + format!("Dear Merchant,\n +It has come to our attention that your API key will expire in {expires_in} days. To ensure uninterrupted access to our platform and continued smooth operation of your services, we kindly request that you take the necessary actions as soon as possible.\n\n +Thanks,\n +Team Hyperswitch"), + ) + .await + .map_err(|_| errors::ProcessTrackerError::FlowExecutionError { + flow: "ApiKeyExpiryWorkflow", + })?; + + // If all the mails have been sent, then retry_count would be equal to length of the expiry_reminder_days vector + if retry_count + == i32::try_from(tracking_data.expiry_reminder_days.len() - 1) + .map_err(|_| errors::ProcessTrackerError::TypeConversionError)? + { + process + .finish_with_status(db, format!("COMPLETED_BY_PT_{task_id}")) + .await? + } + // If tasks are remaining that has to be scheduled + else { + let expiry_reminder_day = tracking_data + .expiry_reminder_days + .get( + usize::try_from(retry_count + 1) + .map_err(|_| errors::ProcessTrackerError::TypeConversionError)?, + ) + .ok_or(errors::ProcessTrackerError::EApiErrorResponse( + errors::ApiErrorResponse::InvalidDataValue { + field_name: "index", + } + .into(), + ))?; + + let updated_schedule_time = tracking_data.api_key_expiry.map(|api_key_expiry| { + api_key_expiry.saturating_sub(time::Duration::days(i64::from(*expiry_reminder_day))) + }); + let updated_process_tracker_data = storage::ProcessTrackerUpdate::Update { + name: None, + retry_count: Some(retry_count + 1), + schedule_time: updated_schedule_time, + tracking_data: None, + business_status: None, + status: Some(storage_enums::ProcessTrackerStatus::New), + updated_at: Some(common_utils::date_time::now()), + }; + let task_ids = vec![task_id]; + db.process_tracker_update_process_status_by_ids(task_ids, updated_process_tracker_data) + .await?; + } + + Ok(()) + } + + async fn error_handler<'a>( + &'a self, + _state: &'a AppState, + process: storage::ProcessTracker, + _error: errors::ProcessTrackerError, + ) -> errors::CustomResult<(), errors::ProcessTrackerError> { + error!(%process.id, "Failed while executing workflow"); + Ok(()) + } +} diff --git a/crates/router/src/types/storage/api_keys.rs b/crates/router/src/types/storage/api_keys.rs index 1d74787eaa..d4c52a5550 100644 --- a/crates/router/src/types/storage/api_keys.rs +++ b/crates/router/src/types/storage/api_keys.rs @@ -1 +1,3 @@ +#[cfg(feature = "email")] +pub use storage_models::api_keys::ApiKeyExpiryWorkflow; pub use storage_models::api_keys::{ApiKey, ApiKeyNew, ApiKeyUpdate, HashedApiKey}; diff --git a/crates/storage_models/src/api_keys.rs b/crates/storage_models/src/api_keys.rs index 1644d14ecc..1df3cdd72d 100644 --- a/crates/storage_models/src/api_keys.rs +++ b/crates/storage_models/src/api_keys.rs @@ -1,4 +1,5 @@ use diesel::{AsChangeset, AsExpression, Identifiable, Insertable, Queryable}; +use serde::{Deserialize, Serialize}; use time::PrimitiveDateTime; use crate::schema::api_keys; @@ -134,3 +135,13 @@ mod diesel_impl { } } } + +// Tracking data by process_tracker +#[derive(Default, Debug, Deserialize, Serialize, Clone)] +pub struct ApiKeyExpiryWorkflow { + pub key_id: String, + pub merchant_id: String, + pub api_key_expiry: Option, + // Days on which email reminder about api_key expiry has to be sent, prior to it's expiry. + pub expiry_reminder_days: Vec, +}