refactor: add support for extending file storage to other schemes and provide a runtime flag for the same (#3348)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Chethan Rao
2024-01-30 13:16:03 +05:30
committed by GitHub
parent 937aea906e
commit a9638d118e
18 changed files with 461 additions and 258 deletions

31
Cargo.lock generated
View File

@ -2481,6 +2481,7 @@ dependencies = [
"async-trait",
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
"aws-sdk-sesv2",
"aws-sdk-sts",
"aws-smithy-client",
@ -2726,9 +2727,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
@ -2736,9 +2737,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
@ -2764,9 +2765,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
@ -2785,9 +2786,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
@ -2796,15 +2797,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-timer"
@ -2814,9 +2815,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.28"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
@ -5155,8 +5156,6 @@ dependencies = [
"async-bb8-diesel",
"async-trait",
"awc",
"aws-config",
"aws-sdk-s3",
"base64 0.21.5",
"bb8",
"bigdecimal",

View File

@ -535,3 +535,11 @@ refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
# File storage configuration
[file_storage]
file_storage_backend = "aws_s3" # File storage backend to be used
[file_storage.aws_s3]
region = "us-east-1" # The AWS region used by AWS S3 for file storage
bucket_name = "bucket1" # The AWS S3 bucket name for file storage

View File

@ -544,3 +544,6 @@ client_id = ""
client_secret = ""
partner_id = ""
enabled = true
[file_storage]
file_storage_backend = "file_system"

View File

@ -399,3 +399,6 @@ enabled = true
[events]
source = "logs"
[file_storage]
file_storage_backend = "file_system"

View File

@ -10,6 +10,7 @@ license.workspace = true
[features]
kms = ["dep:aws-config", "dep:aws-sdk-kms"]
email = ["dep:aws-config"]
aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3"]
hashicorp-vault = [ "dep:vaultrs" ]
[dependencies]
@ -18,6 +19,7 @@ aws-config = { version = "0.55.3", optional = true }
aws-sdk-kms = { version = "0.28.0", optional = true }
aws-sdk-sesv2 = "0.28.0"
aws-sdk-sts = "0.28.0"
aws-sdk-s3 = { version = "0.28.0", optional = true }
aws-smithy-client = "0.55.3"
base64 = "0.21.2"
dyn-clone = "1.0.11"

View File

@ -0,0 +1,96 @@
//!
//! Module for managing file storage operations with support for multiple storage schemes.
//!
use std::fmt::{Display, Formatter};
use common_utils::errors::CustomResult;
/// Includes functionality for AWS S3 storage operations.
#[cfg(feature = "aws_s3")]
mod aws_s3;
mod file_system;
/// File storage configuration, selecting one of the supported storage backends.
///
/// Deserialized with the `file_storage_backend` field as the tag (snake_case),
/// e.g. `file_storage_backend = "aws_s3"` selects the S3 backend.
#[derive(Debug, Clone, Default, serde::Deserialize)]
#[serde(tag = "file_storage_backend")]
#[serde(rename_all = "snake_case")]
pub enum FileStorageConfig {
    /// AWS S3 storage configuration (only available with the `aws_s3` feature).
    #[cfg(feature = "aws_s3")]
    AwsS3 {
        /// Configuration for AWS S3 file storage.
        aws_s3: aws_s3::AwsFileStorageConfig,
    },
    /// Local file system storage (the default backend).
    #[default]
    FileSystem,
}
impl FileStorageConfig {
    /// Validates the file storage configuration.
    ///
    /// The `AwsS3` variant delegates to the AWS-specific validation; the
    /// `FileSystem` variant has no settings and always passes.
    pub fn validate(&self) -> Result<(), InvalidFileStorageConfig> {
        match self {
            #[cfg(feature = "aws_s3")]
            Self::AwsS3 { aws_s3 } => aws_s3.validate(),
            Self::FileSystem => Ok(()),
        }
    }

    /// Builds the file storage client corresponding to this configuration.
    ///
    /// Async because constructing the AWS S3 client resolves region and
    /// credentials from the environment before returning.
    pub async fn get_file_storage_client(&self) -> Box<dyn FileStorageInterface> {
        match self {
            #[cfg(feature = "aws_s3")]
            Self::AwsS3 { aws_s3 } => Box::new(aws_s3::AwsFileStorageClient::new(aws_s3).await),
            Self::FileSystem => Box::new(file_system::FileSystem),
        }
    }
}
/// Abstraction over the operations supported by every file storage backend.
///
/// Implementors must be cloneable (via `DynClone`) and shareable across
/// threads, since a boxed client is stored in shared application state.
#[async_trait::async_trait]
pub trait FileStorageInterface: dyn_clone::DynClone + Sync + Send {
    /// Uploads a file to the selected storage scheme.
    async fn upload_file(
        &self,
        file_key: &str,
        file: Vec<u8>,
    ) -> CustomResult<(), FileStorageError>;

    /// Deletes a file from the selected storage scheme.
    async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError>;

    /// Retrieves a file from the selected storage scheme.
    async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, FileStorageError>;
}

// Makes `Box<dyn FileStorageInterface>` cloneable.
dyn_clone::clone_trait_object!(FileStorageInterface);
/// Error returned when the file storage configuration fails validation.
///
/// Carries a static human-readable reason, rendered with a `file_storage:`
/// prefix so the failing config section is obvious in logs.
#[derive(Debug, Clone)]
pub struct InvalidFileStorageConfig(&'static str);

impl std::error::Error for InvalidFileStorageConfig {}

impl Display for InvalidFileStorageConfig {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        write!(formatter, "file_storage: {reason}")
    }
}
/// Errors surfaced by [`FileStorageInterface`] operations, independent of the
/// concrete storage backend in use.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum FileStorageError {
    /// The file upload operation failed.
    #[error("Failed to upload file")]
    UploadFailed,
    /// The file retrieval operation failed.
    #[error("Failed to retrieve file")]
    RetrieveFailed,
    /// The file deletion operation failed.
    #[error("Failed to delete file")]
    DeleteFailed,
}

View File

@ -0,0 +1,158 @@
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{
operation::{
delete_object::DeleteObjectError, get_object::GetObjectError, put_object::PutObjectError,
},
Client,
};
use aws_sdk_sts::config::Region;
use common_utils::{errors::CustomResult, ext_traits::ConfigExt};
use error_stack::ResultExt;
use super::InvalidFileStorageConfig;
use crate::file_storage::{FileStorageError, FileStorageInterface};
/// Configuration for AWS S3 file storage.
///
/// Both fields default to empty strings via `#[serde(default)]`; empty or
/// default values are rejected by [`AwsFileStorageConfig::validate`].
#[derive(Debug, serde::Deserialize, Clone, Default)]
#[serde(default)]
pub struct AwsFileStorageConfig {
    /// The AWS region the S3 bucket lives in (e.g. `us-east-1`)
    region: String,
    /// The name of the AWS S3 bucket that stores uploaded files
    bucket_name: String,
}
impl AwsFileStorageConfig {
    /// Validates the AWS S3 file storage configuration.
    ///
    /// Rejects the configuration when either the region or the bucket name is
    /// empty (or still holds its default value).
    pub(super) fn validate(&self) -> Result<(), InvalidFileStorageConfig> {
        if self.region.is_default_or_empty() {
            return Err(InvalidFileStorageConfig("aws s3 region must not be empty"));
        }
        if self.bucket_name.is_default_or_empty() {
            return Err(InvalidFileStorageConfig(
                "aws s3 bucket name must not be empty",
            ));
        }
        Ok(())
    }
}
/// AWS S3 file storage client.
#[derive(Debug, Clone)]
pub(super) struct AwsFileStorageClient {
    /// Handle to the AWS S3 service, built from environment SDK configuration.
    inner_client: Client,
    /// The name of the AWS S3 bucket all operations target.
    bucket_name: String,
}
impl AwsFileStorageClient {
    /// Creates a new AWS S3 file storage client.
    ///
    /// The region comes from the provided configuration; credentials and any
    /// remaining SDK settings are resolved from the environment via
    /// `aws_config::from_env`.
    pub(super) async fn new(config: &AwsFileStorageConfig) -> Self {
        let region_provider = RegionProviderChain::first_try(Region::new(config.region.clone()));
        let sdk_config = aws_config::from_env().region(region_provider).load().await;
        Self {
            inner_client: Client::new(&sdk_config),
            bucket_name: config.bucket_name.clone(),
        }
    }

    /// Uploads a file to AWS S3 under the given object key.
    async fn upload_file(
        &self,
        file_key: &str,
        file: Vec<u8>,
    ) -> CustomResult<(), AwsS3StorageError> {
        self.inner_client
            .put_object()
            .bucket(&self.bucket_name)
            .key(file_key)
            .body(file.into())
            .send()
            .await
            // `?` wraps the typed SDK error into an `error_stack` report
            .map_err(AwsS3StorageError::UploadFailure)?;
        Ok(())
    }

    /// Deletes the object stored under the given key from AWS S3.
    async fn delete_file(&self, file_key: &str) -> CustomResult<(), AwsS3StorageError> {
        self.inner_client
            .delete_object()
            .bucket(&self.bucket_name)
            .key(file_key)
            .send()
            .await
            .map_err(AwsS3StorageError::DeleteFailure)?;
        Ok(())
    }

    /// Retrieves the object stored under the given key from AWS S3.
    ///
    /// Collects the response body stream fully into memory before returning,
    /// so this is only suitable for files that fit in memory.
    async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, AwsS3StorageError> {
        Ok(self
            .inner_client
            .get_object()
            .bucket(&self.bucket_name)
            .key(file_key)
            .send()
            .await
            .map_err(AwsS3StorageError::RetrieveFailure)?
            .body
            .collect()
            .await
            // stream collection failures are distinct from the S3 request failing
            .map_err(AwsS3StorageError::UnknownError)?
            .to_vec())
    }
}
#[async_trait::async_trait]
impl FileStorageInterface for AwsFileStorageClient {
    /// Uploads a file to AWS S3, mapping backend errors to the
    /// backend-agnostic [`FileStorageError`].
    async fn upload_file(
        &self,
        file_key: &str,
        file: Vec<u8>,
    ) -> CustomResult<(), FileStorageError> {
        // Inherent methods take precedence over trait methods, so this
        // dispatches to the private S3 implementation above.
        self.upload_file(file_key, file)
            .await
            .change_context(FileStorageError::UploadFailed)
    }

    /// Deletes a file from AWS S3.
    async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError> {
        self.delete_file(file_key)
            .await
            .change_context(FileStorageError::DeleteFailed)
    }

    /// Retrieves a file from AWS S3.
    async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, FileStorageError> {
        self.retrieve_file(file_key)
            .await
            .change_context(FileStorageError::RetrieveFailed)
    }
}
/// Errors returned by the raw AWS S3 operations, each wrapping the
/// corresponding typed SDK error for diagnostics.
#[derive(Debug, thiserror::Error)]
enum AwsS3StorageError {
    /// File upload to S3 failed.
    #[error("File upload to S3 failed: {0:?}")]
    UploadFailure(aws_smithy_client::SdkError<PutObjectError>),
    /// File retrieval from S3 failed.
    #[error("File retrieve from S3 failed: {0:?}")]
    RetrieveFailure(aws_smithy_client::SdkError<GetObjectError>),
    /// File deletion from S3 failed.
    #[error("File delete from S3 failed: {0:?}")]
    DeleteFailure(aws_smithy_client::SdkError<DeleteObjectError>),
    /// Collecting the response body byte stream failed after a successful request.
    #[error("Unknown error occurred: {0:?}")]
    UnknownError(aws_sdk_s3::primitives::ByteStreamError),
}

View File

@ -0,0 +1,144 @@
//!
//! Module for local file system storage operations
//!
use std::{
fs::{remove_file, File},
io::{Read, Write},
path::PathBuf,
};
use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt};
use crate::file_storage::{FileStorageError, FileStorageInterface};
/// Constructs the file path for a given file key within the file system.
/// The file path is generated based on the workspace path and the provided file key.
fn get_file_path(file_key: impl AsRef<str>) -> PathBuf {
let mut file_path = PathBuf::new();
#[cfg(feature = "logs")]
file_path.push(router_env::env::workspace_path());
#[cfg(not(feature = "logs"))]
file_path.push(std::env::current_dir().unwrap_or(".".into()));
file_path.push("files");
file_path.push(file_key.as_ref());
file_path
}
/// Storage backend that keeps files on the local file system.
#[derive(Debug, Clone)]
pub(super) struct FileSystem;

impl FileSystem {
    /// Writes `file` to disk under the path derived from `file_key`,
    /// creating any missing parent directories first.
    async fn upload_file(
        &self,
        file_key: &str,
        file: Vec<u8>,
    ) -> CustomResult<(), FileSystemStorageError> {
        let file_path = get_file_path(file_key);

        // The last path component is the file name; everything before it is
        // the directory tree that must exist before the file can be created.
        let parent_dir = file_path
            .parent()
            .ok_or(FileSystemStorageError::CreateDirFailed)
            .into_report()
            .attach_printable("Failed to obtain parent directory")?;
        std::fs::create_dir_all(parent_dir)
            .into_report()
            .change_context(FileSystemStorageError::CreateDirFailed)?;

        File::create(file_path)
            .into_report()
            .change_context(FileSystemStorageError::CreateFailure)?
            .write_all(&file)
            .into_report()
            .change_context(FileSystemStorageError::WriteFailure)?;
        Ok(())
    }

    /// Removes the file associated with `file_key` from the file system.
    async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileSystemStorageError> {
        remove_file(get_file_path(file_key))
            .into_report()
            .change_context(FileSystemStorageError::DeleteFailure)?;
        Ok(())
    }

    /// Reads and returns the full content of the file stored for `file_key`.
    async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, FileSystemStorageError> {
        let mut file = File::open(get_file_path(file_key))
            .into_report()
            .change_context(FileSystemStorageError::FileOpenFailure)?;
        let mut received_data: Vec<u8> = Vec::new();
        file.read_to_end(&mut received_data)
            .into_report()
            .change_context(FileSystemStorageError::ReadFailure)?;
        Ok(received_data)
    }
}
#[async_trait::async_trait]
impl FileStorageInterface for FileSystem {
    /// Saves the provided file data to the file system, mapping backend
    /// errors to the backend-agnostic [`FileStorageError`].
    async fn upload_file(
        &self,
        file_key: &str,
        file: Vec<u8>,
    ) -> CustomResult<(), FileStorageError> {
        // Inherent methods take precedence over trait methods, so this
        // dispatches to the private implementation above.
        self.upload_file(file_key, file)
            .await
            .change_context(FileStorageError::UploadFailed)
    }

    /// Deletes the file associated with the specified file key from the file system.
    async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError> {
        self.delete_file(file_key)
            .await
            .change_context(FileStorageError::DeleteFailed)
    }

    /// Retrieves the file content associated with the specified file key from the file system.
    async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, FileStorageError> {
        self.retrieve_file(file_key)
            .await
            .change_context(FileStorageError::RetrieveFailed)
    }
}
/// Errors arising from local file system storage operations.
#[derive(Debug, thiserror::Error)]
enum FileSystemStorageError {
    /// Opening an existing file failed.
    #[error("Failed while opening the file")]
    FileOpenFailure,
    /// Creating a new file failed.
    #[error("Failed to create file")]
    CreateFailure,
    /// Reading a file's contents failed.
    #[error("Failed while reading the file")]
    ReadFailure,
    /// Writing data into a file failed.
    #[error("Failed while writing into file")]
    WriteFailure,
    /// Deleting a file failed.
    #[error("Failed while deleting the file")]
    DeleteFailure,
    /// Creating the parent directory tree failed.
    #[error("Failed while creating a directory")]
    CreateDirFailed,
}

View File

@ -9,6 +9,7 @@ pub mod email;
#[cfg(feature = "kms")]
pub mod kms;
pub mod file_storage;
#[cfg(feature = "hashicorp-vault")]
pub mod hashicorp_vault;

View File

@ -10,10 +10,10 @@ license.workspace = true
[features]
default = ["kv_store", "stripe", "oltp", "olap", "backwards_compatibility", "accounts_cache", "dummy_connector", "payouts", "business_profile_routing", "connector_choice_mca_id", "profile_specific_fallback_routing", "retry", "frm"]
aws_s3 = ["dep:aws-sdk-s3", "dep:aws-config"]
kms = ["external_services/kms", "dep:aws-config"]
aws_s3 = ["external_services/aws_s3"]
kms = ["external_services/kms"]
hashicorp-vault = ["external_services/hashicorp-vault"]
email = ["external_services/email", "dep:aws-config", "olap"]
email = ["external_services/email", "olap"]
frm = []
stripe = ["dep:serde_qs"]
release = ["kms", "stripe", "aws_s3", "email", "backwards_compatibility", "business_profile_routing", "accounts_cache", "kv_store", "connector_choice_mca_id", "profile_specific_fallback_routing", "vergen", "recon"]
@ -42,8 +42,6 @@ actix-web = "4.3.1"
async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" }
argon2 = { version = "0.5.0", features = ["std"] }
async-trait = "0.1.68"
aws-config = { version = "0.55.3", optional = true }
aws-sdk-s3 = { version = "0.28.0", optional = true }
base64 = "0.21.2"
bb8 = "0.8"
bigdecimal = "0.3.1"

View File

@ -11,6 +11,7 @@ use common_utils::ext_traits::ConfigExt;
use config::{Environment, File};
#[cfg(feature = "email")]
use external_services::email::EmailSettings;
use external_services::file_storage::FileStorageConfig;
#[cfg(feature = "hashicorp-vault")]
use external_services::hashicorp_vault;
#[cfg(feature = "kms")]
@ -90,10 +91,9 @@ pub struct Settings {
pub api_keys: ApiKeys,
#[cfg(feature = "kms")]
pub kms: kms::KmsConfig,
pub file_storage: FileStorageConfig,
#[cfg(feature = "hashicorp-vault")]
pub hc_vault: hashicorp_vault::HashiCorpVaultConfig,
#[cfg(feature = "aws_s3")]
pub file_upload_config: FileUploadConfig,
pub tokenization: TokenizationConfig,
pub connector_customer: ConnectorCustomer,
#[cfg(feature = "dummy_connector")]
@ -721,16 +721,6 @@ pub struct ApiKeys {
pub expiry_reminder_days: Vec<u8>,
}
#[cfg(feature = "aws_s3")]
#[derive(Debug, Deserialize, Clone, Default)]
#[serde(default)]
pub struct FileUploadConfig {
/// The AWS region to send file uploads
pub region: String,
/// The AWS s3 bucket to send file uploads
pub bucket_name: String,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct DelayedSessionConfig {
#[serde(deserialize_with = "deser_to_get_connectors")]
@ -853,8 +843,11 @@ impl Settings {
self.kms
.validate()
.map_err(|error| ApplicationError::InvalidConfigurationValueError(error.into()))?;
#[cfg(feature = "aws_s3")]
self.file_upload_config.validate()?;
self.file_storage
.validate()
.map_err(|err| ApplicationError::InvalidConfigurationValueError(err.to_string()))?;
self.lock_settings.validate()?;
self.events.validate()?;
Ok(())

View File

@ -127,25 +127,6 @@ impl super::settings::DrainerSettings {
}
}
#[cfg(feature = "aws_s3")]
impl super::settings::FileUploadConfig {
pub fn validate(&self) -> Result<(), ApplicationError> {
use common_utils::fp_utils::when;
when(self.region.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"s3 region must not be empty".into(),
))
})?;
when(self.bucket_name.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"s3 bucket name must not be empty".into(),
))
})
}
}
impl super::settings::ApiKeys {
pub fn validate(&self) -> Result<(), ApplicationError> {
use common_utils::fp_utils::when;

View File

@ -1,9 +1,4 @@
pub mod helpers;
#[cfg(feature = "aws_s3")]
pub mod s3_utils;
#[cfg(not(feature = "aws_s3"))]
pub mod fs_utils;
use api_models::files;
use error_stack::{IntoReport, ResultExt};
@ -29,10 +24,7 @@ pub async fn files_create_core(
)
.await?;
let file_id = common_utils::generate_id(consts::ID_LENGTH, "file");
#[cfg(feature = "aws_s3")]
let file_key = format!("{}/{}", merchant_account.merchant_id, file_id);
#[cfg(not(feature = "aws_s3"))]
let file_key = format!("{}_{}", merchant_account.merchant_id, file_id);
let file_new = diesel_models::file::FileMetadataNew {
file_id: file_id.clone(),
merchant_id: merchant_account.merchant_id.clone(),

View File

@ -1,57 +0,0 @@
use std::{
fs::{remove_file, File},
io::{Read, Write},
path::PathBuf,
};
use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt};
use crate::{core::errors, env};
pub fn get_file_path(file_key: String) -> PathBuf {
let mut file_path = PathBuf::new();
file_path.push(env::workspace_path());
file_path.push("files");
file_path.push(file_key);
file_path
}
pub fn save_file_to_fs(
file_key: String,
file_data: Vec<u8>,
) -> CustomResult<(), errors::ApiErrorResponse> {
let file_path = get_file_path(file_key);
let mut file = File::create(file_path)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to create file")?;
file.write_all(&file_data)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while writing into file")?;
Ok(())
}
pub fn delete_file_from_fs(file_key: String) -> CustomResult<(), errors::ApiErrorResponse> {
let file_path = get_file_path(file_key);
remove_file(file_path)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while deleting the file")?;
Ok(())
}
pub fn retrieve_file_from_fs(file_key: String) -> CustomResult<Vec<u8>, errors::ApiErrorResponse> {
let mut received_data: Vec<u8> = Vec::new();
let file_path = get_file_path(file_key);
let mut file = File::open(file_path)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while opening the file")?;
file.read_to_end(&mut received_data)
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while reading the file")?;
Ok(received_data)
}

View File

@ -6,7 +6,7 @@ use futures::TryStreamExt;
use crate::{
core::{
errors::{self, StorageErrorExt},
files, payments, utils,
payments, utils,
},
routes::AppState,
services,
@ -30,37 +30,6 @@ pub async fn get_file_purpose(field: &mut Field) -> Option<api::FilePurpose> {
}
}
pub async fn upload_file(
#[cfg(feature = "aws_s3")] state: &AppState,
file_key: String,
file: Vec<u8>,
) -> CustomResult<(), errors::ApiErrorResponse> {
#[cfg(feature = "aws_s3")]
return files::s3_utils::upload_file_to_s3(state, file_key, file).await;
#[cfg(not(feature = "aws_s3"))]
return files::fs_utils::save_file_to_fs(file_key, file);
}
pub async fn delete_file(
#[cfg(feature = "aws_s3")] state: &AppState,
file_key: String,
) -> CustomResult<(), errors::ApiErrorResponse> {
#[cfg(feature = "aws_s3")]
return files::s3_utils::delete_file_from_s3(state, file_key).await;
#[cfg(not(feature = "aws_s3"))]
return files::fs_utils::delete_file_from_fs(file_key);
}
pub async fn retrieve_file(
#[cfg(feature = "aws_s3")] state: &AppState,
file_key: String,
) -> CustomResult<Vec<u8>, errors::ApiErrorResponse> {
#[cfg(feature = "aws_s3")]
return files::s3_utils::retrieve_file_from_s3(state, file_key).await;
#[cfg(not(feature = "aws_s3"))]
return files::fs_utils::retrieve_file_from_fs(file_key);
}
pub async fn validate_file_upload(
state: &AppState,
merchant_account: domain::MerchantAccount,
@ -132,14 +101,11 @@ pub async fn delete_file_using_file_id(
.attach_printable("File not available")?,
};
match provider {
diesel_models::enums::FileUploadProvider::Router => {
delete_file(
#[cfg(feature = "aws_s3")]
state,
provider_file_id,
)
diesel_models::enums::FileUploadProvider::Router => state
.file_storage_client
.delete_file(&provider_file_id)
.await
}
.change_context(errors::ApiErrorResponse::InternalServerError),
_ => Err(errors::ApiErrorResponse::FileProviderNotSupported {
message: "Not Supported because provider is not Router".to_string(),
}
@ -234,12 +200,11 @@ pub async fn retrieve_file_and_provider_file_id_from_file_id(
match provider {
diesel_models::enums::FileUploadProvider::Router => Ok((
Some(
retrieve_file(
#[cfg(feature = "aws_s3")]
state,
provider_file_id.clone(),
)
.await?,
state
.file_storage_client
.retrieve_file(&provider_file_id)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?,
),
Some(provider_file_id),
)),
@ -364,13 +329,11 @@ pub async fn upload_and_get_provider_provider_file_id_profile_id(
payment_attempt.merchant_connector_id,
))
} else {
upload_file(
#[cfg(feature = "aws_s3")]
state,
file_key.clone(),
create_file_request.file.clone(),
)
.await?;
state
.file_storage_client
.upload_file(&file_key, create_file_request.file.clone())
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok((
file_key,
api_models::enums::FileUploadProvider::Router,

View File

@ -1,87 +0,0 @@
use aws_config::{self, meta::region::RegionProviderChain};
use aws_sdk_s3::{config::Region, Client};
use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt};
use futures::TryStreamExt;
use crate::{core::errors, routes};
async fn get_aws_client(state: &routes::AppState) -> Client {
let region_provider =
RegionProviderChain::first_try(Region::new(state.conf.file_upload_config.region.clone()));
let sdk_config = aws_config::from_env().region(region_provider).load().await;
Client::new(&sdk_config)
}
pub async fn upload_file_to_s3(
state: &routes::AppState,
file_key: String,
file: Vec<u8>,
) -> CustomResult<(), errors::ApiErrorResponse> {
let client = get_aws_client(state).await;
let bucket_name = &state.conf.file_upload_config.bucket_name;
// Upload file to S3
let upload_res = client
.put_object()
.bucket(bucket_name)
.key(file_key.clone())
.body(file.into())
.send()
.await;
upload_res
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("File upload to S3 failed")?;
Ok(())
}
pub async fn delete_file_from_s3(
state: &routes::AppState,
file_key: String,
) -> CustomResult<(), errors::ApiErrorResponse> {
let client = get_aws_client(state).await;
let bucket_name = &state.conf.file_upload_config.bucket_name;
// Delete file from S3
let delete_res = client
.delete_object()
.bucket(bucket_name)
.key(file_key)
.send()
.await;
delete_res
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("File delete from S3 failed")?;
Ok(())
}
pub async fn retrieve_file_from_s3(
state: &routes::AppState,
file_key: String,
) -> CustomResult<Vec<u8>, errors::ApiErrorResponse> {
let client = get_aws_client(state).await;
let bucket_name = &state.conf.file_upload_config.bucket_name;
// Get file data from S3
let get_res = client
.get_object()
.bucket(bucket_name)
.key(file_key)
.send()
.await;
let mut object = get_res
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("File retrieve from S3 failed")?;
let mut received_data: Vec<u8> = Vec::new();
while let Some(bytes) = object
.body
.try_next()
.await
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Invalid file data received from S3")?
{
received_data.extend_from_slice(&bytes); // Collect the bytes in the Vec
}
Ok(received_data)
}

View File

@ -5,6 +5,7 @@ use actix_web::{web, Scope};
use analytics::AnalyticsConfig;
#[cfg(feature = "email")]
use external_services::email::{ses::AwsSes, EmailService};
use external_services::file_storage::FileStorageInterface;
#[cfg(all(feature = "olap", feature = "hashicorp-vault"))]
use external_services::hashicorp_vault::decrypt::VaultFetch;
#[cfg(feature = "kms")]
@ -68,6 +69,7 @@ pub struct AppState {
#[cfg(feature = "olap")]
pub pool: crate::analytics::AnalyticsProvider,
pub request_id: Option<RequestId>,
pub file_storage_client: Box<dyn FileStorageInterface>,
}
impl scheduler::SchedulerAppState for AppState {
@ -266,6 +268,8 @@ impl AppState {
#[cfg(feature = "email")]
let email_client = Arc::new(create_email_client(&conf).await);
let file_storage_client = conf.file_storage.get_file_storage_client().await;
Self {
flow_name: String::from("default"),
store,
@ -279,6 +283,7 @@ impl AppState {
#[cfg(feature = "olap")]
pool,
request_id: None,
file_storage_client,
}
})
.await

View File

@ -54,6 +54,7 @@ services:
- router_net
volumes:
- ./config:/local/config
- ./files:/local/bin/files
labels:
logs: "promtail"
healthcheck: