diff --git a/crates/hyperswitch_domain_models/src/payouts/payouts.rs b/crates/hyperswitch_domain_models/src/payouts/payouts.rs index e514dbfb4c..42037c3c2d 100644 --- a/crates/hyperswitch_domain_models/src/payouts/payouts.rs +++ b/crates/hyperswitch_domain_models/src/payouts/payouts.rs @@ -54,7 +54,12 @@ pub trait PayoutsInterface { _filters: &PayoutFetchConstraints, _storage_scheme: MerchantStorageScheme, ) -> error_stack::Result< - Vec<(Payouts, PayoutAttempt, Option)>, + Vec<( + Payouts, + PayoutAttempt, + Option, + Option, + )>, errors::StorageError, >; diff --git a/crates/router/src/core/payouts.rs b/crates/router/src/core/payouts.rs index 82128f98a5..85d5b78521 100644 --- a/crates/router/src/core/payouts.rs +++ b/crates/router/src/core/payouts.rs @@ -6,6 +6,8 @@ pub mod transformers; pub mod validator; use std::{collections::HashSet, vec::IntoIter}; +#[cfg(feature = "olap")] +use api_models::payments as payment_enums; use api_models::{self, enums as api_enums, payouts::PayoutLinkResponse}; #[cfg(feature = "payout_retry")] use common_enums::PayoutRetryType; @@ -33,6 +35,8 @@ use time::Duration; #[cfg(feature = "olap")] use crate::types::domain::behaviour::Conversion; +#[cfg(feature = "olap")] +use crate::types::PayoutActionData; use crate::{ core::{ errors::{ @@ -770,7 +774,9 @@ pub async fn payouts_list_core( .to_not_found_response(errors::ApiErrorResponse::PayoutNotFound)?; let payouts = core_utils::filter_objects_based_on_profile_id_list(profile_id_list, payouts); - let collected_futures = payouts.into_iter().map(|payout| async { + let mut pi_pa_tuple_vec = PayoutActionData::new(); + + for payout in payouts { match db .find_payout_attempt_by_merchant_id_payout_attempt_id( merchant_id, @@ -778,73 +784,80 @@ pub async fn payouts_list_core( storage_enums::MerchantStorageScheme::PostgresOnly, ) .await + .change_context(errors::ApiErrorResponse::InternalServerError) { - Ok(ref payout_attempt) => match payout.customer_id.clone() { - Some(ref customer_id) => { + Ok(payout_attempt) => { + let domain_customer = match payout.customer_id.clone() { #[cfg(all(any(feature = "v1", feature = "v2"), not(feature = "customer_v2")))] - match db + Some(customer_id) => db .find_customer_by_customer_id_merchant_id( &(&state).into(), - customer_id, + &customer_id, merchant_id, &key_store, merchant_account.storage_scheme, ) .await - { - Ok(customer) => Ok((payout, payout_attempt.to_owned(), Some(customer))), - Err(err) => { + .map_err(|err| { let err_msg = format!( "failed while fetching customer for customer_id - {:?}", customer_id ); logger::warn!(?err, err_msg); - if err.current_context().is_db_not_found() { - Ok((payout, payout_attempt.to_owned(), None)) - } else { - Err(err - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable(err_msg)) - } - } - } - } - None => Ok((payout.to_owned(), payout_attempt.to_owned(), None)), - }, + }) + .ok(), + _ => None, + }; + + let payout_id_as_payment_id_type = + common_utils::id_type::PaymentId::wrap(payout.payout_id.clone()) + .change_context(errors::ApiErrorResponse::InvalidRequestData { + message: "payout_id contains invalid data".to_string(), + }) + .attach_printable("Error converting payout_id to PaymentId type")?; + + let payment_addr = payment_helpers::create_or_find_address_for_payment_by_request( + &state, + None, + payout.address_id.as_deref(), + merchant_id, + payout.customer_id.as_ref(), + &key_store, + &payout_id_as_payment_id_type, + merchant_account.storage_scheme, + ) + .await + .transpose() + .and_then(|addr| { + addr.map_err(|err| { + let err_msg = format!( + "billing_address missing for address_id : {:?}", + payout.address_id + ); + logger::warn!(?err, err_msg); + }) + .ok() + .map(payment_enums::Address::foreign_from) + }); + + pi_pa_tuple_vec.push(( + payout.to_owned(), + payout_attempt.to_owned(), + domain_customer, + payment_addr, + )); + } Err(err) => { let err_msg = format!( - "failed while fetching payout_attempt for payout_id - {}", - payout.payout_id.clone(), + "failed while fetching payout_attempt for payout_id - {:?}", + payout.payout_id ); logger::warn!(?err, err_msg); - Err(err - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable(err_msg)) } } - }); - - let pi_pa_tuple_vec: Result< - Vec<( - storage::Payouts, - storage::PayoutAttempt, - Option, - )>, - _, - > = join_all(collected_futures) - .await - .into_iter() - .collect::, - )>, - _, - >>(); + } let data: Vec = pi_pa_tuple_vec - .change_context(errors::ApiErrorResponse::InternalServerError)? .into_iter() .map(ForeignFrom::foreign_from) .collect(); @@ -874,6 +887,7 @@ pub async fn payouts_filtered_list_core( storage::Payouts, storage::PayoutAttempt, Option, + Option, )> = db .filter_payouts_and_attempts( merchant_account.get_id(), @@ -883,30 +897,50 @@ pub async fn payouts_filtered_list_core( .await .to_not_found_response(errors::ApiErrorResponse::PayoutNotFound)?; let list = core_utils::filter_objects_based_on_profile_id_list(profile_id_list, list); - let data: Vec = join_all(list.into_iter().map(|(p, pa, c)| async { - let domain_cust = c - .async_and_then(|cust| async { - domain::Customer::convert_back( - &(&state).into(), - cust, - &key_store.key, - key_store.merchant_id.clone().into(), - ) - .await - .map_err(|err| { - let msg = format!("failed to convert customer for id: {:?}", p.customer_id); - logger::warn!(?err, msg); + let data: Vec = + join_all(list.into_iter().map(|(p, pa, customer, address)| async { + let customer: Option = customer + .async_and_then(|cust| async { + domain::Customer::convert_back( + &(&state).into(), + cust, + &key_store.key, + key_store.merchant_id.clone().into(), + ) + .await + .map_err(|err| { + let msg = format!("failed to convert customer for id: {:?}", p.customer_id); + logger::warn!(?err, msg); + }) + .ok() }) - .ok() - }) - .await; - Some((p, pa, domain_cust)) - })) - .await - .into_iter() - .flatten() - .map(ForeignFrom::foreign_from) - .collect(); + .await; + + let payout_addr: Option = address + .async_and_then(|addr| async { + domain::Address::convert_back( + &(&state).into(), + addr, + &key_store.key, + key_store.merchant_id.clone().into(), + ) + .await + .map(ForeignFrom::foreign_from) + .map_err(|err| { + let msg = format!("failed to convert address for id: {:?}", p.address_id); + logger::warn!(?err, msg); + }) + .ok() + }) + .await; + + Some((p, pa, customer, payout_addr)) + })) + .await + .into_iter() + .flatten() + .map(ForeignFrom::foreign_from) + .collect(); let active_payout_ids = db .filter_active_payout_ids_by_constraints(merchant_account.get_id(), &constraints) diff --git a/crates/router/src/core/payouts/transformers.rs b/crates/router/src/core/payouts/transformers.rs index 35919abb1f..ed17eb71af 100644 --- a/crates/router/src/core/payouts/transformers.rs +++ b/crates/router/src/core/payouts/transformers.rs @@ -9,7 +9,7 @@ use common_utils::link_utils::EnabledPaymentMethod; ))] use crate::types::transformers::ForeignInto; #[cfg(feature = "olap")] -use crate::types::{domain, storage}; +use crate::types::{api::payments, domain, storage}; use crate::{ settings::PayoutRequiredFields, types::{api, transformers::ForeignFrom}, @@ -21,6 +21,7 @@ impl storage::Payouts, storage::PayoutAttempt, Option, + Option, )> for api::PayoutCreateResponse { fn foreign_from( @@ -28,6 +29,7 @@ impl storage::Payouts, storage::PayoutAttempt, Option, + Option, ), ) -> Self { todo!() @@ -44,6 +46,7 @@ impl storage::Payouts, storage::PayoutAttempt, Option, + Option, )> for api::PayoutCreateResponse { fn foreign_from( @@ -51,9 +54,10 @@ impl storage::Payouts, storage::PayoutAttempt, Option, + Option, ), ) -> Self { - let (payout, payout_attempt, customer) = item; + let (payout, payout_attempt, customer, address) = item; let attempt = api::PayoutAttemptResponse { attempt_id: payout_attempt.payout_attempt_id, status: payout_attempt.status, @@ -95,7 +99,7 @@ impl connector_transaction_id: attempt.connector_transaction_id.clone(), priority: payout.priority, attempts: Some(vec![attempt]), - billing: None, + billing: address, client_secret: None, payout_link: None, email: customer diff --git a/crates/router/src/core/utils.rs b/crates/router/src/core/utils.rs index 2faa69978c..acbb6fe04d 100644 --- a/crates/router/src/core/utils.rs +++ b/crates/router/src/core/utils.rs @@ -1521,7 +1521,7 @@ impl GetProfileId for storage::Payouts { } } #[cfg(feature = "payouts")] -impl GetProfileId for (storage::Payouts, T, F) { +impl GetProfileId for (storage::Payouts, T, F, R) { fn get_profile_id(&self) -> Option<&common_utils::id_type::ProfileId> { self.0.get_profile_id() } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 9c625f1f2b..2088177754 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -2073,6 +2073,7 @@ impl PayoutsInterface for KafkaStore { storage::Payouts, storage::PayoutAttempt, Option, + Option, )>, errors::DataStorageError, > { diff --git a/crates/router/src/types.rs b/crates/router/src/types.rs index a77f3a16ad..16a8ed1ccc 100644 --- a/crates/router/src/types.rs +++ b/crates/router/src/types.rs @@ -207,6 +207,14 @@ pub type PayoutsRouterData = RouterData; pub type PayoutsResponseRouterData = ResponseRouterData; +#[cfg(feature = "payouts")] +pub type PayoutActionData = Vec<( + storage::Payouts, + storage::PayoutAttempt, + Option, + Option, +)>; + #[cfg(feature = "payouts")] pub trait PayoutIndividualDetailsExt { type Error; diff --git a/crates/router/src/types/transformers.rs b/crates/router/src/types/transformers.rs index 5d8c8a87bd..741a7d89d5 100644 --- a/crates/router/src/types/transformers.rs +++ b/crates/router/src/types/transformers.rs @@ -789,6 +789,52 @@ impl<'a> From<&'a domain::Address> for api_types::Address { } } +impl ForeignFrom for api_types::Address { + fn foreign_from(address: domain::Address) -> Self { + // If all the fields of address are none, then pass the address as None + let address_details = if address.city.is_none() + && address.line1.is_none() + && address.line2.is_none() + && address.line3.is_none() + && address.state.is_none() + && address.country.is_none() + && address.zip.is_none() + && address.first_name.is_none() + && address.last_name.is_none() + { + None + } else { + Some(api_types::AddressDetails { + city: address.city.clone(), + country: address.country, + line1: address.line1.clone().map(Encryptable::into_inner), + line2: address.line2.clone().map(Encryptable::into_inner), + line3: address.line3.clone().map(Encryptable::into_inner), + state: address.state.clone().map(Encryptable::into_inner), + zip: address.zip.clone().map(Encryptable::into_inner), + first_name: address.first_name.clone().map(Encryptable::into_inner), + last_name: address.last_name.clone().map(Encryptable::into_inner), + }) + }; + + // If all the fields of phone are none, then pass the phone as None + let phone_details = if address.phone_number.is_none() && address.country_code.is_none() { + None + } else { + Some(api_types::PhoneDetails { + number: address.phone_number.clone().map(Encryptable::into_inner), + country_code: address.country_code.clone(), + }) + }; + + Self { + address: address_details, + phone: phone_details, + email: address.email.clone().map(pii::Email::from), + } + } +} + impl ForeignFrom<( diesel_models::api_keys::ApiKey, diff --git a/crates/storage_impl/src/mock_db/payouts.rs b/crates/storage_impl/src/mock_db/payouts.rs index c151e8acb8..5f2cc8824e 100644 --- a/crates/storage_impl/src/mock_db/payouts.rs +++ b/crates/storage_impl/src/mock_db/payouts.rs @@ -69,8 +69,15 @@ impl PayoutsInterface for MockDb { _merchant_id: &common_utils::id_type::MerchantId, _filters: &hyperswitch_domain_models::payouts::PayoutFetchConstraints, _storage_scheme: storage_enums::MerchantStorageScheme, - ) -> CustomResult)>, StorageError> - { + ) -> CustomResult< + Vec<( + Payouts, + PayoutAttempt, + Option, + Option, + )>, + StorageError, + > { // TODO: Implement function for `MockDb` Err(StorageError::MockDbError)? } diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index 3a5cc2854e..7307de7c3f 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -19,12 +19,18 @@ use diesel::{associations::HasTable, ExpressionMethods, NullableExpressionMethod not(feature = "customer_v2") ))] use diesel_models::payout_attempt::PayoutAttempt as DieselPayoutAttempt; +#[cfg(all( + feature = "olap", + any(feature = "v1", feature = "v2"), + not(feature = "customer_v2") +))] +use diesel_models::schema::{ + address::dsl as add_dsl, customers::dsl as cust_dsl, payout_attempt::dsl as poa_dsl, +}; #[cfg(feature = "olap")] use diesel_models::{ - customers::Customer as DieselCustomer, - enums as storage_enums, - query::generics::db_metrics, - schema::{customers::dsl as cust_dsl, payout_attempt::dsl as poa_dsl, payouts::dsl as po_dsl}, + address::Address as DieselAddress, customers::Customer as DieselCustomer, + enums as storage_enums, query::generics::db_metrics, schema::payouts::dsl as po_dsl, }; use diesel_models::{ enums::MerchantStorageScheme, @@ -57,8 +63,8 @@ use crate::connection; not(feature = "customer_v2") ))] use crate::store::schema::{ - customers::all_columns as cust_all_columns, payout_attempt::all_columns as poa_all_columns, - payouts::all_columns as po_all_columns, + address::all_columns as addr_all_columns, customers::all_columns as cust_all_columns, + payout_attempt::all_columns as poa_all_columns, payouts::all_columns as po_all_columns, }; use crate::{ diesel_error_to_data_error, @@ -331,8 +337,15 @@ impl PayoutsInterface for KVRouterStore { merchant_id: &common_utils::id_type::MerchantId, filters: &PayoutFetchConstraints, storage_scheme: MerchantStorageScheme, - ) -> error_stack::Result)>, StorageError> - { + ) -> error_stack::Result< + Vec<( + Payouts, + PayoutAttempt, + Option, + Option, + )>, + StorageError, + > { self.router_store .filter_payouts_and_attempts(merchant_id, filters, storage_scheme) .await @@ -571,8 +584,17 @@ impl PayoutsInterface for crate::RouterStore { merchant_id: &common_utils::id_type::MerchantId, filters: &PayoutFetchConstraints, storage_scheme: MerchantStorageScheme, - ) -> error_stack::Result)>, StorageError> - { + ) -> error_stack::Result< + Vec<( + Payouts, + PayoutAttempt, + Option, + Option, + )>, + StorageError, + > { + use common_utils::errors::ReportSwitchExt; + let conn = connection::pg_connection_read(self).await.switch()?; let conn = async_bb8_diesel::Connection::as_async_conn(&conn); let mut query = DieselPayouts::table() @@ -585,6 +607,10 @@ impl PayoutsInterface for crate::RouterStore { .on(cust_dsl::customer_id.nullable().eq(po_dsl::customer_id)), ) .filter(cust_dsl::merchant_id.eq(merchant_id.to_owned())) + .left_outer_join( + diesel_models::schema::address::table + .on(add_dsl::address_id.nullable().eq(po_dsl::address_id)), + ) .filter(po_dsl::merchant_id.eq(merchant_id.to_owned())) .order(po_dsl::created_at.desc()) .into_boxed(); @@ -675,17 +701,28 @@ impl PayoutsInterface for crate::RouterStore { logger::debug!(filter = %diesel::debug_query::(&query).to_string()); query - .select((po_all_columns, poa_all_columns, cust_all_columns.nullable())) - .get_results_async::<(DieselPayouts, DieselPayoutAttempt, Option)>(conn) + .select(( + po_all_columns, + poa_all_columns, + cust_all_columns.nullable(), + addr_all_columns.nullable(), + )) + .get_results_async::<( + DieselPayouts, + DieselPayoutAttempt, + Option, + Option, + )>(conn) .await .map(|results| { results .into_iter() - .map(|(pi, pa, c)| { + .map(|(pi, pa, c, add)| { ( Payouts::from_storage_model(pi), PayoutAttempt::from_storage_model(pa), c, + add, ) }) .collect() @@ -706,8 +743,15 @@ impl PayoutsInterface for crate::RouterStore { _merchant_id: &common_utils::id_type::MerchantId, _filters: &PayoutFetchConstraints, _storage_scheme: MerchantStorageScheme, - ) -> error_stack::Result)>, StorageError> - { + ) -> error_stack::Result< + Vec<( + Payouts, + PayoutAttempt, + Option, + Option, + )>, + StorageError, + > { todo!() }