diff --git a/crates/router/src/compatibility/stripe/refunds.rs b/crates/router/src/compatibility/stripe/refunds.rs index cdeb26113b..516c0ee005 100644 --- a/crates/router/src/compatibility/stripe/refunds.rs +++ b/crates/router/src/compatibility/stripe/refunds.rs @@ -60,7 +60,12 @@ pub(crate) async fn refund_retrieve( &req, refund_id, |state, merchant_account, refund_id| { - refunds::refund_retrieve_core(state, merchant_account, refund_id) + refunds::refund_response_wrapper( + state, + merchant_account, + refund_id, + refunds::refund_retrieve_core, + ) }, api::MerchantAuthentication::ApiKey, ) diff --git a/crates/router/src/core/errors.rs b/crates/router/src/core/errors.rs index 96e44e5c63..6b5f79a537 100644 --- a/crates/router/src/core/errors.rs +++ b/crates/router/src/core/errors.rs @@ -356,6 +356,8 @@ pub enum ProcessTrackerError { FlowExecutionError { flow: String }, #[error("Not Implemented")] NotImplemented, + #[error("Job not found")] + JobNotFound, #[error("Recieved Error ApiResponseError: {0}")] EApiErrorResponse(error_stack::Report), #[error("Recieved Error StorageError: {0}")] diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index c58e1f0cfc..fc2e4937ad 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -9,9 +9,9 @@ use crate::{ errors::{self, ConnectorErrorExt, RouterResponse, RouterResult, StorageErrorExt}, payments, utils as core_utils, }, - db::StorageInterface, - logger, + db, logger, routes::AppState, + scheduler::{process_data, utils as process_tracker_utils, workflows::payment_sync}, services, types::{ self, @@ -163,12 +163,28 @@ pub async fn trigger_refund_to_gateway( // ********************************************** REFUND SYNC ********************************************** +pub async fn refund_response_wrapper<'a, F, Fut, T>( + state: &'a AppState, + merchant_account: storage::MerchantAccount, + refund_id: String, + f: F, +) -> RouterResponse +where + F: Fn(&'a AppState, storage::MerchantAccount, String) -> Fut, + Fut: futures::Future>, + refunds::RefundResponse: From, +{ + Ok(services::BachResponse::Json( + f(state, merchant_account, refund_id).await?.into(), + )) +} + #[instrument(skip_all)] pub async fn refund_retrieve_core( state: &AppState, merchant_account: storage::MerchantAccount, refund_id: String, -) -> RouterResponse { +) -> RouterResult { let db = &*state.store; let (merchant_id, payment_intent, payment_attempt, refund, response); @@ -212,7 +228,7 @@ pub async fn refund_retrieve_core( ) .await?; - Ok(services::BachResponse::Json(response.into())) + Ok(response) } #[instrument(skip_all)] @@ -286,7 +302,7 @@ pub async fn sync_refund_with_gateway( // ********************************************** REFUND UPDATE ********************************************** pub async fn refund_update_core( - db: &dyn StorageInterface, + db: &dyn db::StorageInterface, merchant_account: storage::MerchantAccount, refund_id: &str, req: refunds::RefundRequest, @@ -576,9 +592,9 @@ pub async fn schedule_refund_execution( #[instrument(skip_all)] pub async fn sync_refund_with_gateway_workflow( - db: &dyn StorageInterface, + state: &AppState, refund_tracker: &storage::ProcessTracker, -) -> RouterResult<()> { +) -> Result<(), errors::ProcessTrackerError> { let refund_core = serde_json::from_value::(refund_tracker.tracking_data.clone()) .into_report() @@ -590,7 +606,8 @@ pub async fn sync_refund_with_gateway_workflow( ) })?; - let merchant_account = db + let merchant_account = state + .store .find_merchant_account_by_merchant_id(&refund_core.merchant_id) .await .map_err(|error| { @@ -598,7 +615,8 @@ pub async fn sync_refund_with_gateway_workflow( })?; // FIXME we actually don't use this? - let _refund = db + let _refund = state + .store .find_refund_by_internal_reference_id_merchant_id( &refund_core.refund_internal_reference_id, &refund_core.merchant_id, @@ -607,22 +625,53 @@ pub async fn sync_refund_with_gateway_workflow( .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?; + let merchant_account = state + .store + .find_merchant_account_by_merchant_id(&refund_core.merchant_id) + .await?; + + let response = refund_retrieve_core( + state, + merchant_account, + refund_core.refund_internal_reference_id, + ) + .await?; + let terminal_status = vec![ + enums::RefundStatus::Success, + enums::RefundStatus::Failure, + enums::RefundStatus::TransactionFailure, + ]; + match response.refund_status { + status if terminal_status.contains(&status) => { + let id = refund_tracker.id.clone(); + refund_tracker + .clone() + .finish_with_status(&*state.store, format!("COMPLETED_BY_PT_{}", id)) + .await? + } + _ => { + payment_sync::retry_sync_task( + &*state.store, + response.connector, + response.merchant_id, + refund_tracker.to_owned(), + ) + .await? + } + } + Ok(()) - // sync_refund_with_gateway(data, &refund).await.map(|_| ()) } #[instrument(skip_all)] pub async fn start_refund_workflow( state: &AppState, refund_tracker: &storage::ProcessTracker, -) -> RouterResult<()> { +) -> Result<(), errors::ProcessTrackerError> { match refund_tracker.name.as_deref() { Some("EXECUTE_REFUND") => trigger_refund_execute_workflow(state, refund_tracker).await, - Some("SYNC_REFUND") => { - sync_refund_with_gateway_workflow(&*state.store, refund_tracker).await - } - _ => Err(report!(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Job name cannot be identified")), + Some("SYNC_REFUND") => sync_refund_with_gateway_workflow(state, refund_tracker).await, + _ => Err(errors::ProcessTrackerError::JobNotFound), } } @@ -630,7 +679,7 @@ pub async fn start_refund_workflow( pub async fn trigger_refund_execute_workflow( state: &AppState, refund_tracker: &storage::ProcessTracker, -) -> RouterResult<()> { +) -> Result<(), errors::ProcessTrackerError> { let db = &*state.store; let refund_core = serde_json::from_value::(refund_tracker.tracking_data.clone()) @@ -703,11 +752,16 @@ pub async fn trigger_refund_execute_workflow( add_refund_sync_task(db, &updated_refund, "REFUND_WORKFLOW_ROUTER").await?; } (true, enums::RefundStatus::Pending) => { - //create sync task + // create sync task add_refund_sync_task(db, &refund, "REFUND_WORKFLOW_ROUTER").await?; } (_, _) => { //mark task as finished + let id = refund_tracker.id.clone(); + refund_tracker + .clone() + .finish_with_status(db, format!("COMPLETED_BY_PT_{}", id)) + .await?; } }; Ok(()) @@ -727,7 +781,7 @@ pub fn refund_to_refund_core_workflow_model( #[instrument(skip_all)] pub async fn add_refund_sync_task( - db: &dyn StorageInterface, + db: &dyn db::StorageInterface, refund: &storage::Refund, runner: &str, ) -> RouterResult { @@ -762,7 +816,7 @@ pub async fn add_refund_sync_task( #[instrument(skip_all)] pub async fn add_refund_execute_task( - db: &dyn StorageInterface, + db: &dyn db::StorageInterface, refund: &storage::Refund, runner: &str, ) -> RouterResult { @@ -794,3 +848,49 @@ pub async fn add_refund_execute_task( .change_context(errors::ApiErrorResponse::InternalServerError)?; Ok(response) } + +pub async fn get_refund_sync_process_schedule_time( + db: &dyn db::StorageInterface, + connector: &str, + merchant_id: &str, + retry_count: i32, +) -> Result, errors::ProcessTrackerError> { + let redis_mapping: errors::CustomResult = + db::get_and_deserialize_key( + db, + &format!("pt_mapping_refund_sync_{}", connector), + "ConnectorPTMapping", + ) + .await; + + let mapping = match redis_mapping { + Ok(x) => x, + Err(err) => { + logger::error!("Error: while getting connector mapping: {}", err); + process_data::ConnectorPTMapping::default() + } + }; + + let time_delta = + process_tracker_utils::get_schedule_time(mapping, merchant_id, retry_count + 1); + + Ok(process_tracker_utils::get_time_from_delta(time_delta)) +} + +pub async fn retry_refund_sync_task( + db: &dyn db::StorageInterface, + connector: String, + merchant_id: String, + pt: storage::ProcessTracker, +) -> Result<(), errors::ProcessTrackerError> { + let schedule_time = + get_refund_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?; + + match schedule_time { + Some(s_time) => pt.retry(db, s_time).await, + None => { + pt.finish_with_status(db, "RETRIES_EXCEEDED".to_string()) + .await + } + } +} diff --git a/crates/router/src/routes/refunds.rs b/crates/router/src/routes/refunds.rs index cc19b35242..bbc26431c0 100644 --- a/crates/router/src/routes/refunds.rs +++ b/crates/router/src/routes/refunds.rs @@ -42,7 +42,7 @@ pub async fn refunds_retrieve( &req, refund_id, |state, merchant_account, refund_id| { - refund_retrieve_core(state, merchant_account, refund_id) + refund_response_wrapper(state, merchant_account, refund_id, refund_retrieve_core) }, api::MerchantAuthentication::ApiKey, ) diff --git a/crates/router/src/scheduler/utils.rs b/crates/router/src/scheduler/utils.rs index 8b70116335..2d94752e43 100644 --- a/crates/router/src/scheduler/utils.rs +++ b/crates/router/src/scheduler/utils.rs @@ -8,7 +8,7 @@ use redis_interface::{RedisConnectionPool, RedisEntryId}; use router_env::opentelemetry; use uuid::Uuid; -use super::{consumer, metrics, workflows}; +use super::{consumer, metrics, process_data, workflows}; use crate::{ configs::settings::SchedulerSettings, core::errors::{self, CustomResult}, @@ -266,3 +266,40 @@ pub fn add_histogram_metrics( ); }; } + +pub fn get_schedule_time( + mapping: process_data::ConnectorPTMapping, + merchant_name: &str, + retry_count: i32, +) -> Option { + let mapping = match mapping.custom_merchant_mapping.get(merchant_name) { + Some(map) => map.clone(), + None => mapping.default_mapping, + }; + + if retry_count == 0 { + Some(mapping.start_after) + } else { + get_delay( + retry_count, + mapping.count.iter().zip(mapping.frequency.iter()), + ) + } +} + +fn get_delay<'a>( + retry_count: i32, + mut array: impl Iterator, +) -> Option { + match array.next() { + Some(ele) => { + let v = retry_count - ele.0; + if v <= 0 { + Some(*ele.1) + } else { + get_delay(v, array) + } + } + None => None, + } +} diff --git a/crates/router/src/scheduler/workflows/payment_sync.rs b/crates/router/src/scheduler/workflows/payment_sync.rs index 5306d7b426..6566e40bb0 100644 --- a/crates/router/src/scheduler/workflows/payment_sync.rs +++ b/crates/router/src/scheduler/workflows/payment_sync.rs @@ -1,10 +1,12 @@ +use router_env::logger; + use super::{PaymentsSyncWorkflow, ProcessTrackerWorkflow}; use crate::{ core::payments::{self as payment_flows, operations}, db::{get_and_deserialize_key, StorageInterface}, errors, routes::AppState, - scheduler::{consumer, process_data}, + scheduler::{consumer, process_data, utils}, types::{ api, storage::{self, enums}, @@ -102,48 +104,14 @@ pub async fn get_sync_process_schedule_time( .await; let mapping = match redis_mapping { Ok(x) => x, - Err(_) => process_data::ConnectorPTMapping::default(), - }; - let time_delta = get_sync_schedule_time(mapping, merchant_id, retry_count + 1); - - Ok(crate::scheduler::utils::get_time_from_delta(time_delta)) -} - -pub fn get_sync_schedule_time( - mapping: process_data::ConnectorPTMapping, - merchant_name: &str, - retry_count: i32, -) -> Option { - let mapping = match mapping.custom_merchant_mapping.get(merchant_name) { - Some(map) => map.clone(), - None => mapping.default_mapping, - }; - - if retry_count == 0 { - Some(mapping.start_after) - } else { - get_delay( - retry_count, - mapping.count.iter().zip(mapping.frequency.iter()), - ) - } -} - -fn get_delay<'a>( - retry_count: i32, - mut array: impl Iterator, -) -> Option { - match array.next() { - Some(ele) => { - let v = retry_count - ele.0; - if v <= 0 { - Some(*ele.1) - } else { - get_delay(v, array) - } + Err(err) => { + logger::error!("Redis Mapping Error: {}", err); + process_data::ConnectorPTMapping::default() } - None => None, - } + }; + let time_delta = utils::get_schedule_time(mapping, merchant_id, retry_count + 1); + + Ok(utils::get_time_from_delta(time_delta)) } pub async fn retry_sync_task( @@ -172,9 +140,9 @@ mod tests { #[test] fn test_get_default_schedule_time() { let schedule_time_delta = - get_sync_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0).unwrap(); + utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 0).unwrap(); let first_retry_time_delta = - get_sync_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1).unwrap(); + utils::get_schedule_time(process_data::ConnectorPTMapping::default(), "-", 1).unwrap(); let cpt_default = process_data::ConnectorPTMapping::default().default_mapping; assert_eq!( vec![schedule_time_delta, first_retry_time_delta],