// NOTE(review): generic parameter/argument lists appear to be missing throughout this
// file as shown (e.g. `Option` with no type argument, `Option>`, `payments_core::(`),
// presumably lost during extraction. The code tokens below are kept exactly as found;
// confirm every signature against version control before building.
pub mod flows;
pub mod helpers;
pub mod operations;
pub mod transformers;

use std::{fmt::Debug, marker::PhantomData, time::Instant};

use common_utils::ext_traits::AsyncExt;
use error_stack::{IntoReport, ResultExt};
use futures::future::join_all;
use router_env::{instrument, tracing};
use time;

pub use self::operations::{
    PaymentCancel, PaymentCapture, PaymentConfirm, PaymentCreate, PaymentMethodValidate,
    PaymentResponse, PaymentSession, PaymentStatus, PaymentUpdate,
};
use self::{
    flows::{ConstructFlowSpecificData, Feature},
    operations::{BoxedOperation, Operation},
};
use super::errors::StorageErrorExt;
use crate::{
    core::errors::{self, RouterResponse, RouterResult},
    db::StorageInterface,
    logger, pii,
    routes::AppState,
    scheduler::utils as pt_utils,
    services,
    types::{
        self, api,
        storage::{self, enums as storage_enums},
        transformers::ForeignInto,
    },
    utils::OptionExt,
};

/// Runs one payment operation end to end and returns the resulting payment data.
///
/// The steps run in this fixed order, each one feeding the next:
/// 1. validate the request against the merchant account;
/// 2. load the trackers for the validated payment id;
/// 3. get or create the customer details;
/// 4. build the payment method data and store it on the payment data;
/// 5. pick the connector(s) and, for a single connector, record its name on the
///    payment attempt;
/// 6. update the trackers and add a task to the process tracker;
/// 7. if [`should_call_connector`] says so, call the connector(s) and then delete the
///    locker payment method referenced by `payment_data.token`.
///
/// # Returns
/// A tuple of the final payment data, the original request and the customer produced
/// by step 3.
///
/// # Errors
/// Any step's error is propagated with `?`. A failure while getting or creating the
/// customer is reported as `ApiErrorResponse::InternalServerError`.
#[instrument(skip_all)]
pub async fn payments_operation_core(
    state: &AppState,
    merchant_account: storage::MerchantAccount,
    operation: Op,
    req: Req,
    call_connector_action: CallConnectorAction,
) -> RouterResult<(PaymentData, Req, Option)>
where
    F: Send + Clone,
    Op: Operation + Send + Sync,

    // To create connector flow specific interface data
    PaymentData: ConstructFlowSpecificData,
    types::RouterData: Feature,

    // To construct connector flow specific api
    dyn types::api::Connector: services::api::ConnectorIntegration,

    // To perform router related operation for PaymentResponse
    PaymentResponse: Operation,
    FData: Send,
{
    // Box the operation so each stage below can hand back the operation to use next.
    let operation: BoxedOperation<'_, F, Req> = Box::new(operation);

    let (operation, validate_result) = operation
        .to_validate_request()?
        .validate_request(&req, &merchant_account)?;

    // Attach the payment id to the current tracing span.
    tracing::Span::current().record("payment_id", &format!("{:?}", validate_result.payment_id));

    let (operation, mut payment_data, customer_details) = operation
        .to_get_tracker()?
        .get_trackers(
            state,
            &validate_result.payment_id,
            &req,
            validate_result.mandate_type,
            &merchant_account,
        )
        .await?;

    // Whatever error this stage produces is reported as an internal server error.
    let (operation, customer) = operation
        .to_domain()?
        .get_or_create_customer_details(
            &*state.store,
            &mut payment_data,
            customer_details,
            validate_result.merchant_id,
        )
        .await
        .change_context(errors::ApiErrorResponse::InternalServerError)?;

    let (operation, payment_method_data) = operation
        .to_domain()?
        .make_pm_data(state, &mut payment_data, validate_result.storage_scheme)
        .await?;

    payment_data.payment_method_data = payment_method_data;

    let connector_details = operation
        .to_domain()?
        .get_connector(&merchant_account, state, &req)
        .await?;

    // Only the single-connector case records a connector name on the attempt.
    if let api::ConnectorCallType::Single(ref connector) = connector_details {
        payment_data.payment_attempt.connector =
            Some(connector.connector_name.to_owned().to_string());
    }

    let (operation, mut payment_data) = operation
        .to_update_tracker()?
        .update_trackers(
            &*state.store,
            &validate_result.payment_id,
            payment_data,
            customer.clone(),
            validate_result.storage_scheme,
        )
        .await?;

    operation
        .to_domain()?
        .add_task_to_process_tracker(state, &payment_data.payment_attempt)
        .await?;

    if should_call_connector(&operation, &payment_data) {
        payment_data = match connector_details {
            api::ConnectorCallType::Single(connector) => {
                call_connector_service(
                    state,
                    &merchant_account,
                    &validate_result.payment_id,
                    connector,
                    &operation,
                    payment_data,
                    &customer,
                    call_connector_action,
                )
                .await?
            }
            api::ConnectorCallType::Multiple(connectors) => {
                call_multiple_connectors_service(
                    state,
                    &merchant_account,
                    connectors,
                    &operation,
                    payment_data,
                    &customer,
                )
                .await?
            }
        };
        // Tail expression of the `if` block: no `?` is applied, so nothing from this
        // call is propagated to the caller.
        helpers::Vault::delete_locker_payment_method_by_lookup_key(state, &payment_data.token).await
    }
    Ok((payment_data, req, customer))
}

