fix(payouts): persist status updates in payouts table (#4280)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Kashif
2024-04-04 19:13:24 +05:30
committed by GitHub
parent 622aac3015
commit 02ffe7e480
11 changed files with 232 additions and 61 deletions

View File

@ -437,8 +437,6 @@ pub struct PayoutAttemptResponse {
pub connector_transaction_id: Option<String>,
/// If the payout was cancelled the reason provided here
pub cancellation_reason: Option<String>,
/// Provide a reference to a stored payout method
pub payout_token: Option<String>,
/// error code unified across the connectors is received here if there was an error while calling connector
pub unified_code: Option<String>,
/// error message unified across the connectors is received here if there was an error while calling connector

View File

@ -51,7 +51,10 @@ pub trait PayoutsInterface {
_merchant_id: &str,
_filters: &PayoutFetchConstraints,
_storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(Payouts, PayoutAttempt)>, errors::StorageError>;
) -> error_stack::Result<
Vec<(Payouts, PayoutAttempt, diesel_models::Customer)>,
errors::StorageError,
>;
#[cfg(feature = "olap")]
async fn filter_payouts_by_time_range_constraints(
@ -155,7 +158,7 @@ pub enum PayoutsUpdate {
status: Option<storage_enums::PayoutStatus>,
},
PayoutMethodIdUpdate {
payout_method_id: Option<String>,
payout_method_id: String,
},
RecurringUpdate {
recurring: bool,
@ -163,6 +166,9 @@ pub enum PayoutsUpdate {
AttemptCountUpdate {
attempt_count: i16,
},
StatusUpdate {
status: storage_enums::PayoutStatus,
},
}
#[derive(Clone, Debug, Default)]
@ -212,7 +218,7 @@ impl From<PayoutsUpdate> for PayoutsUpdateInternal {
..Default::default()
},
PayoutsUpdate::PayoutMethodIdUpdate { payout_method_id } => Self {
payout_method_id,
payout_method_id: Some(payout_method_id),
..Default::default()
},
PayoutsUpdate::RecurringUpdate { recurring } => Self {
@ -223,6 +229,10 @@ impl From<PayoutsUpdate> for PayoutsUpdateInternal {
attempt_count: Some(attempt_count),
..Default::default()
},
PayoutsUpdate::StatusUpdate { status } => Self {
status: Some(status),
..Default::default()
},
}
}
}

View File

@ -88,7 +88,7 @@ pub enum PayoutsUpdate {
status: Option<storage_enums::PayoutStatus>,
},
PayoutMethodIdUpdate {
payout_method_id: Option<String>,
payout_method_id: String,
},
RecurringUpdate {
recurring: bool,
@ -96,6 +96,9 @@ pub enum PayoutsUpdate {
AttemptCountUpdate {
attempt_count: i16,
},
StatusUpdate {
status: storage_enums::PayoutStatus,
},
}
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)]
@ -168,7 +171,7 @@ impl From<PayoutsUpdate> for PayoutsUpdateInternal {
..Default::default()
},
PayoutsUpdate::PayoutMethodIdUpdate { payout_method_id } => Self {
payout_method_id,
payout_method_id: Some(payout_method_id),
..Default::default()
},
PayoutsUpdate::RecurringUpdate { recurring } => Self {
@ -179,6 +182,10 @@ impl From<PayoutsUpdate> for PayoutsUpdateInternal {
attempt_count: Some(attempt_count),
..Default::default()
},
PayoutsUpdate::StatusUpdate { status } => Self {
status: Some(status),
..Default::default()
},
}
}
}

View File

