feat(async_refund): add sync feature to async refund (#93)

This commit is contained in:
Nishant Joshi
2022-12-12 12:46:04 +05:30
committed by GitHub
parent affa9fc35a
commit ea219dc893
6 changed files with 179 additions and 67 deletions

View File

@ -60,7 +60,12 @@ pub(crate) async fn refund_retrieve(
&req,
refund_id,
|state, merchant_account, refund_id| {
refunds::refund_retrieve_core(state, merchant_account, refund_id)
refunds::refund_response_wrapper(
state,
merchant_account,
refund_id,
refunds::refund_retrieve_core,
)
},
api::MerchantAuthentication::ApiKey,
)

View File

@ -356,6 +356,8 @@ pub enum ProcessTrackerError {
FlowExecutionError { flow: String },
#[error("Not Implemented")]
NotImplemented,
#[error("Job not found")]
JobNotFound,
#[error("Recieved Error ApiResponseError: {0}")]
EApiErrorResponse(error_stack::Report<ApiErrorResponse>),
#[error("Recieved Error StorageError: {0}")]

View File

@ -9,9 +9,9 @@ use crate::{
errors::{self, ConnectorErrorExt, RouterResponse, RouterResult, StorageErrorExt},
payments, utils as core_utils,
},
db::StorageInterface,
logger,
db, logger,
routes::AppState,
scheduler::{process_data, utils as process_tracker_utils, workflows::payment_sync},
services,
types::{
self,
@ -163,12 +163,28 @@ pub async fn trigger_refund_to_gateway(
// ********************************************** REFUND SYNC **********************************************
pub async fn refund_response_wrapper<'a, F, Fut, T>(
state: &'a AppState,
merchant_account: storage::MerchantAccount,
refund_id: String,
f: F,
) -> RouterResponse<refunds::RefundResponse>
where
F: Fn(&'a AppState, storage::MerchantAccount, String) -> Fut,
Fut: futures::Future<Output = RouterResult<T>>,
refunds::RefundResponse: From<T>,
{
Ok(services::BachResponse::Json(
f(state, merchant_account, refund_id).await?.into(),
))
}
#[instrument(skip_all)]
pub async fn refund_retrieve_core(
state: &AppState,
merchant_account: storage::MerchantAccount,
refund_id: String,
) -> RouterResponse<refunds::RefundResponse> {
) -> RouterResult<storage::Refund> {
let db = &*state.store;
let (merchant_id, payment_intent, payment_attempt, refund, response);
@ -212,7 +228,7 @@ pub async fn refund_retrieve_core(
)
.await?;
Ok(services::BachResponse::Json(response.into()))
Ok(response)
}
#[instrument(skip_all)]
@ -286,7 +302,7 @@ pub async fn sync_refund_with_gateway(
// ********************************************** REFUND UPDATE **********************************************
pub async fn refund_update_core(
db: &dyn StorageInterface,
db: &dyn db::StorageInterface,
merchant_account: storage::MerchantAccount,
refund_id: &str,
req: refunds::RefundRequest,
@ -576,9 +592,9 @@ pub async fn schedule_refund_execution(
#[instrument(skip_all)]
pub async fn sync_refund_with_gateway_workflow(
db: &dyn StorageInterface,
state: &AppState,
refund_tracker: &storage::ProcessTracker,
) -> RouterResult<()> {
) -> Result<(), errors::ProcessTrackerError> {
let refund_core =
serde_json::from_value::<storage::RefundCoreWorkflow>(refund_tracker.tracking_data.clone())
.into_report()
@ -590,7 +606,8 @@ pub async fn sync_refund_with_gateway_workflow(
)
})?;
let merchant_account = db
let merchant_account = state
.store
.find_merchant_account_by_merchant_id(&refund_core.merchant_id)
.await
.map_err(|error| {
@ -598,7 +615,8 @@ pub async fn sync_refund_with_gateway_workflow(
})?;
// FIXME we actually don't use this?
let _refund = db
let _refund = state
.store
.find_refund_by_internal_reference_id_merchant_id(
&refund_core.refund_internal_reference_id,
&refund_core.merchant_id,
@ -607,22 +625,53 @@ pub async fn sync_refund_with_gateway_workflow(
.await
.map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?;
let merchant_account = state
.store
.find_merchant_account_by_merchant_id(&refund_core.merchant_id)
.await?;
let response = refund_retrieve_core(
state,
merchant_account,
refund_core.refund_internal_reference_id,
)
.await?;
let terminal_status = vec![
enums::RefundStatus::Success,
enums::RefundStatus::Failure,
enums::RefundStatus::TransactionFailure,
];
match response.refund_status {
status if terminal_status.contains(&status) => {
let id = refund_tracker.id.clone();
refund_tracker
.clone()
.finish_with_status(&*state.store, format!("COMPLETED_BY_PT_{}", id))
.await?
}
_ => {
payment_sync::retry_sync_task(
&*state.store,
response.connector,
response.merchant_id,
refund_tracker.to_owned(),
)
.await?
}
}
Ok(())
// sync_refund_with_gateway(data, &refund).await.map(|_| ())
}
#[instrument(skip_all)]
pub async fn start_refund_workflow(
state: &AppState,
refund_tracker: &storage::ProcessTracker,
) -> RouterResult<()> {
) -> Result<(), errors::ProcessTrackerError> {
match refund_tracker.name.as_deref() {
Some("EXECUTE_REFUND") => trigger_refund_execute_workflow(state, refund_tracker).await,
Some("SYNC_REFUND") => {
sync_refund_with_gateway_workflow(&*state.store, refund_tracker).await
}
_ => Err(report!(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Job name cannot be identified")),
Some("SYNC_REFUND") => sync_refund_with_gateway_workflow(state, refund_tracker).await,
_ => Err(errors::ProcessTrackerError::JobNotFound),
}
}
@ -630,7 +679,7 @@ pub async fn start_refund_workflow(
pub async fn trigger_refund_execute_workflow(
state: &AppState,
refund_tracker: &storage::ProcessTracker,
) -> RouterResult<()> {
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
let refund_core =
serde_json::from_value::<storage::RefundCoreWorkflow>(refund_tracker.tracking_data.clone())
@ -703,11 +752,16 @@ pub async fn trigger_refund_execute_workflow(
add_refund_sync_task(db, &updated_refund, "REFUND_WORKFLOW_ROUTER").await?;
}
(true, enums::RefundStatus::Pending) => {
//create sync task
// create sync task
add_refund_sync_task(db, &refund, "REFUND_WORKFLOW_ROUTER").await?;
}
(_, _) => {
//mark task as finished
let id = refund_tracker.id.clone();
refund_tracker
.clone()
.finish_with_status(db, format!("COMPLETED_BY_PT_{}", id))
.await?;
}
};
Ok(())
@ -727,7 +781,7 @@ pub fn refund_to_refund_core_workflow_model(
#[instrument(skip_all)]
pub async fn add_refund_sync_task(
db: &dyn StorageInterface,
db: &dyn db::StorageInterface,
refund: &storage::Refund,
runner: &str,
) -> RouterResult<storage::ProcessTracker> {
@ -762,7 +816,7 @@ pub async fn add_refund_sync_task(
#[instrument(skip_all)]
pub async fn add_refund_execute_task(
db: &dyn StorageInterface,
db: &dyn db::StorageInterface,
refund: &storage::Refund,
runner: &str,
) -> RouterResult<storage::ProcessTracker> {
@ -794,3 +848,49 @@ pub async fn add_refund_execute_task(
.change_context(errors::ApiErrorResponse::InternalServerError)?;
Ok(response)
}
pub async fn get_refund_sync_process_schedule_time(
db: &dyn db::StorageInterface,
connector: &str,
merchant_id: &str,
retry_count: i32,
) -> Result<Option<time::PrimitiveDateTime>, errors::ProcessTrackerError> {
let redis_mapping: errors::CustomResult<process_data::ConnectorPTMapping, errors::RedisError> =
db::get_and_deserialize_key(
db,
&format!("pt_mapping_refund_sync_{}", connector),
"ConnectorPTMapping",
)
.await;
let mapping = match redis_mapping {
Ok(x) => x,
Err(err) => {
logger::error!("Error: while getting connector mapping: {}", err);
process_data::ConnectorPTMapping::default()
}
};
let time_delta =
process_tracker_utils::get_schedule_time(mapping, merchant_id, retry_count + 1);
Ok(process_tracker_utils::get_time_from_delta(time_delta))
}
pub async fn retry_refund_sync_task(
db: &dyn db::StorageInterface,
connector: String,
merchant_id: String,
pt: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let schedule_time =
get_refund_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?;
match schedule_time {
Some(s_time) => pt.retry(db, s_time).await,
None => {
pt.finish_with_status(db, "RETRIES_EXCEEDED".to_string())
.await
}
}
}

View File

@ -42,7 +42,7 @@ pub async fn refunds_retrieve(
&req,
refund_id,
|state, merchant_account, refund_id| {
refund_retrieve_core(state, merchant_account, refund_id)
refund_response_wrapper(state, merchant_account, refund_id, refund_retrieve_core)
},
api::MerchantAuthentication::ApiKey,
)

View File

@ -8,7 +8,7 @@ use redis_interface::{RedisConnectionPool, RedisEntryId};
use router_env::opentelemetry;
use uuid::Uuid;
use super::{consumer, metrics, workflows};
use super::{consumer, metrics, process_data, workflows};
use crate::{
configs::settings::SchedulerSettings,
core::errors::{self, CustomResult},
@ -266,3 +266,40 @@ pub fn add_histogram_metrics(
);
};
}
pub fn get_schedule_time(
mapping: process_data::ConnectorPTMapping,
merchant_name: &str,
retry_count: i32,
) -> Option<i32> {
let mapping = match mapping.custom_merchant_mapping.get(merchant_name) {
Some(map) => map.clone(),
None => mapping.default_mapping,
};
if retry_count == 0 {
Some(mapping.start_after)
} else {
get_delay(
retry_count,
mapping.count.iter().zip(mapping.frequency.iter()),
)
}
}
fn get_delay<'a>(
retry_count: i32,
mut array: impl Iterator<Item = (&'a i32, &'a i32)>,
) -> Option<i32> {
match array.next() {
Some(ele) => {
let v = retry_count - ele.0;
if v <= 0 {
Some(*ele.1)
} else {
get_delay(v, array)
}
}
None => None,
}
}

View File

@ -1,10 +1,12 @@
use router_env::logger;
use super::{PaymentsSyncWorkflow, ProcessTrackerWorkflow};
use crate::{
core::payments::{self as payment_flows, operations},
db::{get_and_deserialize_key, StorageInterface},
errors,
routes::AppState,
scheduler::{consumer, process_data},
scheduler::{consumer, process_data, utils},
types::{
api,
storage::{self, enums},
@ -102,48 +104,14 @@ pub async fn get_sync_process_schedule_time(
.await;
let mapping = match redis_mapping {
Ok(x) => x,
Err(_) => process_data::ConnectorPTMapping::default(),
Err(err) => {
logger::error!("Redis Mapping Error: {}", err);
process_data::ConnectorPTMapping::default()
}
};
let time_delta = get_sync_schedule_time(mapping, merchant_id, retry_count + 1);
let time_delta = utils::get_schedule_time(mapping, merchant_id, retry_count + 1);
Ok(crate::scheduler::utils::get_time_from_delta(time_delta))
}
pub fn get_sync_schedule_time(
mapping: process_data::ConnectorPTMapping,
merchant_name: &str,
retry_count: i32,
) -> Option<i32> {
let mapping = match mapping.custom_merchant_mapping.get(merchant_name) {
Some(map) => map.clone(),
None => mapping.default_mapping,
};
if retry_count == 0 {
Some(mapping.start_after)
} else {
get_delay(
retry_count,
mapping.count.iter().zip(mapping.frequency.iter()),
)
}
}
fn get_delay<'a>(
retry_count: i32,
mut array: impl Iterator<Item = (&'a i32, &'a i32)>,
) -> Option<i32> {
match array.next() {
Some(ele) => {
let v = retry_count - ele.0;
if v <= 0 {
Some(*ele.1)
} else {
get_delay(v, array)
}
}
None => None,
}
Ok(utils::get_time_from_delta(time_delta))
}
pub async fn retry_sync_task(
@ -172,9 +140,9 @@ mod tests {
#[test]
fn test_get_default_schedule_time() {
let schedule_time_delta =
get_sync_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0).unwrap();
utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0).unwrap();
let first_retry_time_delta =
get_sync_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1).unwrap();
utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1).unwrap();
let cpt_default = process_data::ConnectorPTMapping::default().default_mapping;
assert_eq!(
vec![schedule_time_delta, first_retry_time_delta],