/// Runs [`payments_operation_core`] and turns its output into a response.
///
/// The operation is cloned for the core call because the same operation value is also
/// passed to `Res::generate_response` afterwards, together with the request, payment
/// data, customer, `auth_flow` and the server configuration.
///
/// # Errors
/// Propagates any error from [`payments_operation_core`]; the value returned by
/// `Res::generate_response` is returned unchanged.
#[allow(clippy::too_many_arguments)]
pub async fn payments_core(
    state: &AppState,
    merchant_account: storage::MerchantAccount,
    operation: Op,
    req: Req,
    auth_flow: services::AuthFlow,
    call_connector_action: CallConnectorAction,
) -> RouterResponse
where
    F: Send + Clone,
    FData: Send,
    Op: Operation + Send + Sync + Clone,
    Req: Debug,
    Res: transformers::ToResponse, Op> + TryFrom,
    // To create connector flow specific interface data
    PaymentData: ConstructFlowSpecificData,
    types::RouterData: Feature,

    // To construct connector flow specific api
    dyn types::api::Connector: services::api::ConnectorIntegration,

    // To perform router related operation for PaymentResponse
    PaymentResponse: Operation,
{
    let (payment_data, req, customer) = payments_operation_core(
        state,
        merchant_account,
        operation.clone(),
        req,
        call_connector_action,
    )
    .await?;

    Res::generate_response(
        Some(req),
        payment_data,
        customer,
        auth_flow,
        &state.conf.server,
        operation,
    )
}

/// Returns `true` when the `Debug` rendering of `operation` is exactly `"PaymentStart"`.
///
/// No caller of this function is visible in this part of the file.
fn is_start_pay(operation: &Op) -> bool {
    format!("{:?}", operation).eq("PaymentStart")
}

