From a9638d118e0b68653fef3bec2ce8aa3c47feedd3 Mon Sep 17 00:00:00 2001 From: Chethan Rao <70657455+Chethan-rao@users.noreply.github.com> Date: Tue, 30 Jan 2024 13:16:03 +0530 Subject: [PATCH] refactor: add support for extending file storage to other schemes and provide a runtime flag for the same (#3348) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- Cargo.lock | 31 ++-- config/config.example.toml | 8 + config/development.toml | 3 + config/docker_compose.toml | 3 + crates/external_services/Cargo.toml | 2 + crates/external_services/src/file_storage.rs | 96 +++++++++++ .../src/file_storage/aws_s3.rs | 158 ++++++++++++++++++ .../src/file_storage/file_system.rs | 144 ++++++++++++++++ crates/external_services/src/lib.rs | 1 + crates/router/Cargo.toml | 8 +- crates/router/src/configs/settings.rs | 21 +-- crates/router/src/configs/validations.rs | 19 --- crates/router/src/core/files.rs | 8 - crates/router/src/core/files/fs_utils.rs | 57 ------- crates/router/src/core/files/helpers.rs | 67 ++------ crates/router/src/core/files/s3_utils.rs | 87 ---------- crates/router/src/routes/app.rs | 5 + docker-compose.yml | 1 + 18 files changed, 461 insertions(+), 258 deletions(-) create mode 100644 crates/external_services/src/file_storage.rs create mode 100644 crates/external_services/src/file_storage/aws_s3.rs create mode 100644 crates/external_services/src/file_storage/file_system.rs delete mode 100644 crates/router/src/core/files/fs_utils.rs delete mode 100644 crates/router/src/core/files/s3_utils.rs diff --git a/Cargo.lock b/Cargo.lock index f920b1ea9c..b86facad81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2481,6 +2481,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-kms", + "aws-sdk-s3", "aws-sdk-sesv2", "aws-sdk-sts", "aws-smithy-client", @@ -2726,9 +2727,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -2736,9 +2737,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" @@ -2764,9 +2765,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-lite" @@ -2785,9 +2786,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -2796,15 +2797,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -2814,9 +2815,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -5155,8 +5156,6 @@ dependencies = [ "async-bb8-diesel", "async-trait", "awc", - "aws-config", - "aws-sdk-s3", "base64 0.21.5", "bb8", "bigdecimal", diff --git a/config/config.example.toml b/config/config.example.toml index 0ad50736e9..27d1f8b18c 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -535,3 +535,11 @@ refund_analytics_topic = "topic" # Kafka topic to be used for Refund events api_logs_topic = "topic" # Kafka topic to be used for incoming api events connector_logs_topic = "topic" # Kafka topic to be used for connector api events outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events + +# File storage configuration +[file_storage] +file_storage_backend = "aws_s3" # File storage backend to be used + +[file_storage.aws_s3] +region = "us-east-1" # The AWS region used by the AWS S3 for file storage +bucket_name = "bucket1" # The AWS S3 bucket name for file storage diff --git a/config/development.toml b/config/development.toml index b23f68680e..5fbe9607cd 100644 --- a/config/development.toml +++ b/config/development.toml @@ -544,3 +544,6 @@ client_id = "" client_secret = "" partner_id = "" enabled = true + +[file_storage] +file_storage_backend = "file_system" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 8af1528e17..8dd01a3d1c 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -399,3 +399,6 @@ enabled = true [events] source = "logs" + +[file_storage] +file_storage_backend = "file_system" diff --git a/crates/external_services/Cargo.toml b/crates/external_services/Cargo.toml index 6552b57b0e..bf836af71a 100644 --- a/crates/external_services/Cargo.toml +++ b/crates/external_services/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true [features] kms = ["dep:aws-config", "dep:aws-sdk-kms"] email = ["dep:aws-config"] +aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3"] hashicorp-vault = [ "dep:vaultrs" ] [dependencies] @@ -18,6 +19,7 @@ aws-config = { version = "0.55.3", optional = true } aws-sdk-kms = { version = "0.28.0", optional = true } aws-sdk-sesv2 = "0.28.0" aws-sdk-sts = "0.28.0" +aws-sdk-s3 = { version = "0.28.0", optional = true } aws-smithy-client = "0.55.3" base64 = "0.21.2" dyn-clone = "1.0.11" diff --git a/crates/external_services/src/file_storage.rs b/crates/external_services/src/file_storage.rs new file mode 100644 index 0000000000..fb419b6ec6 --- /dev/null +++ b/crates/external_services/src/file_storage.rs @@ -0,0 +1,96 @@ +//! +//! Module for managing file storage operations with support for multiple storage schemes. +//! + +use std::fmt::{Display, Formatter}; + +use common_utils::errors::CustomResult; + +/// Includes functionality for AWS S3 storage operations. +#[cfg(feature = "aws_s3")] +mod aws_s3; + +mod file_system; + +/// Enum representing different file storage configurations, allowing for multiple storage schemes. +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[serde(tag = "file_storage_backend")] +#[serde(rename_all = "snake_case")] +pub enum FileStorageConfig { + /// AWS S3 storage configuration. + #[cfg(feature = "aws_s3")] + AwsS3 { + /// Configuration for AWS S3 file storage. + aws_s3: aws_s3::AwsFileStorageConfig, + }, + /// Local file system storage configuration. + #[default] + FileSystem, +} + +impl FileStorageConfig { + /// Validates the file storage configuration. + pub fn validate(&self) -> Result<(), InvalidFileStorageConfig> { + match self { + #[cfg(feature = "aws_s3")] + Self::AwsS3 { aws_s3 } => aws_s3.validate(), + Self::FileSystem => Ok(()), + } + } + + /// Retrieves the appropriate file storage client based on the file storage configuration. + pub async fn get_file_storage_client(&self) -> Box { + match self { + #[cfg(feature = "aws_s3")] + Self::AwsS3 { aws_s3 } => Box::new(aws_s3::AwsFileStorageClient::new(aws_s3).await), + Self::FileSystem => Box::new(file_system::FileSystem), + } + } +} + +/// Trait for file storage operations +#[async_trait::async_trait] +pub trait FileStorageInterface: dyn_clone::DynClone + Sync + Send { + /// Uploads a file to the selected storage scheme. + async fn upload_file( + &self, + file_key: &str, + file: Vec, + ) -> CustomResult<(), FileStorageError>; + + /// Deletes a file from the selected storage scheme. + async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError>; + + /// Retrieves a file from the selected storage scheme. + async fn retrieve_file(&self, file_key: &str) -> CustomResult, FileStorageError>; +} + +dyn_clone::clone_trait_object!(FileStorageInterface); + +/// Error thrown when the file storage config is invalid +#[derive(Debug, Clone)] +pub struct InvalidFileStorageConfig(&'static str); + +impl std::error::Error for InvalidFileStorageConfig {} + +impl Display for InvalidFileStorageConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "file_storage: {}", self.0) + } +} + +/// Represents errors that can occur during file storage operations. +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum FileStorageError { + /// Indicates that the file upload operation failed. + #[error("Failed to upload file")] + UploadFailed, + + /// Indicates that the file retrieval operation failed. + #[error("Failed to retrieve file")] + RetrieveFailed, + + /// Indicates that the file deletion operation failed. + #[error("Failed to delete file")] + DeleteFailed, +} diff --git a/crates/external_services/src/file_storage/aws_s3.rs b/crates/external_services/src/file_storage/aws_s3.rs new file mode 100644 index 0000000000..86d1c0f0ef --- /dev/null +++ b/crates/external_services/src/file_storage/aws_s3.rs @@ -0,0 +1,158 @@ +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::{ + operation::{ + delete_object::DeleteObjectError, get_object::GetObjectError, put_object::PutObjectError, + }, + Client, +}; +use aws_sdk_sts::config::Region; +use common_utils::{errors::CustomResult, ext_traits::ConfigExt}; +use error_stack::ResultExt; + +use super::InvalidFileStorageConfig; +use crate::file_storage::{FileStorageError, FileStorageInterface}; + +/// Configuration for AWS S3 file storage. +#[derive(Debug, serde::Deserialize, Clone, Default)] +#[serde(default)] +pub struct AwsFileStorageConfig { + /// The AWS region to send file uploads + region: String, + /// The AWS s3 bucket to send file uploads + bucket_name: String, +} + +impl AwsFileStorageConfig { + /// Validates the AWS S3 file storage configuration. + pub(super) fn validate(&self) -> Result<(), InvalidFileStorageConfig> { + use common_utils::fp_utils::when; + + when(self.region.is_default_or_empty(), || { + Err(InvalidFileStorageConfig("aws s3 region must not be empty")) + })?; + + when(self.bucket_name.is_default_or_empty(), || { + Err(InvalidFileStorageConfig( + "aws s3 bucket name must not be empty", + )) + }) + } +} + +/// AWS S3 file storage client. +#[derive(Debug, Clone)] +pub(super) struct AwsFileStorageClient { + /// AWS S3 client + inner_client: Client, + /// The name of the AWS S3 bucket. + bucket_name: String, +} + +impl AwsFileStorageClient { + /// Creates a new AWS S3 file storage client. + pub(super) async fn new(config: &AwsFileStorageConfig) -> Self { + let region_provider = RegionProviderChain::first_try(Region::new(config.region.clone())); + let sdk_config = aws_config::from_env().region(region_provider).load().await; + Self { + inner_client: Client::new(&sdk_config), + bucket_name: config.bucket_name.clone(), + } + } + + /// Uploads a file to AWS S3. + async fn upload_file( + &self, + file_key: &str, + file: Vec, + ) -> CustomResult<(), AwsS3StorageError> { + self.inner_client + .put_object() + .bucket(&self.bucket_name) + .key(file_key) + .body(file.into()) + .send() + .await + .map_err(AwsS3StorageError::UploadFailure)?; + Ok(()) + } + + /// Deletes a file from AWS S3. + async fn delete_file(&self, file_key: &str) -> CustomResult<(), AwsS3StorageError> { + self.inner_client + .delete_object() + .bucket(&self.bucket_name) + .key(file_key) + .send() + .await + .map_err(AwsS3StorageError::DeleteFailure)?; + Ok(()) + } + + /// Retrieves a file from AWS S3. + async fn retrieve_file(&self, file_key: &str) -> CustomResult, AwsS3StorageError> { + Ok(self + .inner_client + .get_object() + .bucket(&self.bucket_name) + .key(file_key) + .send() + .await + .map_err(AwsS3StorageError::RetrieveFailure)? + .body + .collect() + .await + .map_err(AwsS3StorageError::UnknownError)? + .to_vec()) + } +} + +#[async_trait::async_trait] +impl FileStorageInterface for AwsFileStorageClient { + /// Uploads a file to AWS S3. + async fn upload_file( + &self, + file_key: &str, + file: Vec, + ) -> CustomResult<(), FileStorageError> { + self.upload_file(file_key, file) + .await + .change_context(FileStorageError::UploadFailed)?; + Ok(()) + } + + /// Deletes a file from AWS S3. + async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError> { + self.delete_file(file_key) + .await + .change_context(FileStorageError::DeleteFailed)?; + Ok(()) + } + + /// Retrieves a file from AWS S3. + async fn retrieve_file(&self, file_key: &str) -> CustomResult, FileStorageError> { + Ok(self + .retrieve_file(file_key) + .await + .change_context(FileStorageError::RetrieveFailed)?) + } +} + +/// Enum representing errors that can occur during AWS S3 file storage operations. +#[derive(Debug, thiserror::Error)] +enum AwsS3StorageError { + /// Error indicating that file upload to S3 failed. + #[error("File upload to S3 failed: {0:?}")] + UploadFailure(aws_smithy_client::SdkError), + + /// Error indicating that file retrieval from S3 failed. + #[error("File retrieve from S3 failed: {0:?}")] + RetrieveFailure(aws_smithy_client::SdkError), + + /// Error indicating that file deletion from S3 failed. + #[error("File delete from S3 failed: {0:?}")] + DeleteFailure(aws_smithy_client::SdkError), + + /// Unknown error occurred. + #[error("Unknown error occurred: {0:?}")] + UnknownError(aws_sdk_s3::primitives::ByteStreamError), +} diff --git a/crates/external_services/src/file_storage/file_system.rs b/crates/external_services/src/file_storage/file_system.rs new file mode 100644 index 0000000000..15ca84deeb --- /dev/null +++ b/crates/external_services/src/file_storage/file_system.rs @@ -0,0 +1,144 @@ +//! +//! Module for local file system storage operations +//! + +use std::{ + fs::{remove_file, File}, + io::{Read, Write}, + path::PathBuf, +}; + +use common_utils::errors::CustomResult; +use error_stack::{IntoReport, ResultExt}; + +use crate::file_storage::{FileStorageError, FileStorageInterface}; + +/// Constructs the file path for a given file key within the file system. +/// The file path is generated based on the workspace path and the provided file key. +fn get_file_path(file_key: impl AsRef) -> PathBuf { + let mut file_path = PathBuf::new(); + #[cfg(feature = "logs")] + file_path.push(router_env::env::workspace_path()); + #[cfg(not(feature = "logs"))] + file_path.push(std::env::current_dir().unwrap_or(".".into())); + + file_path.push("files"); + file_path.push(file_key.as_ref()); + file_path +} + +/// Represents a file system for storing and managing files locally. +#[derive(Debug, Clone)] +pub(super) struct FileSystem; + +impl FileSystem { + /// Saves the provided file data to the file system under the specified file key. + async fn upload_file( + &self, + file_key: &str, + file: Vec, + ) -> CustomResult<(), FileSystemStorageError> { + let file_path = get_file_path(file_key); + + // Ignore the file name and create directories in the `file_path` if not exists + std::fs::create_dir_all( + file_path + .parent() + .ok_or(FileSystemStorageError::CreateDirFailed) + .into_report() + .attach_printable("Failed to obtain parent directory")?, + ) + .into_report() + .change_context(FileSystemStorageError::CreateDirFailed)?; + + let mut file_handler = File::create(file_path) + .into_report() + .change_context(FileSystemStorageError::CreateFailure)?; + file_handler + .write_all(&file) + .into_report() + .change_context(FileSystemStorageError::WriteFailure)?; + Ok(()) + } + + /// Deletes the file associated with the specified file key from the file system. + async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileSystemStorageError> { + let file_path = get_file_path(file_key); + remove_file(file_path) + .into_report() + .change_context(FileSystemStorageError::DeleteFailure)?; + Ok(()) + } + + /// Retrieves the file content associated with the specified file key from the file system. + async fn retrieve_file(&self, file_key: &str) -> CustomResult, FileSystemStorageError> { + let mut received_data: Vec = Vec::new(); + let file_path = get_file_path(file_key); + let mut file = File::open(file_path) + .into_report() + .change_context(FileSystemStorageError::FileOpenFailure)?; + file.read_to_end(&mut received_data) + .into_report() + .change_context(FileSystemStorageError::ReadFailure)?; + Ok(received_data) + } +} + +#[async_trait::async_trait] +impl FileStorageInterface for FileSystem { + /// Saves the provided file data to the file system under the specified file key. + async fn upload_file( + &self, + file_key: &str, + file: Vec, + ) -> CustomResult<(), FileStorageError> { + self.upload_file(file_key, file) + .await + .change_context(FileStorageError::UploadFailed)?; + Ok(()) + } + + /// Deletes the file associated with the specified file key from the file system. + async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError> { + self.delete_file(file_key) + .await + .change_context(FileStorageError::DeleteFailed)?; + Ok(()) + } + + /// Retrieves the file content associated with the specified file key from the file system. + async fn retrieve_file(&self, file_key: &str) -> CustomResult, FileStorageError> { + Ok(self + .retrieve_file(file_key) + .await + .change_context(FileStorageError::RetrieveFailed)?) + } +} + +/// Represents an error that can occur during local file system storage operations. +#[derive(Debug, thiserror::Error)] +enum FileSystemStorageError { + /// Error indicating opening a file failed + #[error("Failed while opening the file")] + FileOpenFailure, + + /// Error indicating file creation failed. + #[error("Failed to create file")] + CreateFailure, + + /// Error indicating reading a file failed. + #[error("Failed while reading the file")] + ReadFailure, + + /// Error indicating writing to a file failed. + #[error("Failed while writing into file")] + WriteFailure, + + /// Error indicating file deletion failed. + #[error("Failed while deleting the file")] + DeleteFailure, + + /// Error indicating directory creation failed + #[error("Failed while creating a directory")] + CreateDirFailed, +} diff --git a/crates/external_services/src/lib.rs b/crates/external_services/src/lib.rs index 9bf4916eec..bba65873e9 100644 --- a/crates/external_services/src/lib.rs +++ b/crates/external_services/src/lib.rs @@ -9,6 +9,7 @@ pub mod email; #[cfg(feature = "kms")] pub mod kms; +pub mod file_storage; #[cfg(feature = "hashicorp-vault")] pub mod hashicorp_vault; diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index e575daf7e7..3d129edfe3 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -10,10 +10,10 @@ license.workspace = true [features] default = ["kv_store", "stripe", "oltp", "olap", "backwards_compatibility", "accounts_cache", "dummy_connector", "payouts", "business_profile_routing", "connector_choice_mca_id", "profile_specific_fallback_routing", "retry", "frm"] -aws_s3 = ["dep:aws-sdk-s3", "dep:aws-config"] -kms = ["external_services/kms", "dep:aws-config"] +aws_s3 = ["external_services/aws_s3"] +kms = ["external_services/kms"] hashicorp-vault = ["external_services/hashicorp-vault"] -email = ["external_services/email", "dep:aws-config", "olap"] +email = ["external_services/email", "olap"] frm = [] stripe = ["dep:serde_qs"] release = ["kms", "stripe", "aws_s3", "email", "backwards_compatibility", "business_profile_routing", "accounts_cache", "kv_store", "connector_choice_mca_id", "profile_specific_fallback_routing", "vergen", "recon"] @@ -42,8 +42,6 @@ actix-web = "4.3.1" async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" } argon2 = { version = "0.5.0", features = ["std"] } async-trait = "0.1.68" -aws-config = { version = "0.55.3", optional = true } -aws-sdk-s3 = { version = "0.28.0", optional = true } base64 = "0.21.2" bb8 = "0.8" bigdecimal = "0.3.1" diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 3c1d9f7d39..146a1ace28 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -11,6 +11,7 @@ use common_utils::ext_traits::ConfigExt; use config::{Environment, File}; #[cfg(feature = "email")] use external_services::email::EmailSettings; +use external_services::file_storage::FileStorageConfig; #[cfg(feature = "hashicorp-vault")] use external_services::hashicorp_vault; #[cfg(feature = "kms")] @@ -90,10 +91,9 @@ pub struct Settings { pub api_keys: ApiKeys, #[cfg(feature = "kms")] pub kms: kms::KmsConfig, + pub file_storage: FileStorageConfig, #[cfg(feature = "hashicorp-vault")] pub hc_vault: hashicorp_vault::HashiCorpVaultConfig, - #[cfg(feature = "aws_s3")] - pub file_upload_config: FileUploadConfig, pub tokenization: TokenizationConfig, pub connector_customer: ConnectorCustomer, #[cfg(feature = "dummy_connector")] @@ -721,16 +721,6 @@ pub struct ApiKeys { pub expiry_reminder_days: Vec, } -#[cfg(feature = "aws_s3")] -#[derive(Debug, Deserialize, Clone, Default)] -#[serde(default)] -pub struct FileUploadConfig { - /// The AWS region to send file uploads - pub region: String, - /// The AWS s3 bucket to send file uploads - pub bucket_name: String, -} - #[derive(Debug, Deserialize, Clone, Default)] pub struct DelayedSessionConfig { #[serde(deserialize_with = "deser_to_get_connectors")] @@ -853,8 +843,11 @@ impl Settings { self.kms .validate() .map_err(|error| ApplicationError::InvalidConfigurationValueError(error.into()))?; - #[cfg(feature = "aws_s3")] - self.file_upload_config.validate()?; + + self.file_storage + .validate() + .map_err(|err| ApplicationError::InvalidConfigurationValueError(err.to_string()))?; + self.lock_settings.validate()?; self.events.validate()?; Ok(()) diff --git a/crates/router/src/configs/validations.rs b/crates/router/src/configs/validations.rs index 0b286ece84..910ae75434 100644 --- a/crates/router/src/configs/validations.rs +++ b/crates/router/src/configs/validations.rs @@ -127,25 +127,6 @@ impl super::settings::DrainerSettings { } } -#[cfg(feature = "aws_s3")] -impl super::settings::FileUploadConfig { - pub fn validate(&self) -> Result<(), ApplicationError> { - use common_utils::fp_utils::when; - - when(self.region.is_default_or_empty(), || { - Err(ApplicationError::InvalidConfigurationValueError( - "s3 region must not be empty".into(), - )) - })?; - - when(self.bucket_name.is_default_or_empty(), || { - Err(ApplicationError::InvalidConfigurationValueError( - "s3 bucket name must not be empty".into(), - )) - }) - } -} - impl super::settings::ApiKeys { pub fn validate(&self) -> Result<(), ApplicationError> { use common_utils::fp_utils::when; diff --git a/crates/router/src/core/files.rs b/crates/router/src/core/files.rs index f3e5648980..d3f490a0a6 100644 --- a/crates/router/src/core/files.rs +++ b/crates/router/src/core/files.rs @@ -1,9 +1,4 @@ pub mod helpers; -#[cfg(feature = "aws_s3")] -pub mod s3_utils; - -#[cfg(not(feature = "aws_s3"))] -pub mod fs_utils; use api_models::files; use error_stack::{IntoReport, ResultExt}; @@ -29,10 +24,7 @@ pub async fn files_create_core( ) .await?; let file_id = common_utils::generate_id(consts::ID_LENGTH, "file"); - #[cfg(feature = "aws_s3")] let file_key = format!("{}/{}", merchant_account.merchant_id, file_id); - #[cfg(not(feature = "aws_s3"))] - let file_key = format!("{}_{}", merchant_account.merchant_id, file_id); let file_new = diesel_models::file::FileMetadataNew { file_id: file_id.clone(), merchant_id: merchant_account.merchant_id.clone(), diff --git a/crates/router/src/core/files/fs_utils.rs b/crates/router/src/core/files/fs_utils.rs deleted file mode 100644 index 795f2fad75..0000000000 --- a/crates/router/src/core/files/fs_utils.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{ - fs::{remove_file, File}, - io::{Read, Write}, - path::PathBuf, -}; - -use common_utils::errors::CustomResult; -use error_stack::{IntoReport, ResultExt}; - -use crate::{core::errors, env}; - -pub fn get_file_path(file_key: String) -> PathBuf { - let mut file_path = PathBuf::new(); - file_path.push(env::workspace_path()); - file_path.push("files"); - file_path.push(file_key); - file_path -} - -pub fn save_file_to_fs( - file_key: String, - file_data: Vec, -) -> CustomResult<(), errors::ApiErrorResponse> { - let file_path = get_file_path(file_key); - let mut file = File::create(file_path) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to create file")?; - file.write_all(&file_data) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed while writing into file")?; - Ok(()) -} - -pub fn delete_file_from_fs(file_key: String) -> CustomResult<(), errors::ApiErrorResponse> { - let file_path = get_file_path(file_key); - remove_file(file_path) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed while deleting the file")?; - Ok(()) -} - -pub fn retrieve_file_from_fs(file_key: String) -> CustomResult, errors::ApiErrorResponse> { - let mut received_data: Vec = Vec::new(); - let file_path = get_file_path(file_key); - let mut file = File::open(file_path) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed while opening the file")?; - file.read_to_end(&mut received_data) - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed while reading the file")?; - Ok(received_data) -} diff --git a/crates/router/src/core/files/helpers.rs b/crates/router/src/core/files/helpers.rs index 9205d42aee..0a509b238a 100644 --- a/crates/router/src/core/files/helpers.rs +++ b/crates/router/src/core/files/helpers.rs @@ -6,7 +6,7 @@ use futures::TryStreamExt; use crate::{ core::{ errors::{self, StorageErrorExt}, - files, payments, utils, + payments, utils, }, routes::AppState, services, @@ -30,37 +30,6 @@ pub async fn get_file_purpose(field: &mut Field) -> Option { } } -pub async fn upload_file( - #[cfg(feature = "aws_s3")] state: &AppState, - file_key: String, - file: Vec, -) -> CustomResult<(), errors::ApiErrorResponse> { - #[cfg(feature = "aws_s3")] - return files::s3_utils::upload_file_to_s3(state, file_key, file).await; - #[cfg(not(feature = "aws_s3"))] - return files::fs_utils::save_file_to_fs(file_key, file); -} - -pub async fn delete_file( - #[cfg(feature = "aws_s3")] state: &AppState, - file_key: String, -) -> CustomResult<(), errors::ApiErrorResponse> { - #[cfg(feature = "aws_s3")] - return files::s3_utils::delete_file_from_s3(state, file_key).await; - #[cfg(not(feature = "aws_s3"))] - return files::fs_utils::delete_file_from_fs(file_key); -} - -pub async fn retrieve_file( - #[cfg(feature = "aws_s3")] state: &AppState, - file_key: String, -) -> CustomResult, errors::ApiErrorResponse> { - #[cfg(feature = "aws_s3")] - return files::s3_utils::retrieve_file_from_s3(state, file_key).await; - #[cfg(not(feature = "aws_s3"))] - return files::fs_utils::retrieve_file_from_fs(file_key); -} - pub async fn validate_file_upload( state: &AppState, merchant_account: domain::MerchantAccount, @@ -132,14 +101,11 @@ pub async fn delete_file_using_file_id( .attach_printable("File not available")?, }; match provider { - diesel_models::enums::FileUploadProvider::Router => { - delete_file( - #[cfg(feature = "aws_s3")] - state, - provider_file_id, - ) + diesel_models::enums::FileUploadProvider::Router => state + .file_storage_client + .delete_file(&provider_file_id) .await - } + .change_context(errors::ApiErrorResponse::InternalServerError), _ => Err(errors::ApiErrorResponse::FileProviderNotSupported { message: "Not Supported because provider is not Router".to_string(), } @@ -234,12 +200,11 @@ pub async fn retrieve_file_and_provider_file_id_from_file_id( match provider { diesel_models::enums::FileUploadProvider::Router => Ok(( Some( - retrieve_file( - #[cfg(feature = "aws_s3")] - state, - provider_file_id.clone(), - ) - .await?, + state + .file_storage_client + .retrieve_file(&provider_file_id) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?, ), Some(provider_file_id), )), @@ -364,13 +329,11 @@ pub async fn upload_and_get_provider_provider_file_id_profile_id( payment_attempt.merchant_connector_id, )) } else { - upload_file( - #[cfg(feature = "aws_s3")] - state, - file_key.clone(), - create_file_request.file.clone(), - ) - .await?; + state + .file_storage_client + .upload_file(&file_key, create_file_request.file.clone()) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; Ok(( file_key, api_models::enums::FileUploadProvider::Router, diff --git a/crates/router/src/core/files/s3_utils.rs b/crates/router/src/core/files/s3_utils.rs deleted file mode 100644 index 228c23528c..0000000000 --- a/crates/router/src/core/files/s3_utils.rs +++ /dev/null @@ -1,87 +0,0 @@ -use aws_config::{self, meta::region::RegionProviderChain}; -use aws_sdk_s3::{config::Region, Client}; -use common_utils::errors::CustomResult; -use error_stack::{IntoReport, ResultExt}; -use futures::TryStreamExt; - -use crate::{core::errors, routes}; - -async fn get_aws_client(state: &routes::AppState) -> Client { - let region_provider = - RegionProviderChain::first_try(Region::new(state.conf.file_upload_config.region.clone())); - let sdk_config = aws_config::from_env().region(region_provider).load().await; - Client::new(&sdk_config) -} - -pub async fn upload_file_to_s3( - state: &routes::AppState, - file_key: String, - file: Vec, -) -> CustomResult<(), errors::ApiErrorResponse> { - let client = get_aws_client(state).await; - let bucket_name = &state.conf.file_upload_config.bucket_name; - // Upload file to S3 - let upload_res = client - .put_object() - .bucket(bucket_name) - .key(file_key.clone()) - .body(file.into()) - .send() - .await; - upload_res - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("File upload to S3 failed")?; - Ok(()) -} - -pub async fn delete_file_from_s3( - state: &routes::AppState, - file_key: String, -) -> CustomResult<(), errors::ApiErrorResponse> { - let client = get_aws_client(state).await; - let bucket_name = &state.conf.file_upload_config.bucket_name; - // Delete file from S3 - let delete_res = client - .delete_object() - .bucket(bucket_name) - .key(file_key) - .send() - .await; - delete_res - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("File delete from S3 failed")?; - Ok(()) -} - -pub async fn retrieve_file_from_s3( - state: &routes::AppState, - file_key: String, -) -> CustomResult, errors::ApiErrorResponse> { - let client = get_aws_client(state).await; - let bucket_name = &state.conf.file_upload_config.bucket_name; - // Get file data from S3 - let get_res = client - .get_object() - .bucket(bucket_name) - .key(file_key) - .send() - .await; - let mut object = get_res - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("File retrieve from S3 failed")?; - let mut received_data: Vec = Vec::new(); - while let Some(bytes) = object - .body - .try_next() - .await - .into_report() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Invalid file data received from S3")? - { - received_data.extend_from_slice(&bytes); // Collect the bytes in the Vec - } - Ok(received_data) -} diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index e18e4d85c7..ae0328c56f 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -5,6 +5,7 @@ use actix_web::{web, Scope}; use analytics::AnalyticsConfig; #[cfg(feature = "email")] use external_services::email::{ses::AwsSes, EmailService}; +use external_services::file_storage::FileStorageInterface; #[cfg(all(feature = "olap", feature = "hashicorp-vault"))] use external_services::hashicorp_vault::decrypt::VaultFetch; #[cfg(feature = "kms")] @@ -68,6 +69,7 @@ pub struct AppState { #[cfg(feature = "olap")] pub pool: crate::analytics::AnalyticsProvider, pub request_id: Option, + pub file_storage_client: Box, } impl scheduler::SchedulerAppState for AppState { @@ -266,6 +268,8 @@ impl AppState { #[cfg(feature = "email")] let email_client = Arc::new(create_email_client(&conf).await); + let file_storage_client = conf.file_storage.get_file_storage_client().await; + Self { flow_name: String::from("default"), store, @@ -279,6 +283,7 @@ impl AppState { #[cfg(feature = "olap")] pool, request_id: None, + file_storage_client, } }) .await diff --git a/docker-compose.yml b/docker-compose.yml index 3839269a52..e55008f1e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,6 +54,7 @@ services: - router_net volumes: - ./config:/local/config + - ./files:/local/bin/files labels: logs: "promtail" healthcheck: