feat: separate olap oltp (#348)

This commit is contained in:
Sangamesh Kulkarni
2023-01-12 19:49:06 +05:30
committed by GitHub
parent 6a211d6c29
commit 8c5eab8499
14 changed files with 213 additions and 74 deletions

View File

@ -9,7 +9,6 @@ use structopt::StructOpt;
async fn main() -> ApplicationResult<()> {
// get commandline config before initializing config
let cmd_line = CmdLineConf::from_args();
if let Some(Subcommand::GenerateOpenapiSpec) = cmd_line.subcommand {
let file_path = "openapi/generated.json";
#[allow(clippy::expect_used)]
@ -33,7 +32,14 @@ async fn main() -> ApplicationResult<()> {
logger::info!("Application started [{:?}] [{:?}]", conf.server, conf.log);
#[allow(clippy::expect_used)]
let (server, mut state) = router::start_server(conf)
#[cfg(not(feature = "olap"))]
let (server, mut state) = router::start_oltp_server(conf)
.await
.expect("Failed to create the server");
#[allow(clippy::expect_used)]
#[cfg(feature = "olap")]
let (server, mut state) = router::start_olap_server(conf)
.await
.expect("Failed to create the server");

View File

@ -331,6 +331,7 @@ pub async fn payment_intents_cancel(
#[instrument(skip_all)]
#[get("/list")]
#[cfg(feature = "olap")]
pub async fn payment_intent_list(
state: web::Data<routes::AppState>,
req: HttpRequest,

View File

@ -19,7 +19,6 @@ use self::{
flows::{ConstructFlowSpecificData, Feature},
operations::{BoxedOperation, Operation},
};
use super::errors::StorageErrorExt;
use crate::{
core::errors::{self, RouterResponse, RouterResult},
db::StorageInterface,
@ -30,7 +29,6 @@ use crate::{
types::{
self, api,
storage::{self, enums as storage_enums},
transformers::ForeignInto,
},
utils::OptionExt,
};
@ -541,6 +539,7 @@ pub fn should_call_connector<Op: Debug, F: Clone>(
}
}
#[cfg(feature = "olap")]
pub async fn list_payments(
db: &dyn StorageInterface,
merchant: storage::MerchantAccount,
@ -551,11 +550,16 @@ pub async fn list_payments(
let payment_intent =
helpers::filter_by_constraints(db, &constraints, merchant_id, merchant.storage_scheme)
.await
.map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;
.map_err(|err| {
errors::StorageErrorExt::to_not_found_response(
err,
errors::ApiErrorResponse::PaymentNotFound,
)
})?;
let data: Vec<api::PaymentsResponse> = payment_intent
.into_iter()
.map(ForeignInto::foreign_into)
.map(types::transformers::ForeignInto::foreign_into)
.collect();
Ok(services::ApplicationResponse::Json(
api::PaymentListResponse {

View File

@ -1053,6 +1053,7 @@ where
Some(func(option1?, option2?))
}
#[cfg(feature = "olap")]
pub(super) async fn filter_by_constraints(
db: &dyn StorageInterface,
constraints: &api::PaymentListConstraints,
@ -1065,6 +1066,7 @@ pub(super) async fn filter_by_constraints(
Ok(result)
}
#[cfg(feature = "olap")]
pub(super) fn validate_payment_list_request(
req: &api::PaymentListConstraints,
) -> CustomResult<(), errors::ApiErrorResponse> {

View File

@ -476,6 +476,7 @@ pub async fn validate_and_create_refund(
/// If payment-id is not provided, lists the refunds associated with that particular merchant - to the limit specified,if no limits given, it is 10 by default
#[instrument(skip_all)]
#[cfg(feature = "olap")]
pub async fn refund_list(
db: &dyn db::StorageInterface,
merchant_account: storage::merchant_account::MerchantAccount,

View File

@ -1,10 +1,9 @@
use super::MockDb;
#[cfg(feature = "olap")]
use crate::types::api;
use crate::{
core::errors::{self, CustomResult},
types::{
api,
storage::{self as types, enums},
},
types::storage::{self as types, enums},
};
#[async_trait::async_trait]
@ -29,6 +28,7 @@ pub trait PaymentIntentInterface {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<types::PaymentIntent, errors::StorageError>;
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
merchant_id: &str,
@ -44,15 +44,17 @@ mod storage {
use redis_interface::{HsetnxReply, RedisEntryId};
use super::PaymentIntentInterface;
#[cfg(feature = "olap")]
use crate::types::api;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult},
services::Store,
types::{
api,
storage::{enums, kv, payment_intent::*},
types::storage::{enums, kv, payment_intent::*},
utils::{
self,
storage_partitioning::{self, KvStorePartition},
},
utils::{self, storage_partitioning::KvStorePartition},
};
#[async_trait::async_trait]
@ -110,13 +112,14 @@ mod storage {
insertable: kv::Insertable::PaymentIntent(new),
},
};
let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
},
self.config.drainer_num_partitions,
));
let stream_name =
self.get_drainer_stream_name(&PaymentIntent::shard_key(
storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
@ -179,12 +182,13 @@ mod storage {
};
let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key(
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_intent.merchant_id,
payment_id: &updated_intent.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
@ -236,6 +240,7 @@ mod storage {
}
}
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
merchant_id: &str,
@ -244,7 +249,7 @@ mod storage {
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.replica_pool).await;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc)
.await
.map_err(Into::into)
@ -262,14 +267,13 @@ mod storage {
use error_stack::IntoReport;
use super::PaymentIntentInterface;
#[cfg(feature = "olap")]
use crate::types::api;
use crate::{
connection::pg_connection,
core::errors::{self, CustomResult},
services::Store,
types::{
api,
storage::{enums, payment_intent::*},
},
types::storage::{enums, payment_intent::*},
};
#[async_trait::async_trait]
@ -309,13 +313,14 @@ mod storage {
.into_report()
}
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
merchant_id: &str,
pc: &api::PaymentListConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.replica_pool).await;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc)
.await
.map_err(Into::into)
@ -326,6 +331,7 @@ mod storage {
#[async_trait::async_trait]
impl PaymentIntentInterface for MockDb {
#[cfg(feature = "olap")]
async fn filter_payment_intent_by_constraints(
&self,
_merchant_id: &str,

View File

@ -56,6 +56,7 @@ pub trait RefundInterface {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError>;
#[cfg(feature = "olap")]
async fn filter_refund_by_constraints(
&self,
merchant_id: &str,
@ -172,6 +173,7 @@ mod storage {
.into_report()
}
#[cfg(feature = "olap")]
async fn filter_refund_by_constraints(
&self,
merchant_id: &str,
@ -179,7 +181,7 @@ mod storage {
_storage_scheme: enums::MerchantStorageScheme,
limit: i64,
) -> CustomResult<Vec<storage_models::refund::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.replica_pool).await;
<storage_models::refund::Refund as storage_types::RefundDbExt>::filter_by_constraints(
&conn,
merchant_id,
@ -574,6 +576,7 @@ mod storage {
}
}
#[cfg(feature = "olap")]
async fn filter_refund_by_constraints(
&self,
merchant_id: &str,
@ -583,7 +586,7 @@ mod storage {
) -> CustomResult<Vec<storage_models::refund::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.replica_pool).await;
<storage_models::refund::Refund as storage_types::RefundDbExt>::filter_by_constraints(&conn, merchant_id, refund_details, limit)
.await
.map_err(Into::into)
@ -700,6 +703,7 @@ impl RefundInterface for MockDb {
Err(errors::StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn filter_refund_by_constraints(
&self,
_merchant_id: &str,

View File

@ -31,7 +31,7 @@ use routes::AppState;
pub use self::env::logger;
use crate::{
configs::settings::Settings,
configs::settings,
core::errors::{self, ApplicationResult},
};
@ -57,7 +57,8 @@ pub mod pii {
pub use masking::*;
}
pub fn mk_app(
#[cfg(feature = "olap")]
pub fn mk_olap_app(
state: AppState,
request_body_limit: usize,
) -> actix_web::App<
@ -68,6 +69,57 @@ pub fn mk_app(
Error = actix_web::Error,
InitError = (),
>,
> {
let application_builder = get_application_builder(request_body_limit);
let mut server_app = application_builder
.service(routes::Payments::olap_server(state.clone()))
.service(routes::Customers::olap_server(state.clone()))
.service(routes::Refunds::olap_server(state.clone()))
.service(routes::Payouts::olap_server(state.clone()))
.service(routes::MerchantAccount::olap_server(state.clone()))
.service(routes::MerchantConnectorAccount::olap_server(state.clone()));
#[cfg(feature = "stripe")]
{
server_app = server_app.service(routes::StripeApis::server(state.clone()));
}
server_app = server_app.service(routes::Health::olap_server(state));
server_app
}
/// Starts the OLAP server with only OLAP services
///
/// # Panics
///
/// Unwrap used because without the value we can't start the server
#[allow(clippy::expect_used, clippy::unwrap_used)]
#[cfg(feature = "olap")]
pub async fn start_olap_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> {
logger::debug!(startup_config=?conf);
let server = conf.server.clone();
let state = routes::AppState::new(conf).await;
// Cloning to close connections before shutdown
let app_state = state.clone();
let request_body_limit = server.request_body_limit;
let server = actix_web::HttpServer::new(move || mk_olap_app(state.clone(), request_body_limit))
.bind((server.host.as_str(), server.port))?
.workers(server.workers.unwrap_or_else(num_cpus::get_physical))
.run();
Ok((server, app_state))
}
pub fn get_application_builder(
request_body_limit: usize,
) -> actix_web::App<
impl ServiceFactory<
ServiceRequest,
Config = (),
Response = actix_web::dev::ServiceResponse<impl MessageBody>,
Error = actix_web::Error,
InitError = (),
>,
> {
let json_cfg = actix_web::web::JsonConfig::default()
.limit(request_body_limit)
@ -75,7 +127,7 @@ pub fn mk_app(
.content_type(|mime| mime == mime::APPLICATION_JSON) // FIXME: This doesn't seem to be enforced.
.error_handler(utils::error_parser::custom_json_error_handler);
let mut server_app = actix_web::App::new()
actix_web::App::new()
.app_data(json_cfg)
.wrap(middleware::RequestId)
.wrap(router_env::tracing_actix_web::TracingLogger::default())
@ -88,36 +140,53 @@ pub fn mk_app(
errors::error_handlers::custom_error_handlers,
))
.wrap(cors::cors())
.service(routes::Payments::server(state.clone()))
.service(routes::Customers::server(state.clone()))
.service(routes::Refunds::server(state.clone()))
.service(routes::Payouts::server(state.clone()))
.service(routes::PaymentMethods::server(state.clone()))
.service(routes::MerchantAccount::server(state.clone()))
.service(routes::MerchantConnectorAccount::server(state.clone()))
.service(routes::EphemeralKey::server(state.clone()))
.service(routes::Webhooks::server(state.clone()));
}
pub fn mk_oltp_app(
state: AppState,
request_body_limit: usize,
) -> actix_web::App<
impl ServiceFactory<
ServiceRequest,
Config = (),
Response = actix_web::dev::ServiceResponse<impl MessageBody>,
Error = actix_web::Error,
InitError = (),
>,
> {
let application_builder = get_application_builder(request_body_limit);
let mut server_app = application_builder
.service(routes::Payments::oltp_server(state.clone()))
.service(routes::Customers::oltp_server(state.clone()))
.service(routes::Refunds::oltp_server(state.clone()))
.service(routes::Payouts::oltp_server(state.clone()))
.service(routes::PaymentMethods::oltp_server(state.clone()))
.service(routes::EphemeralKey::oltp_server(state.clone()))
.service(routes::Webhooks::oltp_server(state.clone()));
#[cfg(feature = "stripe")]
{
server_app = server_app.service(routes::StripeApis::server(state.clone()));
}
server_app = server_app.service(routes::Health::server(state));
server_app = server_app.service(routes::Health::oltp_server(state));
server_app
}
#[allow(clippy::expect_used, clippy::unwrap_used)]
/// Starts the OLTP server with only OLTP services
///
/// # Panics
///
/// Unwrap used because without the value we can't start the server
pub async fn start_server(conf: Settings) -> ApplicationResult<(Server, AppState)> {
#[allow(clippy::expect_used, clippy::unwrap_used)]
pub async fn start_oltp_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> {
logger::debug!(startup_config=?conf);
let server = conf.server.clone();
let state = routes::AppState::new(conf).await;
// Cloning to close connections before shutdown
let app_state = state.clone();
let request_body_limit = server.request_body_limit;
let server = actix_web::HttpServer::new(move || mk_app(state.clone(), request_body_limit))
let server = actix_web::HttpServer::new(move || mk_oltp_app(state.clone(), request_body_limit))
.bind((server.host.as_str(), server.port))?
.workers(server.workers.unwrap_or_else(num_cpus::get_physical))
.run();

View File

@ -1,8 +1,8 @@
use actix_web::{web, Scope};
use super::{
admin::*, customers::*, ephemeral_key::*, health::*, mandates::*, payment_methods::*,
payments::*, payouts::*, refunds::*, webhooks::*,
customers::*, ephemeral_key::*, health::*, mandates::*, payment_methods::*, payments::*,
payouts::*, refunds::*, webhooks::*,
};
use crate::{
configs::settings::Settings,
@ -43,7 +43,13 @@ impl AppState {
pub struct Health;
impl Health {
pub fn server(state: AppState) -> Scope {
pub fn oltp_server(state: AppState) -> Scope {
web::scope("")
.app_data(web::Data::new(state))
.service(web::resource("/health").route(web::get().to(health)))
}
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
web::scope("")
.app_data(web::Data::new(state))
.service(web::resource("/health").route(web::get().to(health)))
@ -53,12 +59,17 @@ impl Health {
pub struct Payments;
impl Payments {
pub fn server(state: AppState) -> Scope {
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
web::scope("/payments")
.app_data(web::Data::new(state))
.service(web::resource("/list").route(web::get().to(payments_list)))
}
pub fn oltp_server(state: AppState) -> Scope {
// Routes are matched in the order they are declared.
web::scope("/payments")
.app_data(web::Data::new(state))
.service(web::resource("").route(web::post().to(payments_create)))
.service(web::resource("/list").route(web::get().to(payments_list)))
.service(
web::resource("/session_tokens").route(web::post().to(payments_connector_session)),
)
@ -84,7 +95,16 @@ impl Payments {
pub struct Customers;
impl Customers {
pub fn server(state: AppState) -> Scope {
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
web::scope("/customers")
.app_data(web::Data::new(state))
.service(
web::resource("/{customer_id}/mandates")
.route(web::get().to(get_customer_mandates)),
)
}
pub fn oltp_server(state: AppState) -> Scope {
web::scope("/customers")
.app_data(web::Data::new(state))
.service(web::resource("").route(web::post().to(customers_create)))
@ -94,10 +114,6 @@ impl Customers {
.route(web::post().to(customers_update))
.route(web::delete().to(customers_delete)),
)
.service(
web::resource("/{customer_id}/mandates")
.route(web::get().to(get_customer_mandates)),
)
.service(
web::resource("/{customer_id}/payment_methods")
.route(web::get().to(list_customer_payment_method_api)),
@ -108,12 +124,17 @@ impl Customers {
pub struct Refunds;
impl Refunds {
pub fn server(state: AppState) -> Scope {
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
web::scope("/refunds")
.app_data(web::Data::new(state))
.service(web::resource("/list").route(web::get().to(refunds_list)))
}
pub fn oltp_server(state: AppState) -> Scope {
// Routes are matches in the order they are declared.
web::scope("/refunds")
.app_data(web::Data::new(state))
.service(web::resource("").route(web::post().to(refunds_create)))
.service(web::resource("/list").route(web::get().to(refunds_list)))
.service(
web::resource("/{id}")
.route(web::get().to(refunds_retrieve))
@ -125,7 +146,13 @@ impl Refunds {
pub struct Payouts;
impl Payouts {
pub fn server(state: AppState) -> Scope {
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
web::scope("/payouts")
.app_data(web::Data::new(state))
.service(web::resource("/accounts").route(web::get().to(payouts_accounts)))
}
pub fn oltp_server(state: AppState) -> Scope {
web::scope("/payouts")
.app_data(web::Data::new(state))
.service(web::resource("/create").route(web::post().to(payouts_create)))
@ -133,14 +160,13 @@ impl Payouts {
.service(web::resource("/update").route(web::post().to(payouts_update)))
.service(web::resource("/reverse").route(web::post().to(payouts_reverse)))
.service(web::resource("/cancel").route(web::post().to(payouts_cancel)))
.service(web::resource("/accounts").route(web::get().to(payouts_accounts)))
}
}
pub struct PaymentMethods;
impl PaymentMethods {
pub fn server(state: AppState) -> Scope {
pub fn oltp_server(state: AppState) -> Scope {
web::scope("/payment_methods")
.app_data(web::Data::new(state))
.service(web::resource("").route(web::post().to(create_payment_method_api)))
@ -156,9 +182,11 @@ impl PaymentMethods {
pub struct MerchantAccount;
impl MerchantAccount {
pub fn server(config: AppState) -> Scope {
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
use super::admin::*;
web::scope("/accounts")
.app_data(web::Data::new(config))
.app_data(web::Data::new(state))
.service(web::resource("").route(web::post().to(merchant_account_create)))
.service(
web::resource("/{id}")
@ -172,9 +200,11 @@ impl MerchantAccount {
pub struct MerchantConnectorAccount;
impl MerchantConnectorAccount {
pub fn server(config: AppState) -> Scope {
#[cfg(feature = "olap")]
pub fn olap_server(state: AppState) -> Scope {
use super::admin::*;
web::scope("/account")
.app_data(web::Data::new(config))
.app_data(web::Data::new(state))
.service(
web::resource("/{merchant_id}/connectors")
.route(web::post().to(payment_connector_create))
@ -195,7 +225,7 @@ impl MerchantConnectorAccount {
pub struct EphemeralKey;
impl EphemeralKey {
pub fn server(config: AppState) -> Scope {
pub fn oltp_server(config: AppState) -> Scope {
web::scope("/ephemeral_keys")
.app_data(web::Data::new(config))
.service(web::resource("").route(web::post().to(ephemeral_key_create)))
@ -206,10 +236,14 @@ impl EphemeralKey {
pub struct Mandates;
impl Mandates {
pub fn server(config: AppState) -> Scope {
pub fn olap_server(state: AppState) -> Scope {
web::scope("/mandates")
.app_data(web::Data::new(state))
.service(web::resource("/{id}").route(web::get().to(get_mandate)))
}
pub fn oltp_server(config: AppState) -> Scope {
web::scope("/mandates")
.app_data(web::Data::new(config))
.service(web::resource("/{id}").route(web::get().to(get_mandate)))
.service(web::resource("/revoke/{id}").route(web::post().to(revoke_mandate)))
}
}
@ -217,7 +251,7 @@ impl Mandates {
pub struct Webhooks;
impl Webhooks {
pub fn server(config: AppState) -> Scope {
pub fn oltp_server(config: AppState) -> Scope {
web::scope("/webhooks")
.app_data(web::Data::new(config))
.service(

View File

@ -322,6 +322,7 @@ pub async fn payments_cancel(
}
#[instrument(skip_all, fields(flow = ?Flow::PaymentsList))]
#[cfg(feature = "olap")]
// #[get("/list")]
pub async fn payments_list(
state: web::Data<app::AppState>,

View File

@ -80,6 +80,7 @@ pub async fn refunds_update(
}
#[instrument(skip_all, fields(flow = ?Flow::RefundsList))]
#[cfg(feature = "olap")]
// #[get("/list")]
pub async fn refunds_list(
state: web::Data<AppState>,