/// Handles the return leg of a redirect and builds the redirection response.
///
/// Reads the required `connector` and `param` values from the request, resolves the
/// payment intent id from `resource_id`, asks the named connector which flow type the
/// query parameters imply, runs the status flow with that flow type, and finally
/// builds the redirect URL via `helpers::get_handle_response_url`.
///
/// # Errors
/// * missing `connector` / `param` — whatever `get_required_value` reports;
/// * unresolvable payment id — `ApiErrorResponse::MissingRequiredField` for `payment_id`;
/// * flow type cannot be decided — `ApiErrorResponse::InternalServerError`;
/// * any failure of the status flow — `ApiErrorResponse::NotImplemented`;
/// * a non-JSON application response — `ApiErrorResponse::InternalServerError`.
#[allow(clippy::too_many_arguments)]
pub async fn handle_payments_redirect_response<'a, F>(
    state: &AppState,
    merchant_account: storage::MerchantAccount,
    req: api::PaymentsRetrieveRequest,
) -> RouterResponse
where
    F: Send + Clone + 'a,
{
    let connector = req.connector.clone().get_required_value("connector")?;

    let query_params = req.param.clone().get_required_value("param")?;

    let resource_id = api::PaymentIdTypeExt::get_payment_intent_id(&req.resource_id)
        .change_context(errors::ApiErrorResponse::MissingRequiredField {
            field_name: "payment_id".to_string(),
        })?;

    let connector_data = api::ConnectorData::get_connector_by_name(
        &state.conf.connectors,
        &connector,
        api::GetToken::Connector,
    )?;

    // The connector decides, from the query parameters, which action the status flow
    // should take.
    let flow_type = connector_data
        .connector
        .get_flow_type(&query_params)
        .change_context(errors::ApiErrorResponse::InternalServerError)
        .attach_printable("Failed to decide the response flow")?;

    let response = payments_response_for_redirection_flows(
        state,
        merchant_account.clone(),
        req.clone(),
        flow_type,
    )
    .await;

    // NOTE(review): every failure of the status flow is re-labelled `NotImplemented`
    // here, whatever its original cause — confirm this is intended.
    let payments_response =
        match response.change_context(errors::ApiErrorResponse::NotImplemented)? {
            services::ApplicationResponse::Json(response) => Ok(response),
            _ => Err(errors::ApiErrorResponse::InternalServerError)
                .into_report()
                .attach_printable("Failed to get the response in json"),
        }?;

    let result = helpers::get_handle_response_url(
        resource_id,
        &merchant_account,
        payments_response,
        connector,
    )
    .attach_printable("No redirection response")?;

    Ok(services::ApplicationResponse::JsonForRedirection(result))
}

/// Runs [`payments_core`] with the `PaymentStatus` operation for a redirect flow.
///
/// Uses `AuthFlow::Merchant` and passes `flow_type` through as the connector call action.
pub async fn payments_response_for_redirection_flows<'a>(
    state: &AppState,
    merchant_account: storage::MerchantAccount,
    req: api::PaymentsRetrieveRequest,
    flow_type: CallConnectorAction,
) -> RouterResponse {
    payments_core::(
        state,
        merchant_account,
        PaymentStatus,
        req,
        services::api::AuthFlow::Merchant,
        flow_type,
    )
    .await
}

/// Calls a single connector and applies its response to the trackers.
///
/// Builds the router data for `connector`, runs `decide_flows` with the supplied
/// `call_connector_action`, and — only when that succeeded — hands the connector
/// response to the post-update-tracker stage of the response operation. The elapsed
/// time is then logged.
///
/// `_operation` is not used by the body.
///
/// # Errors
/// Propagates failures from router data construction, from `decide_flows`, and from
/// the tracker update. On any of these the function returns before the duration log
/// line, so the duration is only logged for successful calls.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn call_connector_service(
    state: &AppState,
    merchant_account: &storage::MerchantAccount,
    payment_id: &api::PaymentIdType,
    connector: api::ConnectorData,
    _operation: &Op,
    payment_data: PaymentData,
    customer: &Option,
    call_connector_action: CallConnectorAction,
) -> RouterResult>
where
    Op: Debug,
    F: Send + Clone,

    // To create connector flow specific interface data
    PaymentData: ConstructFlowSpecificData,
    types::RouterData: Feature + Send,

    // To construct connector flow specific api
    dyn api::Connector: services::api::ConnectorIntegration,

    // To perform router related operation for PaymentResponse
    PaymentResponse: Operation,
{
    let db = &*state.store;

    // Start of the measured interval.
    let stime_connector = Instant::now();

    let router_data = payment_data
        .construct_router_data(state, connector.connector.id(), merchant_account)
        .await?;

    // Deliberately not `?`-ed here: the result is chained into the tracker update below.
    let res = router_data
        .decide_flows(
            state,
            &connector,
            customer,
            call_connector_action,
            merchant_account,
        )
        .await;

    let response = res
        .async_and_then(|response| async {
            let operation = helpers::response_operation::();
            let payment_data = operation
                .to_post_update_tracker()?
                .update_tracker(
                    db,
                    payment_id,
                    payment_data,
                    response,
                    merchant_account.storage_scheme,
                )
                .await?;
            Ok(payment_data)
        })
        .await?;

    let etime_connector = Instant::now();
    let duration_connector = etime_connector.saturating_duration_since(stime_connector);
    tracing::info!(duration = format!("Duration taken: {}", duration_connector.as_millis()));

    Ok(response)
}

