feat(core): call multiple connectors in parallel (#128)

This commit is contained in:
Narayan Bhat
2022-12-13 13:26:11 +05:30
committed by GitHub
parent 124048ce75
commit 9c2f296f25
9 changed files with 35 additions and 29 deletions

View File

@ -54,3 +54,7 @@ batch_size = 200
[drainer] [drainer]
stream_name = "DRAINER_STREAM" stream_name = "DRAINER_STREAM"
num_partitions = 4 num_partitions = 4
[connectors.supported]
wallets = ["klarna","braintree"]
cards = ["stripe","adyen","authorizedotnet","checkout","braintree"]

View File

@ -6,6 +6,7 @@ pub mod transformers;
use std::{fmt::Debug, marker::PhantomData, time::Instant}; use std::{fmt::Debug, marker::PhantomData, time::Instant};
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use futures::future::join_all;
use router_env::{tracing, tracing::instrument}; use router_env::{tracing, tracing::instrument};
use time; use time;
@ -323,7 +324,7 @@ where
let res = router_data let res = router_data
.decide_flows( .decide_flows(
state, state,
connector, &connector,
customer, customer,
call_connector_action, call_connector_action,
merchant_account.storage_scheme, merchant_account.storage_scheme,
@ -377,30 +378,30 @@ where
PaymentResponse: Operation<F, Req>, PaymentResponse: Operation<F, Req>,
{ {
let call_connectors_start_time = Instant::now(); let call_connectors_start_time = Instant::now();
let mut router_data_list = Vec::with_capacity(connectors.len()); let mut join_handlers = Vec::with_capacity(connectors.len());
for connector in connectors { for connector in connectors.iter() {
let connector_id = connector.connector.id(); let connector_id = connector.connector.id();
let router_data = payment_data let router_data = payment_data
.construct_router_data(state, connector_id, merchant_account) .construct_router_data(state, connector_id, merchant_account)
.await?; .await?;
router_data_list.push((connector, router_data)); let res = router_data.decide_flows(
state,
connector,
customer,
CallConnectorAction::Trigger,
merchant_account.storage_scheme,
);
join_handlers.push(res);
} }
for (connector, router_data) in router_data_list { let result = join_all(join_handlers).await;
let connector_name = connector.connector_name.to_owned().to_string();
let res = router_data
.decide_flows(
state,
connector,
customer,
CallConnectorAction::Trigger,
merchant_account.storage_scheme,
)
.await?; //FIXME: remove this error propogation
match res.response { for (connector_res, connector) in result.into_iter().zip(connectors) {
let connector_name = connector.connector_name.to_string();
match connector_res?.response {
Ok(connector_response) => { Ok(connector_response) => {
if let types::PaymentsResponseData::SessionResponse { if let types::PaymentsResponseData::SessionResponse {
session_token, session_token,
@ -416,6 +417,7 @@ where
}); });
} }
} }
Err(connector_error) => { Err(connector_error) => {
logger::debug!( logger::debug!(
"sessions_connector_error {} {:?}", "sessions_connector_error {} {:?}",

View File

@ -32,7 +32,7 @@ pub trait Feature<F, T> {
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
maybe_customer: &Option<storage::Customer>, maybe_customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,

View File

@ -52,7 +52,7 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData> for types::PaymentsAu
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
customer: &Option<storage::Customer>, customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
storage_scheme: storage_enums::MerchantStorageScheme, storage_scheme: storage_enums::MerchantStorageScheme,
@ -78,7 +78,7 @@ impl PaymentsAuthorizeRouterData {
pub async fn decide_flow<'a, 'b>( pub async fn decide_flow<'a, 'b>(
&'b self, &'b self,
state: &'a AppState, state: &'a AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
maybe_customer: &Option<storage::Customer>, maybe_customer: &Option<storage::Customer>,
confirm: Option<bool>, confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,

View File

@ -42,7 +42,7 @@ impl Feature<api::Void, types::PaymentsCancelData>
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
customer: &Option<storage::Customer>, customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme, _storage_scheme: enums::MerchantStorageScheme,
@ -63,7 +63,7 @@ impl PaymentsCancelRouterData {
pub async fn decide_flow<'a, 'b>( pub async fn decide_flow<'a, 'b>(
&'b self, &'b self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
_maybe_customer: &Option<storage::Customer>, _maybe_customer: &Option<storage::Customer>,
_confirm: Option<bool>, _confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,

View File

@ -43,7 +43,7 @@ impl Feature<api::Capture, types::PaymentsCaptureData>
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
customer: &Option<storage::Customer>, customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme, _storage_scheme: enums::MerchantStorageScheme,
@ -64,7 +64,7 @@ impl PaymentsCaptureRouterData {
pub async fn decide_flow<'a, 'b>( pub async fn decide_flow<'a, 'b>(
&'b self, &'b self,
state: &'a AppState, state: &'a AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
_maybe_customer: &Option<storage::Customer>, _maybe_customer: &Option<storage::Customer>,
_confirm: Option<bool>, _confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,

View File

@ -44,7 +44,7 @@ impl Feature<api::PSync, types::PaymentsSyncData>
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
customer: &Option<storage::Customer>, customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
_storage_scheme: enums::MerchantStorageScheme, _storage_scheme: enums::MerchantStorageScheme,
@ -64,7 +64,7 @@ impl PaymentsSyncRouterData {
pub async fn decide_flow<'a, 'b>( pub async fn decide_flow<'a, 'b>(
&'b self, &'b self,
state: &'a AppState, state: &'a AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
_maybe_customer: &Option<storage::Customer>, _maybe_customer: &Option<storage::Customer>,
_confirm: Option<bool>, _confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,

View File

@ -39,7 +39,7 @@ impl Feature<api::Session, types::PaymentsSessionData> for types::PaymentsSessio
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &routes::AppState, state: &routes::AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
customer: &Option<storage::Customer>, customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
_storage_schema: enums::MerchantStorageScheme, _storage_schema: enums::MerchantStorageScheme,
@ -59,7 +59,7 @@ impl types::PaymentsSessionRouterData {
pub async fn decide_flow<'a, 'b>( pub async fn decide_flow<'a, 'b>(
&'b self, &'b self,
state: &'a routes::AppState, state: &'a routes::AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
_customer: &Option<storage::Customer>, _customer: &Option<storage::Customer>,
_confirm: Option<bool>, _confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,

View File

@ -40,7 +40,7 @@ impl Feature<api::Verify, types::VerifyRequestData> for types::VerifyRouterData
async fn decide_flows<'a>( async fn decide_flows<'a>(
self, self,
state: &AppState, state: &AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
customer: &Option<storage::Customer>, customer: &Option<storage::Customer>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,
storage_scheme: enums::MerchantStorageScheme, storage_scheme: enums::MerchantStorageScheme,
@ -61,7 +61,7 @@ impl types::VerifyRouterData {
pub async fn decide_flow<'a, 'b>( pub async fn decide_flow<'a, 'b>(
&'b self, &'b self,
state: &'a AppState, state: &'a AppState,
connector: api::ConnectorData, connector: &api::ConnectorData,
maybe_customer: &Option<storage::Customer>, maybe_customer: &Option<storage::Customer>,
confirm: Option<bool>, confirm: Option<bool>,
call_connector_action: payments::CallConnectorAction, call_connector_action: payments::CallConnectorAction,