feat(payouts): Implement Smart Retries for Payout (#3580)

Co-authored-by: Kashif <kashif.dev@protonmail.com>
Co-authored-by: Kashif <mohammed.kashif@juspay.in>
This commit is contained in:
Sakil Mostak
2024-02-28 15:08:20 +05:30
committed by GitHub
parent 15b367eb79
commit 8b32dffe32
13 changed files with 685 additions and 83 deletions

View File

@ -29,6 +29,7 @@ pub struct Payouts {
pub created_at: PrimitiveDateTime,
#[serde(with = "common_utils::custom_serde::iso8601")]
pub last_modified_at: PrimitiveDateTime,
pub attempt_count: i16,
}
impl Default for Payouts {
@ -53,6 +54,7 @@ impl Default for Payouts {
metadata: Option::default(),
created_at: now,
last_modified_at: now,
attempt_count: i16::default(),
}
}
}
@ -90,6 +92,7 @@ pub struct PayoutsNew {
pub created_at: Option<PrimitiveDateTime>,
#[serde(default, with = "common_utils::custom_serde::iso8601::option")]
pub last_modified_at: Option<PrimitiveDateTime>,
pub attempt_count: i16,
}
#[derive(Debug)]
@ -114,6 +117,9 @@ pub enum PayoutsUpdate {
recurring: bool,
last_modified_at: Option<PrimitiveDateTime>,
},
AttemptCountUpdate {
attempt_count: i16,
},
}
#[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)]
@ -130,6 +136,7 @@ pub struct PayoutsUpdateInternal {
pub metadata: Option<pii::SecretSerdeValue>,
pub last_modified_at: Option<PrimitiveDateTime>,
pub payout_method_id: Option<String>,
pub attempt_count: Option<i16>,
}
impl From<PayoutsUpdate> for PayoutsUpdateInternal {
@ -175,6 +182,10 @@ impl From<PayoutsUpdate> for PayoutsUpdateInternal {
recurring: Some(recurring),
..Default::default()
},
PayoutsUpdate::AttemptCountUpdate { attempt_count } => Self {
attempt_count: Some(attempt_count),
..Default::default()
},
}
}
}

View File

@ -34,6 +34,20 @@ impl PayoutAttempt {
.await
}
pub async fn find_by_merchant_id_payout_attempt_id(
conn: &PgPooledConn,
merchant_id: &str,
payout_attempt_id: &str,
) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::payout_attempt_id.eq(payout_attempt_id.to_owned())),
)
.await
}
pub async fn update_by_merchant_id_payout_id(
conn: &PgPooledConn,
merchant_id: &str,
@ -54,4 +68,25 @@ impl PayoutAttempt {
report!(errors::DatabaseError::NotFound).attach_printable("Error while updating payout")
})
}
pub async fn update_by_merchant_id_payout_attempt_id(
conn: &PgPooledConn,
merchant_id: &str,
payout_attempt_id: &str,
payout: PayoutAttemptUpdate,
) -> StorageResult<Self> {
generics::generic_update_with_results::<<Self as HasTable>::Table, _, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::payout_attempt_id.eq(payout_attempt_id.to_owned())),
PayoutAttemptUpdateInternal::from(payout),
)
.await?
.first()
.cloned()
.ok_or_else(|| {
report!(errors::DatabaseError::NotFound).attach_printable("Error while updating payout")
})
}
}

View File

@ -907,6 +907,7 @@ diesel::table! {
metadata -> Nullable<Jsonb>,
created_at -> Timestamp,
last_modified_at -> Timestamp,
attempt_count -> Int2,
}
}

View File