/// Calls several connectors and collects the session tokens they return.
///
/// Router data is constructed for each connector in turn; the `decide_flows` futures
/// (always with `CallConnectorAction::Trigger`) are collected first and then awaited
/// together through `join_all`. For every connector whose result and inner response
/// are both `Ok` and carry a `SessionResponse`, the session token is appended to
/// `payment_data.sessions_token`. The elapsed time is logged before returning.
///
/// `_operation` is not used by the body.
///
/// # Errors
/// Only router data construction failures are propagated. A connector whose
/// `decide_flows` call fails is logged and skipped, so the function can return `Ok`
/// with fewer tokens than connectors.
pub async fn call_multiple_connectors_service(
    state: &AppState,
    merchant_account: &storage::MerchantAccount,
    connectors: Vec,
    _operation: &Op,
    mut payment_data: PaymentData,
    customer: &Option,
) -> RouterResult>
where
    Op: Debug,
    F: Send + Clone,

    // To create connector flow specific interface data
    PaymentData: ConstructFlowSpecificData,
    types::RouterData: Feature,

    // To construct connector flow specific api
    dyn api::Connector: services::api::ConnectorIntegration,

    // To perform router related operation for PaymentResponse
    PaymentResponse: Operation,
{
    let call_connectors_start_time = Instant::now();
    let mut join_handlers = Vec::with_capacity(connectors.len());

    for connector in connectors.iter() {
        let connector_id = connector.connector.id();
        let router_data = payment_data
            .construct_router_data(state, connector_id, merchant_account)
            .await?;

        // Not awaited here: the future is stored and driven by `join_all` below.
        let res = router_data.decide_flows(
            state,
            connector,
            customer,
            CallConnectorAction::Trigger,
            merchant_account,
        );

        join_handlers.push(res);
    }

    let result = join_all(join_handlers).await;

    // Results are paired with connectors by position, i.e. in the order they were pushed.
    for (connector_res, connector) in result.into_iter().zip(connectors) {
        let connector_name = connector.connector_name.to_string();
        match connector_res {
            Ok(connector_response) => {
                // Any other response shape (or an `Err` response) is silently ignored.
                if let Ok(types::PaymentsResponseData::SessionResponse { session_token }) =
                    connector_response.response
                {
                    payment_data.sessions_token.push(session_token);
                }
            }
            Err(connector_error) => {
                logger::error!(
                    "sessions_connector_error {} {:?}",
                    connector_name,
                    connector_error
                );
            }
        }
    }

    let call_connectors_end_time = Instant::now();
    let call_connectors_duration =
        call_connectors_end_time.saturating_duration_since(call_connectors_start_time);
    tracing::info!(duration = format!("Duration taken: {}", call_connectors_duration.as_millis()));

    Ok(payment_data)
}

/// Action value handed to `decide_flows` when a connector is called.
///
/// Within this file, `Trigger` is what [`call_multiple_connectors_service`] always
/// passes, and the redirect flow passes whatever the connector's `get_flow_type`
/// returned. How each variant is interpreted is decided by `decide_flows`, which is
/// not shown here.
pub enum CallConnectorAction {
    Trigger,
    Avoid,
    StatusUpdate(storage_enums::AttemptStatus),
    HandleResponse(Vec),
}

/// Optional shipping and billing details attached to a payment.
#[derive(Clone, Default, Debug)]
pub struct PaymentAddress {
    pub shipping: Option,
    pub billing: Option,
}