@ -883,23 +883,23 @@ impl ForeignFrom<(storage::PaymentIntent, storage::PaymentAttempt)> for api::Pay
}
#[cfg(feature = "payouts")]
impl ForeignFrom<(storage::Payouts, storage::PayoutAttempt)> for api::PayoutCreateResponse {
fn foreign_from(item: (storage::Payouts, storage::PayoutAttempt)) -> Self {
let payout = item.0;
let payout_attempt = item.1;
impl ForeignFrom<(storage::Payouts, storage::PayoutAttempt, domain::Customer)>
for api::PayoutCreateResponse
{
fn foreign_from(item: (storage::Payouts, storage::PayoutAttempt, domain::Customer)) -> Self {
let (payout, payout_attempt, customer) = item;
let attempt = PayoutAttemptResponse {
attempt_id: payout_attempt.payout_attempt_id,
status: payout_attempt.status,
amount: payout.amount,
currency: Some(payout.destination_currency),
connector: payout_attempt.connector.clone(),
error_code: payout_attempt.error_code,
error_message: payout_attempt.error_message,
error_code: payout_attempt.error_code.clone(),
error_message: payout_attempt.error_message.clone(),
payment_method: Some(payout.payout_type),
payout_method_type: None,
connector_transaction_id: Some(payout_attempt.connector_payout_id),
cancellation_reason: None,
payout_token: payout_attempt.payout_token,
unified_code: None,
unified_message: None,
};
@ -907,20 +907,31 @@ impl ForeignFrom<(storage::Payouts, storage::PayoutAttempt)> for api::PayoutCrea
Self {
payout_id: payout.payout_id,
merchant_id: payout.merchant_id,
status: payout.status,
amount: payout.amount,
created: Some(payout.created_at),
currency: payout.destination_currency,
description: payout.description,
metadata: payout.metadata,
customer_id: payout.customer_id,
connector: payout_attempt.connector,
payout_type: payout.payout_type,
business_label: payout_attempt.business_label,
customer_id: customer.customer_id,
auto_fulfill: payout.auto_fulfill,
email: customer.email,
name: customer.name,
phone: customer.phone,
phone_country_code: customer.phone_country_code,
return_url: payout.return_url,
business_country: payout_attempt.business_country,
business_label: payout_attempt.business_label,
description: payout.description,
entity_type: payout.entity_type,
recurring: payout.recurring,
metadata: payout.metadata,
status: payout_attempt.status,
error_message: payout_attempt.error_message,
error_code: payout_attempt.error_code,
profile_id: payout.profile_id,
created: Some(payout.created_at),
attempts: Some(attempts),
..Default::default()
billing: None,
client_secret: None,
}
}
}

View File

@ -20,7 +20,7 @@ use serde_json;
use super::errors::{ConnectorErrorExt, StorageErrorExt};
#[cfg(feature = "olap")]
use crate::types::transformers::ForeignFrom;
use crate::types::{domain::behaviour::Conversion, transformers::ForeignFrom};
use crate::{
core::{
errors::{self, RouterResponse, RouterResult},
@ -504,9 +504,10 @@ pub async fn payouts_cancel_core(
// Make local cancellation
} else if helpers::is_eligible_for_local_payout_cancellation(status) {
let status = storage_enums::PayoutStatus::Cancelled;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: connector_payout_id.to_owned(),
status: storage_enums::PayoutStatus::Cancelled,
status,
error_message: Some("Cancelled by user".to_string()),
error_code: None,
is_eligible: None,
@ -521,6 +522,16 @@ pub async fn payouts_cancel_core(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = state
.store
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
// Trigger connector's cancellation
} else {
@ -661,6 +672,7 @@ pub async fn payouts_fulfill_core(
pub async fn payouts_list_core(
state: AppState,
merchant_account: domain::MerchantAccount,
key_store: domain::MerchantKeyStore,
constraints: payouts::PayoutListConstraints,
) -> RouterResponse<payouts::PayoutListResponse> {
validator::validate_payout_list_request(&constraints)?;
@ -684,7 +696,34 @@ pub async fn payouts_list_core(
)
.await
{
Ok(payout_attempt) => Some(Ok((payouts, payout_attempt))),
Ok(payout_attempt) => {
match db
.find_customer_by_customer_id_merchant_id(
&payouts.customer_id,
merchant_id,
&key_store,
)
.await
{
Ok(customer) => Some(Ok((payouts, payout_attempt, customer))),
Err(error) => {
if matches!(
error.current_context(),
storage_impl::errors::StorageError::ValueNotFound(_)
) {
logger::warn!(
?error,
"customer missing for customer_id : {}",
payouts.customer_id,
);
return None;
}
Some(Err(error.change_context(StorageError::ValueNotFound(
format!("customer missing for customer_id : {}", payouts.customer_id),
))))
}
}
}
Err(error) => {
if matches!(error.current_context(), StorageError::ValueNotFound(_)) {
logger::warn!(
@ -699,12 +738,14 @@ pub async fn payouts_list_core(
}
});
let pi_pa_tuple_vec: Result<Vec<(storage::Payouts, storage::PayoutAttempt)>, _> =
join_all(collected_futures)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<(storage::Payouts, storage::PayoutAttempt)>, _>>();
let pi_pa_tuple_vec: Result<
Vec<(storage::Payouts, storage::PayoutAttempt, domain::Customer)>,
_,
> = join_all(collected_futures)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<(storage::Payouts, storage::PayoutAttempt, domain::Customer)>, _>>();
let data: Vec<api::PayoutCreateResponse> = pi_pa_tuple_vec
.change_context(errors::ApiErrorResponse::InternalServerError)?
@ -724,12 +765,17 @@ pub async fn payouts_list_core(
pub async fn payouts_filtered_list_core(
state: AppState,
merchant_account: domain::MerchantAccount,
key_store: domain::MerchantKeyStore,
filters: payouts::PayoutListFilterConstraints,
) -> RouterResponse<payouts::PayoutListResponse> {
let limit = &filters.limit;
validator::validate_payout_list_request_for_joins(*limit)?;
let db = state.store.as_ref();
let list: Vec<(storage::Payouts, storage::PayoutAttempt)> = db
let list: Vec<(
storage::Payouts,
storage::PayoutAttempt,
diesel_models::Customer,
)> = db
.filter_payouts_and_attempts(
&merchant_account.merchant_id,
&filters.clone().into(),
@ -738,8 +784,20 @@ pub async fn payouts_filtered_list_core(
.await
.to_not_found_response(errors::ApiErrorResponse::PayoutNotFound)?;
let data: Vec<api::PayoutCreateResponse> =
list.into_iter().map(ForeignFrom::foreign_from).collect();
let data: Vec<api::PayoutCreateResponse> = join_all(list.into_iter().map(|(p, pa, c)| async {
match domain::Customer::convert_back(c, &key_store.key).await {
Ok(domain_cust) => Some((p, pa, domain_cust)),
Err(err) => {
logger::warn!(?err, "failed to convert customer for id: {}", p.customer_id);
None
}
}
}))
.await
.into_iter()
.flatten()
.map(ForeignFrom::foreign_from)
.collect();
Ok(services::ApplicationResponse::Json(
api::PayoutListResponse {
@ -1075,6 +1133,15 @@ pub async fn check_payout_eligibility(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
if helpers::is_payout_err_state(status) {
return Err(report!(errors::ApiErrorResponse::PayoutFailed {
data: Some(
@ -1084,9 +1151,10 @@ pub async fn check_payout_eligibility(
}
}
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
status: storage_enums::PayoutStatus::Failed,
status,
error_code: Some(err.code),
error_message: Some(err.message),
is_eligible: Some(false),
@ -1100,6 +1168,15 @@ pub async fn check_payout_eligibility(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
}
};
@ -1174,6 +1251,15 @@ pub async fn create_payout(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
if helpers::is_payout_err_state(status) {
return Err(report!(errors::ApiErrorResponse::PayoutFailed {
data: Some(
@ -1183,9 +1269,10 @@ pub async fn create_payout(
}
}
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
status: storage_enums::PayoutStatus::Failed,
status,
error_code: Some(err.code),
error_message: Some(err.message),
is_eligible: None,
@ -1199,6 +1286,15 @@ pub async fn create_payout(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
}
};
@ -1265,12 +1361,22 @@ pub async fn cancel_payout(
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
}
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
status: storage_enums::PayoutStatus::Failed,
status,
error_code: Some(err.code),
error_message: Some(err.message),
is_eligible: None,
@ -1283,7 +1389,16 @@ pub async fn cancel_payout(
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
}
};
@ -1366,6 +1481,15 @@ pub async fn fulfill_payout(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
if helpers::is_payout_err_state(status) {
return Err(report!(errors::ApiErrorResponse::PayoutFailed {
data: Some(
@ -1375,9 +1499,10 @@ pub async fn fulfill_payout(
}
}
Err(err) => {
let status = storage_enums::PayoutStatus::Failed;
let updated_payout_attempt = storage::PayoutAttemptUpdate::StatusUpdate {
connector_payout_id: String::default(),
status: storage_enums::PayoutStatus::Failed,
status,
error_code: Some(err.code),
error_message: Some(err.message),
is_eligible: None,
@ -1390,7 +1515,16 @@ pub async fn fulfill_payout(
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payout_attempt in db")?
.attach_printable("Error updating payout_attempt in db")?;
payout_data.payouts = db
.update_payout(
&payout_data.payouts,
storage::PayoutsUpdate::StatusUpdate { status },
merchant_account.storage_scheme,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts in db")?;
}
};

View File

@ -288,7 +288,7 @@ pub async fn save_payout_data_to_locker(
// Store card_reference in payouts table
let db = &*state.store;
let updated_payout = storage::PayoutsUpdate::PayoutMethodIdUpdate {
payout_method_id: Some(stored_resp.card_reference.to_owned()),
payout_method_id: stored_resp.card_reference.to_owned(),
};
db.update_payout(
&payout_data.payouts,

View File

@ -1608,8 +1608,14 @@ impl PayoutsInterface for KafkaStore {
merchant_id: &str,
filters: &data_models::payouts::PayoutFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> CustomResult<Vec<(storage::Payouts, storage::PayoutAttempt)>, errors::DataStorageError>
{
) -> CustomResult<
Vec<(
storage::Payouts,
storage::PayoutAttempt,
diesel_models::Customer,
)>,
errors::DataStorageError,
> {
self.diesel_store
.filter_payouts_and_attempts(merchant_id, filters, storage_scheme)
.await

View File

@ -228,7 +228,7 @@ pub async fn payouts_list(
state,
&req,
payload,
|state, auth, req| payouts_list_core(state, auth.merchant_account, req),
|state, auth, req| payouts_list_core(state, auth.merchant_account, auth.key_store, req),
auth::auth_type(
&auth::ApiKeyAuth,
&auth::JWTAuth(Permission::PayoutRead),
@ -266,7 +266,9 @@ pub async fn payouts_list_by_filter(
state,
&req,
payload,
|state, auth, req| payouts_filtered_list_core(state, auth.merchant_account, req),
|state, auth, req| {
payouts_filtered_list_core(state, auth.merchant_account, auth.key_store, req)
},
auth::auth_type(
&auth::ApiKeyAuth,
&auth::JWTAuth(Permission::PayoutRead),

View File

@ -67,7 +67,7 @@ impl PayoutsInterface for MockDb {
_merchant_id: &str,
_filters: &data_models::payouts::PayoutFetchConstraints,
_storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<Vec<(Payouts, PayoutAttempt)>, StorageError> {
) -> CustomResult<Vec<(Payouts, PayoutAttempt, diesel_models::Customer)>, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}

View File

@ -9,6 +9,13 @@ use data_models::{
};
#[cfg(feature = "olap")]
use diesel::{associations::HasTable, ExpressionMethods, JoinOnDsl, QueryDsl};
#[cfg(feature = "olap")]
use diesel_models::{
customers::Customer as DieselCustomer,
payout_attempt::PayoutAttempt as DieselPayoutAttempt,
query::generics::db_metrics,
schema::{customers::dsl as cust_dsl, payout_attempt::dsl as poa_dsl, payouts::dsl as po_dsl},
};
use diesel_models::{
enums::MerchantStorageScheme,
kv,
@ -17,12 +24,6 @@ use diesel_models::{
PayoutsUpdate as DieselPayoutsUpdate,
},
};
#[cfg(feature = "olap")]
use diesel_models::{
payout_attempt::PayoutAttempt as DieselPayoutAttempt,
query::generics::db_metrics,
schema::{payout_attempt::dsl as poa_dsl, payouts::dsl as po_dsl},
};
use error_stack::ResultExt;
use redis_interface::HsetnxReply;
#[cfg(feature = "olap")]
@ -270,7 +271,8 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
merchant_id: &str,
filters: &PayoutFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(Payouts, PayoutAttempt)>, StorageError> {
) -> error_stack::Result<Vec<(Payouts, PayoutAttempt, diesel_models::Customer)>, StorageError>
{
self.router_store
.filter_payouts_and_attempts(merchant_id, filters, storage_scheme)
.await
@ -485,7 +487,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
merchant_id: &str,
filters: &PayoutFetchConstraints,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(Payouts, PayoutAttempt)>, StorageError> {
) -> error_stack::Result<Vec<(Payouts, PayoutAttempt, DieselCustomer)>, StorageError> {
use common_utils::errors::ReportSwitchExt;
let conn = connection::pg_connection_read(self).await.switch()?;
@ -495,6 +497,10 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
diesel_models::schema::payout_attempt::table
.on(poa_dsl::payout_id.eq(po_dsl::payout_id)),
)
.inner_join(
diesel_models::schema::customers::table
.on(cust_dsl::customer_id.eq(po_dsl::customer_id)),
)
.filter(po_dsl::merchant_id.eq(merchant_id.to_owned()))
.order(po_dsl::created_at.desc())
.into_boxed();
@ -585,15 +591,16 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
logger::debug!(filter = %diesel::debug_query::<diesel::pg::Pg,_>(&query).to_string());
query
.get_results_async::<(DieselPayouts, DieselPayoutAttempt)>(conn)
.get_results_async::<(DieselPayouts, DieselPayoutAttempt, DieselCustomer)>(conn)
.await
.map(|results| {
results
.into_iter()
.map(|(pi, pa)| {
.map(|(pi, pa, c)| {
(
Payouts::from_storage_model(pi),
PayoutAttempt::from_storage_model(pa),
c,
)
})
.collect()
@ -765,6 +772,7 @@ impl DataModelExt for PayoutsUpdate {
Self::AttemptCountUpdate { attempt_count } => {
DieselPayoutsUpdate::AttemptCountUpdate { attempt_count }
}
Self::StatusUpdate { status } => DieselPayoutsUpdate::StatusUpdate { status },
}
}

View File

@ -15430,11 +15430,6 @@
"description": "If the payout was cancelled the reason provided here",
"nullable": true
},
"payout_token": {
"type": "string",
"description": "Provide a reference to a stored payout method",
"nullable": true
},
"unified_code": {
"type": "string",
"description": "error code unified across the connectors is received here if there was an error while calling connector",