@ -9,7 +9,7 @@ readme = "README.md"
license.workspace = true
[features]
default = ["kv_store", "stripe", "oltp", "olap", "backwards_compatibility", "accounts_cache", "dummy_connector", "payouts", "business_profile_routing", "connector_choice_mca_id", "profile_specific_fallback_routing", "retry", "frm"]
default = ["kv_store", "stripe", "oltp", "olap", "backwards_compatibility", "accounts_cache", "dummy_connector", "payouts", "payout_retry", "business_profile_routing", "connector_choice_mca_id", "profile_specific_fallback_routing", "retry", "frm"]
email = ["external_services/email", "scheduler/email", "olap"]
frm = []
stripe = ["dep:serde_qs"]
@ -27,6 +27,7 @@ connector_choice_mca_id = ["api_models/connector_choice_mca_id", "euclid/connect
external_access_dc = ["dummy_connector"]
detailed_errors = ["api_models/detailed_errors", "error-stack/serde"]
payouts = ["common_enums/payouts"]
payout_retry = ["payouts"]
recon = ["email", "api_models/recon"]
retry = []

View File

@ -55,7 +55,6 @@ pub struct SubError {
pub message: String,
pub path: Option<String>,
pub field: Option<String>,
pub arguments: Option<Vec<String>>,
}
// Payouts

View File

@ -1,6 +1,10 @@
pub mod helpers;
#[cfg(feature = "payout_retry")]
pub mod retry;
pub mod validator;
use std::vec::IntoIter;
use api_models::enums as api_enums;
use common_utils::{crypto::Encryptable, ext_traits::ValueExt, pii};
use diesel_models::enums as storage_enums;
@ -42,8 +46,18 @@ pub struct PayoutData {
}
// ********************************************** CORE FLOWS **********************************************
pub fn get_next_connector(
connectors: &mut IntoIter<api::ConnectorData>,
) -> RouterResult<api::ConnectorData> {
connectors
.next()
.ok_or(errors::ApiErrorResponse::InternalServerError)
.into_report()
.attach_printable("Connector not found in connectors iterator")
}
#[cfg(feature = "payouts")]
pub async fn get_connector_data(
pub async fn get_connector_choice(
state: &AppState,
merchant_account: &domain::MerchantAccount,
key_store: &domain::MerchantKeyStore,
@ -51,7 +65,7 @@ pub async fn get_connector_data(
routing_algorithm: Option<serde_json::Value>,
payout_data: &mut PayoutData,
eligible_connectors: Option<Vec<api_models::enums::Connector>>,
) -> RouterResult<api::ConnectorData> {
) -> RouterResult<api::ConnectorCallType> {
let eligible_routable_connectors = eligible_connectors.map(|connectors| {
connectors
.into_iter()
@ -59,7 +73,7 @@ pub async fn get_connector_data(
.collect()
});
let connector_choice = helpers::get_default_payout_connector(state, routing_algorithm).await?;
let connector_details = match connector_choice {
match connector_choice {
api::ConnectorChoice::SessionMultiple(_) => {
Err(errors::ApiErrorResponse::InternalServerError)
.into_report()
@ -94,7 +108,7 @@ pub async fn get_connector_data(
payout_data,
eligible_routable_connectors,
)
.await?
.await
}
api::ConnectorChoice::Decide => {
@ -119,39 +133,75 @@ pub async fn get_connector_data(
payout_data,
eligible_routable_connectors,
)
.await?
.await
}
};
let connector_data = match connector_details {
api::ConnectorCallType::SessionMultiple(_) => {
Err(errors::ApiErrorResponse::InternalServerError)
.into_report()
.attach_printable("Invalid connector details - SessionMultiple")?
}
api::ConnectorCallType::PreDetermined(connector) => connector,
}
api::ConnectorCallType::Retryable(connectors) => {
let mut connectors = connectors.into_iter();
payments::get_connector_data(&mut connectors)?
}
};
// Update connector in DB
payout_data.payout_attempt.connector = Some(connector_data.connector_name.to_string());
let updated_payout_attempt = storage::PayoutAttemptUpdate::UpdateRouting {
connector: connector_data.connector_name.to_string(),
routing_info: payout_data.payout_attempt.routing_info.clone(),
};
let db = &*state.store;
db.update_payout_attempt_by_merchant_id_payout_id(
&payout_data.payout_attempt.merchant_id,
&payout_data.payout_attempt.payout_id,
updated_payout_attempt,
#[cfg(feature = "payouts")]
#[instrument(skip_all)]
pub async fn make_connector_decision(
state: &AppState,
merchant_account: &domain::MerchantAccount,
key_store: &domain::MerchantKeyStore,
req: &payouts::PayoutCreateRequest,
connector_call_type: api::ConnectorCallType,
mut payout_data: PayoutData,
) -> RouterResult<PayoutData> {
match connector_call_type {
api::ConnectorCallType::PreDetermined(connector_data) => {
call_connector_payout(
state,
merchant_account,
key_store,
req,
&connector_data,
&mut payout_data,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating routing info in payout_attempt")?;
Ok(connector_data)
}
api::ConnectorCallType::Retryable(connectors) => {
let mut connectors = connectors.into_iter();
let connector_data = get_next_connector(&mut connectors)?;
payout_data = call_connector_payout(
state,
merchant_account,
key_store,
req,
&connector_data,
&mut payout_data,
)
.await?;
#[cfg(feature = "payout_retry")]
{
use crate::core::payouts::retry::{self, GsmValidation};
let config_bool = retry::config_should_call_gsm_payout(
&*state.store,
&merchant_account.merchant_id,
)
.await;
if config_bool && payout_data.should_call_gsm() {
payout_data = retry::do_gsm_actions(
state,
connectors,
connector_data,
payout_data,
merchant_account,
key_store,
req,
)
.await?;
}
}
Ok(payout_data)
}
_ => Err(errors::ApiErrorResponse::InternalServerError)?,
}
}
#[cfg(feature = "payouts")]
@ -178,8 +228,7 @@ pub async fn payouts_create_core(
)
.await?;
// Form connector data
let connector_data = get_connector_data(
let connector_call_type = get_connector_choice(
&state,
&merchant_account,
&key_store,
@ -190,13 +239,21 @@ pub async fn payouts_create_core(
)
.await?;
call_connector_payout(
payout_data = make_connector_decision(
&state,
&merchant_account,
&key_store,
&req,
&connector_data,
&mut payout_data,
connector_call_type,
payout_data,
)
.await?;
response_handler(
&state,
&merchant_account,
&payouts::PayoutRequest::PayoutCreateRequest(req.to_owned()),
&payout_data,
)
.await
}
@ -246,6 +303,8 @@ pub async fn payouts_update_core(
let db = &*state.store;
let payout_id = req.payout_id.clone().get_required_value("payout_id")?;
let payout_attempt_id =
utils::get_payment_attempt_id(payout_id.to_owned(), payouts.attempt_count);
let merchant_id = &merchant_account.merchant_id;
payout_data.payouts = db
.update_payout_by_merchant_id_payout_id(merchant_id, &payout_id, updated_payouts)
@ -279,9 +338,9 @@ pub async fn payouts_update_core(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
&payout_id,
&payout_attempt_id,
update_payout_attempt,
)
.await
@ -290,31 +349,20 @@ pub async fn payouts_update_core(
}
}
let connector_data = match &payout_attempt.connector {
// Evaluate and fetch connector data
None => {
get_connector_data(
&state,
&merchant_account,
&key_store,
None,
req.routing.clone(),
&mut payout_data,
payout_data = match (
req.connector.clone(),
)
.await?
}
// Use existing connector
Some(connector) => api::ConnectorData::get_payout_connector_by_name(
payout_data.payout_attempt.connector.clone(),
) {
// if the connector is not updated but was provided during payout create
(None, Some(connector)) => {
let connector_data = api::ConnectorData::get_payout_connector_by_name(
&state.conf.connectors,
connector,
connector.as_str(),
api::GetToken::Connector,
payout_attempt.merchant_connector_id.clone(),
)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to get the connector data")?,
};
.attach_printable("Failed to get the connector data")?;
call_connector_payout(
&state,
@ -324,6 +372,58 @@ pub async fn payouts_update_core(
&connector_data,
&mut payout_data,
)
.await?
}
// if the connector is updated or not present both in create and update call
_ => {
payout_data.payout_attempt.connector = None;
payout_data.payout_attempt.routing_info = None;
//fetch payout_method_data
payout_data.payout_method_data = Some(
helpers::make_payout_method_data(
&state,
req.payout_method_data.as_ref(),
payout_data.payout_attempt.payout_token.as_deref(),
&payout_data.payout_attempt.customer_id,
&payout_data.payout_attempt.merchant_id,
&payout_data.payout_attempt.payout_id,
Some(&payouts.payout_type),
&key_store,
)
.await?
.get_required_value("payout_method_data")?,
);
let connector_call_type = get_connector_choice(
&state,
&merchant_account,
&key_store,
None,
req.routing.clone(),
&mut payout_data,
req.connector.clone(),
)
.await?;
make_connector_decision(
&state,
&merchant_account,
&key_store,
&req,
connector_call_type,
payout_data,
)
.await?
}
};
response_handler(
&state,
&merchant_account,
&payouts::PayoutRequest::PayoutCreateRequest(req.to_owned()),
&payout_data,
)
.await
}
@ -548,9 +648,30 @@ pub async fn call_connector_payout(
req: &payouts::PayoutCreateRequest,
connector_data: &api::ConnectorData,
payout_data: &mut PayoutData,
) -> RouterResponse<payouts::PayoutCreateResponse> {
) -> RouterResult<PayoutData> {
let payout_attempt = &payout_data.payout_attempt.to_owned();
let payouts: &diesel_models::payouts::Payouts = &payout_data.payouts.to_owned();
// update connector_name
if payout_data.payout_attempt.connector.is_none()
|| payout_data.payout_attempt.connector != Some(connector_data.connector_name.to_string())
{
payout_data.payout_attempt.connector = Some(connector_data.connector_name.to_string());
let updated_payout_attempt = storage::PayoutAttemptUpdate::UpdateRouting {
connector: connector_data.connector_name.to_string(),
routing_info: payout_data.payout_attempt.routing_info.clone(),
};
let db = &*state.store;
db.update_payout_attempt_by_merchant_id_payout_attempt_id(
&payout_data.payout_attempt.merchant_id,
&payout_data.payout_attempt.payout_attempt_id,
updated_payout_attempt,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating routing info in payout_attempt")?;
};
// Fetch / store payout_method_data
if payout_data.payout_method_data.is_none() || payout_attempt.payout_token.is_none() {
payout_data.payout_method_data = Some(
@ -568,6 +689,7 @@ pub async fn call_connector_payout(
.get_required_value("payout_method_data")?,
);
}
if let Some(true) = req.confirm {
// Eligibility flow
if payouts.payout_type == storage_enums::PayoutType::Card
@ -659,13 +781,7 @@ pub async fn call_connector_payout(
.attach_printable("Payout fulfillment failed for given Payout request")?;
}
response_handler(
state,
merchant_account,
&payouts::PayoutRequest::PayoutCreateRequest(req.to_owned()),
payout_data,
)
.await
Ok(payout_data.to_owned())
}
#[cfg(feature = "payouts")]
@ -796,6 +912,9 @@ pub async fn check_payout_eligibility(
let db = &*state.store;
let merchant_id = &merchant_account.merchant_id;
let payout_id = &payout_data.payouts.payout_id;
let payout_attempt_id =
&utils::get_payment_attempt_id(payout_id, payout_data.payouts.attempt_count);
match router_data_resp.response {
Ok(payout_response_data) => {
let payout_attempt = &payout_data.payout_attempt;
@ -812,9 +931,9 @@ pub async fn check_payout_eligibility(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_id,
payout_attempt_id,
updated_payout_attempt,
)
.await
@ -839,9 +958,9 @@ pub async fn check_payout_eligibility(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_id,
payout_attempt_id,
updated_payout_attempt,
)
.await
@ -902,6 +1021,9 @@ pub async fn create_payout(
let db = &*state.store;
let merchant_id = &merchant_account.merchant_id;
let payout_id = &payout_data.payouts.payout_id;
let payout_attempt_id =
&utils::get_payment_attempt_id(payout_id, payout_data.payouts.attempt_count);
match router_data_resp.response {
Ok(payout_response_data) => {
let payout_attempt = &payout_data.payout_attempt;
@ -918,9 +1040,9 @@ pub async fn create_payout(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_id,
payout_attempt_id,
updated_payout_attempt,
)
.await
@ -945,9 +1067,9 @@ pub async fn create_payout(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_id,
payout_attempt_id,
updated_payout_attempt,
)
.await
@ -1095,6 +1217,9 @@ pub async fn fulfill_payout(
let merchant_id = &merchant_account.merchant_id;
let payout_attempt = &payout_data.payout_attempt;
let payout_id = &payout_attempt.payout_id;
let payout_attempt_id =
&utils::get_payment_attempt_id(payout_id, payout_data.payouts.attempt_count);
match router_data_resp.response {
Ok(payout_response_data) => {
if payout_data.payouts.recurring && payout_data.payouts.payout_method_id.is_none() {
@ -1122,9 +1247,9 @@ pub async fn fulfill_payout(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_id,
payout_attempt_id,
updated_payouts,
)
.await
@ -1148,9 +1273,9 @@ pub async fn fulfill_payout(
last_modified_at: Some(common_utils::date_time::now()),
};
payout_data.payout_attempt = db
.update_payout_attempt_by_merchant_id_payout_id(
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_id,
payout_attempt_id,
updated_payouts,
)
.await
@ -1324,6 +1449,7 @@ pub async fn payout_create_db_entries(
.set_created_at(Some(common_utils::date_time::now()))
.set_last_modified_at(Some(common_utils::date_time::now()))
.set_payout_method_id(payout_method_id)
.set_attempt_count(1)
.to_owned();
let payouts = db
.insert_payout(payouts_req)
@ -1407,8 +1533,10 @@ pub async fn make_payout_data(
.await
.to_not_found_response(errors::ApiErrorResponse::PayoutNotFound)?;
let payout_attempt_id = utils::get_payment_attempt_id(payout_id, payouts.attempt_count);
let payout_attempt = db
.find_payout_attempt_by_merchant_id_payout_id(merchant_id, &payout_id)
.find_payout_attempt_by_merchant_id_payout_attempt_id(merchant_id, &payout_attempt_id)
.await
.to_not_found_response(errors::ApiErrorResponse::PayoutNotFound)?;

View File

@ -6,6 +6,7 @@ use common_utils::{
use diesel_models::encryption::Encryption;
use error_stack::{IntoReport, ResultExt};
use masking::{ExposeInterface, PeekInterface, Secret};
use router_env::logger;
use super::PayoutData;
use crate::{
@ -24,7 +25,7 @@ use crate::{
utils as core_utils,
},
db::StorageInterface,
routes::AppState,
routes::{metrics, AppState},
services,
types::{
api::{self, enums as api_enums},
@ -642,6 +643,49 @@ pub fn should_call_payout_connector_create_customer<'a>(
}
}
pub async fn get_gsm_record(
state: &AppState,
error_code: Option<String>,
error_message: Option<String>,
connector_name: Option<String>,
flow: String,
) -> Option<storage::gsm::GatewayStatusMap> {
let get_gsm = || async {
state.store.find_gsm_rule(
connector_name.clone().unwrap_or_default(),
flow.clone(),
"sub_flow".to_string(),
error_code.clone().unwrap_or_default(), // TODO: make changes in connector to get a mandatory code in case of success or error response
error_message.clone().unwrap_or_default(),
)
.await
.map_err(|err| {
if err.current_context().is_db_not_found() {
logger::warn!(
"GSM miss for connector - {}, flow - {}, error_code - {:?}, error_message - {:?}",
connector_name.unwrap_or_default(),
flow,
error_code,
error_message
);
metrics::AUTO_PAYOUT_RETRY_GSM_MISS_COUNT.add(&metrics::CONTEXT, 1, &[]);
} else {
metrics::AUTO_PAYOUT_RETRY_GSM_FETCH_FAILURE_COUNT.add(&metrics::CONTEXT, 1, &[]);
};
err.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("failed to fetch decision from gsm")
})
};
get_gsm()
.await
.map_err(|err| {
// warn log should suffice here because we are not propagating this error
logger::warn!(get_gsm_decision_fetch_error=?err, "error fetching gsm decision");
err
})
.ok()
}
pub fn is_payout_initiated(status: api_enums::PayoutStatus) -> bool {
matches!(
status,

View File

@ -0,0 +1,273 @@
use std::{str::FromStr, vec::IntoIter};
use api_models::payouts::PayoutCreateRequest;
use error_stack::{IntoReport, ResultExt};
use router_env::{
logger,
tracing::{self, instrument},
};
use super::{call_connector_payout, PayoutData};
use crate::{
core::{
errors::{self, RouterResult, StorageErrorExt},
payouts,
},
db::StorageInterface,
routes::{self, app, metrics},
types::{api, domain, storage},
utils,
};
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn do_gsm_actions(
state: &app::AppState,
mut connectors: IntoIter<api::ConnectorData>,
original_connector_data: api::ConnectorData,
mut payout_data: PayoutData,
merchant_account: &domain::MerchantAccount,
key_store: &domain::MerchantKeyStore,
req: &PayoutCreateRequest,
) -> RouterResult<PayoutData> {
let mut retries = None;
metrics::AUTO_PAYOUT_RETRY_ELIGIBLE_REQUEST_COUNT.add(&metrics::CONTEXT, 1, &[]);
let mut connector = original_connector_data;
loop {
let gsm = get_gsm(state, &connector, &payout_data).await?;
match get_gsm_decision(gsm) {
api_models::gsm::GsmDecision::Retry => {
retries = get_retries(state, retries, &merchant_account.merchant_id).await;
if retries.is_none() || retries == Some(0) {
metrics::AUTO_PAYOUT_RETRY_EXHAUSTED_COUNT.add(&metrics::CONTEXT, 1, &[]);
logger::info!("retries exhausted for auto_retry payout");
break;
}
if connectors.len() == 0 {
logger::info!("connectors exhausted for auto_retry payout");
metrics::AUTO_PAYOUT_RETRY_EXHAUSTED_COUNT.add(&metrics::CONTEXT, 1, &[]);
break;
}
connector = super::get_next_connector(&mut connectors)?;
payout_data = do_retry(
&state.clone(),
connector.to_owned(),
merchant_account,
key_store,
payout_data,
req,
)
.await?;
retries = retries.map(|i| i - 1);
}
api_models::gsm::GsmDecision::Requeue => {
Err(errors::ApiErrorResponse::NotImplemented {
message: errors::api_error_response::NotImplementedMessage::Reason(
"Requeue not implemented".to_string(),
),
})
.into_report()?
}
api_models::gsm::GsmDecision::DoDefault => break,
}
}
Ok(payout_data)
}
#[instrument(skip_all)]
pub async fn get_retries(
state: &app::AppState,
retries: Option<i32>,
merchant_id: &str,
) -> Option<i32> {
match retries {
Some(retries) => Some(retries),
None => {
let key = format!("max_auto_payout_retries_enabled_{merchant_id}");
let db = &*state.store;
db.find_config_by_key(key.as_str())
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.and_then(|retries_config| {
retries_config
.config
.parse::<i32>()
.into_report()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Retries config parsing failed")
})
.map_err(|err| {
logger::error!(retries_error=?err);
None::<i32>
})
.ok()
}
}
}
#[instrument(skip_all)]
pub async fn get_gsm(
state: &app::AppState,
original_connector_data: &api::ConnectorData,
payout_data: &PayoutData,
) -> RouterResult<Option<storage::gsm::GatewayStatusMap>> {
let error_code = payout_data.payout_attempt.error_code.to_owned();
let error_message = payout_data.payout_attempt.error_message.to_owned();
let connector_name = Some(original_connector_data.connector_name.to_string());
let flow = "payout_flow".to_string();
Ok(
payouts::helpers::get_gsm_record(state, error_code, error_message, connector_name, flow)
.await,
)
}
#[instrument(skip_all)]
pub fn get_gsm_decision(
option_gsm: Option<storage::gsm::GatewayStatusMap>,
) -> api_models::gsm::GsmDecision {
let option_gsm_decision = option_gsm
.and_then(|gsm| {
api_models::gsm::GsmDecision::from_str(gsm.decision.as_str())
.into_report()
.map_err(|err| {
let api_error = err.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("gsm decision parsing failed");
logger::warn!(get_gsm_decision_parse_error=?api_error, "error fetching gsm decision");
api_error
})
.ok()
});
if option_gsm_decision.is_some() {
metrics::AUTO_PAYOUT_RETRY_GSM_MATCH_COUNT.add(&metrics::CONTEXT, 1, &[]);
}
option_gsm_decision.unwrap_or_default()
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn do_retry(
state: &routes::AppState,
connector: api::ConnectorData,
merchant_account: &domain::MerchantAccount,
key_store: &domain::MerchantKeyStore,
mut payout_data: PayoutData,
req: &PayoutCreateRequest,
) -> RouterResult<PayoutData> {
metrics::AUTO_RETRY_PAYOUT_COUNT.add(&metrics::CONTEXT, 1, &[]);
modify_trackers(state, &connector, merchant_account, &mut payout_data).await?;
call_connector_payout(
state,
merchant_account,
key_store,
req,
&connector,
&mut payout_data,
)
.await
}
#[instrument(skip_all)]
pub async fn modify_trackers(
state: &routes::AppState,
connector: &api::ConnectorData,
merchant_account: &domain::MerchantAccount,
payout_data: &mut PayoutData,
) -> RouterResult<()> {
let new_attempt_count = payout_data.payouts.attempt_count + 1;
let db = &*state.store;
// update payout table's attempt count
let payouts = payout_data.payouts.to_owned();
let updated_payouts = storage::PayoutsUpdate::AttemptCountUpdate {
attempt_count: new_attempt_count,
};
let payout_id = payouts.payout_id.clone();
let merchant_id = &merchant_account.merchant_id;
payout_data.payouts = db
.update_payout_by_merchant_id_payout_id(merchant_id, &payout_id.to_owned(), updated_payouts)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error updating payouts")?;
let payout_attempt_id =
utils::get_payment_attempt_id(payout_id.to_owned(), payout_data.payouts.attempt_count);
let new_payout_attempt_req = storage::PayoutAttemptNew::default()
.set_payout_attempt_id(payout_attempt_id.to_string())
.set_payout_id(payout_id.to_owned())
.set_customer_id(payout_data.payout_attempt.customer_id.to_owned())
.set_connector(Some(connector.connector_name.to_string()))
.set_merchant_id(payout_data.payout_attempt.merchant_id.to_owned())
.set_address_id(payout_data.payout_attempt.address_id.to_owned())
.set_business_country(payout_data.payout_attempt.business_country.to_owned())
.set_business_label(payout_data.payout_attempt.business_label.to_owned())
.set_payout_token(payout_data.payout_attempt.payout_token.to_owned())
.set_created_at(Some(common_utils::date_time::now()))
.set_last_modified_at(Some(common_utils::date_time::now()))
.set_profile_id(Some(payout_data.payout_attempt.profile_id.to_string()))
.to_owned();
payout_data.payout_attempt = db
.insert_payout_attempt(new_payout_attempt_req)
.await
.to_duplicate_response(errors::ApiErrorResponse::DuplicatePayout { payout_id })
.attach_printable("Error inserting payouts in db")?;
payout_data.merchant_connector_account = None;
Ok(())
}
pub async fn config_should_call_gsm_payout(
db: &dyn StorageInterface,
merchant_id: &String,
) -> bool {
let config = db
.find_config_by_key_unwrap_or(
format!("should_call_gsm_payout_{}", merchant_id).as_str(),
Some("false".to_string()),
)
.await;
match config {
Ok(conf) => conf.config == "true",
Err(err) => {
logger::error!("{err}");
false
}
}
}
pub trait GsmValidation {
// TODO : move this function to appropriate place later.
fn should_call_gsm(&self) -> bool;
}
impl GsmValidation for PayoutData {
#[inline(always)]
fn should_call_gsm(&self) -> bool {
match self.payout_attempt.status {
common_enums::PayoutStatus::Success
| common_enums::PayoutStatus::Cancelled
| common_enums::PayoutStatus::Pending
| common_enums::PayoutStatus::Ineligible
| common_enums::PayoutStatus::RequiresCreation
| common_enums::PayoutStatus::RequiresPayoutMethodData
| common_enums::PayoutStatus::RequiresFulfillment => false,
common_enums::PayoutStatus::Failed => true,
}
}
}

View File

@ -1333,6 +1333,16 @@ impl PayoutAttemptInterface for KafkaStore {
.await
}
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
merchant_id: &str,
payout_attempt_id: &str,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError> {
self.diesel_store
.find_payout_attempt_by_merchant_id_payout_attempt_id(merchant_id, payout_attempt_id)
.await
}
async fn update_payout_attempt_by_merchant_id_payout_id(
&self,
merchant_id: &str,
@ -1344,6 +1354,21 @@ impl PayoutAttemptInterface for KafkaStore {
.await
}
async fn update_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
merchant_id: &str,
payout_attempt_id: &str,
payout: storage::PayoutAttemptUpdate,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError> {
self.diesel_store
.update_payout_attempt_by_merchant_id_payout_attempt_id(
merchant_id,
payout_attempt_id,
payout,
)
.await
}
async fn insert_payout_attempt(
&self,
payout: storage::PayoutAttemptNew,

View File

@ -15,6 +15,12 @@ pub trait PayoutAttemptInterface {
_payout_id: &str,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError>;
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
_merchant_id: &str,
_payout_attempt_id: &str,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError>;
async fn update_payout_attempt_by_merchant_id_payout_id(
&self,
_merchant_id: &str,
@ -22,6 +28,13 @@ pub trait PayoutAttemptInterface {
_payout: storage::PayoutAttemptUpdate,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError>;
async fn update_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
_merchant_id: &str,
_payout_attempt_id: &str,
_payout: storage::PayoutAttemptUpdate,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError>;
async fn insert_payout_attempt(
&self,
_payout: storage::PayoutAttemptNew,
@ -42,6 +55,22 @@ impl PayoutAttemptInterface for Store {
.into_report()
}
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
merchant_id: &str,
payout_attempt_id: &str,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::PayoutAttempt::find_by_merchant_id_payout_attempt_id(
&conn,
merchant_id,
payout_attempt_id,
)
.await
.map_err(Into::into)
.into_report()
}
async fn update_payout_attempt_by_merchant_id_payout_id(
&self,
merchant_id: &str,
@ -60,6 +89,24 @@ impl PayoutAttemptInterface for Store {
.into_report()
}
async fn update_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
merchant_id: &str,
payout_attempt_id: &str,
payout: storage::PayoutAttemptUpdate,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::PayoutAttempt::update_by_merchant_id_payout_attempt_id(
&conn,
merchant_id,
payout_attempt_id,
payout,
)
.await
.map_err(Into::into)
.into_report()
}
async fn insert_payout_attempt(
&self,
payout: storage::PayoutAttemptNew,
@ -80,6 +127,15 @@ impl PayoutAttemptInterface for MockDb {
Err(errors::StorageError::MockDbError)?
}
async fn find_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
_merchant_id: &str,
_payout_attempt_id: &str,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError> {
// TODO: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
}
async fn update_payout_attempt_by_merchant_id_payout_id(
&self,
_merchant_id: &str,
@ -90,6 +146,16 @@ impl PayoutAttemptInterface for MockDb {
Err(errors::StorageError::MockDbError)?
}
async fn update_payout_attempt_by_merchant_id_payout_attempt_id(
&self,
_merchant_id: &str,
_payout_attempt_id: &str,
_payout: storage::PayoutAttemptUpdate,
) -> CustomResult<storage::PayoutAttempt, errors::StorageError> {
// TODO: Implement function for `MockDb`
Err(errors::StorageError::MockDbError)?
}
async fn insert_payout_attempt(
&self,
_payout: storage::PayoutAttemptNew,

View File

@ -101,7 +101,7 @@ counter_metric!(APPLE_PAY_SIMPLIFIED_FLOW_SUCCESSFUL_PAYMENT, GLOBAL_METER);
counter_metric!(APPLE_PAY_MANUAL_FLOW_FAILED_PAYMENT, GLOBAL_METER);
counter_metric!(APPLE_PAY_SIMPLIFIED_FLOW_FAILED_PAYMENT, GLOBAL_METER);
// Metrics for Auto Retries
// Metrics for Payment Auto Retries
counter_metric!(AUTO_RETRY_CONNECTION_CLOSED, GLOBAL_METER);
counter_metric!(AUTO_RETRY_ELIGIBLE_REQUEST_COUNT, GLOBAL_METER);
counter_metric!(AUTO_RETRY_GSM_MISS_COUNT, GLOBAL_METER);
@ -110,6 +110,14 @@ counter_metric!(AUTO_RETRY_GSM_MATCH_COUNT, GLOBAL_METER);
counter_metric!(AUTO_RETRY_EXHAUSTED_COUNT, GLOBAL_METER);
counter_metric!(AUTO_RETRY_PAYMENT_COUNT, GLOBAL_METER);
// Metrics for Payout Auto Retries
counter_metric!(AUTO_PAYOUT_RETRY_ELIGIBLE_REQUEST_COUNT, GLOBAL_METER);
counter_metric!(AUTO_PAYOUT_RETRY_GSM_MISS_COUNT, GLOBAL_METER);
counter_metric!(AUTO_PAYOUT_RETRY_GSM_FETCH_FAILURE_COUNT, GLOBAL_METER);
counter_metric!(AUTO_PAYOUT_RETRY_GSM_MATCH_COUNT, GLOBAL_METER);
counter_metric!(AUTO_PAYOUT_RETRY_EXHAUSTED_COUNT, GLOBAL_METER);
counter_metric!(AUTO_RETRY_PAYOUT_COUNT, GLOBAL_METER);
counter_metric!(TASKS_ADDED_COUNT, GLOBAL_METER); // Tasks added to process tracker
counter_metric!(TASKS_RESET_COUNT, GLOBAL_METER); // Tasks reset in process tracker for requeue flow

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE payouts DROP COLUMN attempt_count;

View File

@ -0,0 +1,9 @@
-- Your SQL goes here
ALTER TABLE payouts
ADD COLUMN attempt_count SMALLINT NOT NULL DEFAULT 1;
UPDATE payouts
SET attempt_count = payout_id_count.count
FROM (SELECT payout_id, count(payout_id) FROM payout_attempt GROUP BY payout_id) as payout_id_count
WHERE payouts.payout_id = payout_id_count.payout_id;