/// Working state carried through the stages of a payment operation.
///
/// Fields this file reads or writes directly: `payment_intent.status`,
/// `payment_attempt`, `connector_response.authentication_data`, `token`,
/// `force_sync`, `payment_method_data` and `sessions_token`.
#[derive(Clone)]
pub struct PaymentData
where
    F: Clone,
{
    // Zero-sized marker; carries no runtime data.
    pub flow: PhantomData,
    pub payment_intent: storage::PaymentIntent,
    pub payment_attempt: storage::PaymentAttempt,
    pub connector_response: storage::ConnectorResponse,
    pub amount: api::Amount,
    pub mandate_id: Option,
    pub currency: storage_enums::Currency,
    pub setup_mandate: Option,
    pub address: PaymentAddress,
    // Passed to the locker clean-up after a connector call.
    pub token: Option,
    pub confirm: Option,
    // Consulted by `should_call_connector` for the `PaymentStatus` operation.
    pub force_sync: Option,
    // Overwritten by `payments_operation_core` with the `make_pm_data` result.
    pub payment_method_data: Option,
    pub refunds: Vec,
    // Filled by `call_multiple_connectors_service`.
    pub sessions_token: Vec,
    pub card_cvc: Option>,
    pub email: Option>,
}

/// Customer fields obtained alongside the trackers and used to get or create the
/// customer; every field is optional.
#[derive(Debug)]
pub struct CustomerDetails {
    pub customer_id: Option,
    pub name: Option>,
    pub email: Option>,
    pub phone: Option>,
    pub phone_country_code: Option,
}

/// Chooses the operation to continue with for an already existing payment.
///
/// * `confirm` is `Some(true)` — `PaymentConfirm`;
/// * otherwise, intent status is `RequiresConfirmation`, `RequiresCustomerAction` or
///   `RequiresPaymentMethod` — the `current` operation;
/// * otherwise — `PaymentStatus`.
///
/// A missing `confirm` is treated as `false`.
pub fn if_not_create_change_operation<'a, Op, F>(
    status: storage_enums::IntentStatus,
    confirm: Option,
    current: &'a Op,
) -> BoxedOperation<'_, F, api::PaymentsRequest>
where
    F: Send + Clone,
    Op: Operation + Send + Sync,
    &'a Op: Operation,
{
    if confirm.unwrap_or(false) {
        Box::new(PaymentConfirm)
    } else {
        match status {
            storage_enums::IntentStatus::RequiresConfirmation
            | storage_enums::IntentStatus::RequiresCustomerAction
            | storage_enums::IntentStatus::RequiresPaymentMethod => Box::new(current),
            _ => Box::new(&PaymentStatus),
        }
    }
}

/// Returns `PaymentConfirm` when `confirm` is `Some(true)`, otherwise `operation`.
///
/// A missing `confirm` is treated as `false`.
pub fn is_confirm<'a, F: Clone + Send, R, Op>(
    operation: &'a Op,
    confirm: Option,
) -> BoxedOperation<'_, F, R>
where
    PaymentConfirm: Operation,
    &'a PaymentConfirm: Operation,
    Op: Operation + Send + Sync,
    &'a Op: Operation,
{
    if confirm.unwrap_or(false) {
        Box::new(&PaymentConfirm)
    } else {
        Box::new(operation)
    }
}

/// Decides whether the connector must be called for this operation and payment state.
///
/// The operation is identified by its `Debug` rendering:
///
/// * `"PaymentConfirm"`, `"PaymentSession"` — always;
/// * `"PaymentStart"` — intent is neither `Failed` nor `Succeeded`, and the connector
///   response has no authentication data;
/// * `"PaymentStatus"` — intent is `Failed`, `Processing`, `Succeeded` or
///   `RequiresCustomerAction`, and `force_sync` is `Some(true)`;
/// * `"PaymentCancel"`, `"PaymentCapture"` — intent is `RequiresCapture`;
/// * anything else — never.
///
/// NOTE(review): this depends on each operation's `Debug` output matching these
/// strings exactly; an operation that renders differently falls into the `false` arm.
pub fn should_call_connector(
    operation: &Op,
    payment_data: &PaymentData,
) -> bool {
    match format!("{:?}", operation).as_str() {
        "PaymentConfirm" => true,
        "PaymentStart" => {
            !matches!(
                payment_data.payment_intent.status,
                storage_enums::IntentStatus::Failed | storage_enums::IntentStatus::Succeeded
            ) && payment_data
                .connector_response
                .authentication_data
                .is_none()
        }
        "PaymentStatus" => {
            matches!(
                payment_data.payment_intent.status,
                storage_enums::IntentStatus::Failed
                    | storage_enums::IntentStatus::Processing
                    | storage_enums::IntentStatus::Succeeded
                    | storage_enums::IntentStatus::RequiresCustomerAction
            ) && payment_data.force_sync.unwrap_or(false)
        }
        "PaymentCancel" => matches!(
            payment_data.payment_intent.status,
            storage_enums::IntentStatus::RequiresCapture
        ),
        "PaymentCapture" => {
            matches!(
                payment_data.payment_intent.status,
                storage_enums::IntentStatus::RequiresCapture
            )
        }
        "PaymentSession" => true,
        _ => false,
    }
}

/// Lists a merchant's payments that match `constraints`.
///
/// Validates the constraints, fetches the matching payment intents for the merchant
/// using the merchant's storage scheme, converts each with `ForeignInto`, and returns
/// them as a JSON `PaymentListResponse` whose `size` is the number of entries.
///
/// # Errors
/// Propagates validation failures. A storage error from the lookup is converted with
/// `to_not_found_response`, using `ApiErrorResponse::PaymentNotFound`.
pub async fn list_payments(
    db: &dyn StorageInterface,
    merchant: storage::MerchantAccount,
    constraints: api::PaymentListConstraints,
) -> RouterResponse {
    helpers::validate_payment_list_request(&constraints)?;
    let merchant_id = &merchant.merchant_id;
    let payment_intent =
        helpers::filter_by_constraints(db, &constraints, merchant_id, merchant.storage_scheme)
            .await
            .map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?;

    let data: Vec = payment_intent
        .into_iter()
        .map(ForeignInto::foreign_into)
        .collect();
    Ok(services::ApplicationResponse::Json(
        api::PaymentListResponse {
            size: data.len(),
            data,
        },
    ))
}

/// Inserts a process tracker entry for syncing the given payment attempt.
///
/// The tracking data is a `PaymentsRetrieveRequest` with `force_sync` set, addressed
/// by the attempt id, with no `param` or `connector`. The entry uses runner
/// `"PAYMENTS_SYNC_WORKFLOW"` and task `"PAYMENTS_SYNC"`, gets its id from
/// `pt_utils::get_process_tracker_id`, and carries `schedule_time`.
///
/// # Errors
/// Propagates failures from building the process tracker entry and from inserting it.
pub async fn add_process_sync_task(
    db: &dyn StorageInterface,
    payment_attempt: &storage::PaymentAttempt,
    schedule_time: time::PrimitiveDateTime,
) -> Result<(), errors::ProcessTrackerError> {
    let tracking_data = api::PaymentsRetrieveRequest {
        force_sync: true,
        merchant_id: Some(payment_attempt.merchant_id.clone()),

        resource_id: api::PaymentIdType::PaymentAttemptId(payment_attempt.attempt_id.clone()),
        param: None,
        connector: None,
    };
    let runner = "PAYMENTS_SYNC_WORKFLOW";
    let task = "PAYMENTS_SYNC";
    // The id is built from the runner, task, attempt id and merchant id.
    let process_tracker_id = pt_utils::get_process_tracker_id(
        runner,
        task,
        &payment_attempt.attempt_id,
        &payment_attempt.merchant_id,
    );
    let process_tracker_entry = ::make_process_tracker_new(
        process_tracker_id,
        task,
        runner,
        tracking_data,
        schedule_time,
    )?;

    db.insert_process(process_tracker_entry).await?;
    Ok(())
}