feat(core): Add support for process tracker retrieve api in v2 (#7602)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Amisha Prabhat
2025-03-27 16:52:03 +05:30
committed by GitHub
parent 56412bf188
commit 87140bfccc
33 changed files with 430 additions and 111 deletions

View File

@ -321,9 +321,9 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new(
workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow,
)),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow => Ok(Box::new(
workflows::passive_churn_recovery_workflow::ExecutePcrWorkflow,
)),
storage::ProcessTrackerRunner::PassiveRecoveryWorkflow => {
Ok(Box::new(workflows::revenue_recovery::ExecutePcrWorkflow))
}
}
};

View File

@ -57,6 +57,6 @@ pub mod webhooks;
pub mod unified_authentication_service;
#[cfg(feature = "v2")]
pub mod passive_churn_recovery;
pub mod relay;
#[cfg(feature = "v2")]
pub mod revenue_recovery;

View File

@ -1,33 +1,40 @@
pub mod transformers;
pub mod types;
use api_models::payments::{PaymentRevenueRecoveryMetadata, PaymentsRetrieveRequest};
use common_utils::{self, ext_traits::OptionExt, id_type, types::keymanager::KeyManagerState};
use api_models::{payments::PaymentsRetrieveRequest, process_tracker::revenue_recovery};
use common_utils::{
self,
ext_traits::{OptionExt, ValueExt},
id_type,
types::keymanager::KeyManagerState,
};
use diesel_models::process_tracker::business_status;
use error_stack::{self, ResultExt};
use hyperswitch_domain_models::{
behaviour::ReverseConversion,
errors::api_error_response,
api::ApplicationResponse,
payments::{PaymentIntent, PaymentStatusData},
ApiModelToDieselModelConvertor,
};
use scheduler::errors;
use scheduler::errors as sch_errors;
use crate::{
core::{
errors::RouterResult,
passive_churn_recovery::types as pcr_types,
errors::{self, RouterResponse, RouterResult, StorageErrorExt},
payments::{self, operations::Operation},
revenue_recovery::types as pcr_types,
},
db::StorageInterface,
logger,
routes::{metrics, SessionState},
types::{
api,
storage::{self, passive_churn_recovery as pcr},
storage::{self, revenue_recovery as pcr},
transformers::ForeignInto,
},
};
pub const EXECUTE_WORKFLOW: &str = "EXECUTE_WORKFLOW";
pub const PSYNC_WORKFLOW: &str = "PSYNC_WORKFLOW";
pub async fn perform_execute_payment(
state: &SessionState,
execute_task_process: &storage::ProcessTracker,
@ -35,7 +42,7 @@ pub async fn perform_execute_payment(
pcr_data: &pcr::PcrPaymentData,
_key_manager_state: &KeyManagerState,
payment_intent: &PaymentIntent,
) -> Result<(), errors::ProcessTrackerError> {
) -> Result<(), sch_errors::ProcessTrackerError> {
let db = &*state.store;
let mut pcr_metadata = payment_intent
@ -79,9 +86,9 @@ pub async fn perform_execute_payment(
pcr_types::Decision::Psync(attempt_status, attempt_id) => {
// find if a psync task is already present
let task = "PSYNC_WORKFLOW";
let task = PSYNC_WORKFLOW;
let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow;
let process_tracker_id = format!("{runner}_{task}_{}", attempt_id.get_string_repr());
let process_tracker_id = attempt_id.get_psync_revenue_recovery_id(task, runner);
let psync_process = db.find_process_by_id(&process_tracker_id).await?;
match psync_process {
@ -138,8 +145,8 @@ async fn insert_psync_pcr_task(
payment_attempt_id: id_type::GlobalAttemptId,
runner: storage::ProcessTrackerRunner,
) -> RouterResult<storage::ProcessTracker> {
let task = "PSYNC_WORKFLOW";
let process_tracker_id = format!("{runner}_{task}_{}", payment_attempt_id.get_string_repr());
let task = PSYNC_WORKFLOW;
let process_tracker_id = payment_attempt_id.get_psync_revenue_recovery_id(task, runner);
let schedule_time = common_utils::date_time::now();
let psync_workflow_tracking_data = pcr::PcrWorkflowTrackingData {
global_payment_id: payment_id,
@ -158,13 +165,13 @@ async fn insert_psync_pcr_task(
schedule_time,
hyperswitch_domain_models::consts::API_VERSION,
)
.change_context(api_error_response::ApiErrorResponse::InternalServerError)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct delete tokenized data process tracker task")?;
let response = db
.insert_process(process_tracker_entry)
.await
.change_context(api_error_response::ApiErrorResponse::InternalServerError)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct delete tokenized data process tracker task")?;
metrics::TASKS_ADDED_COUNT.add(1, router_env::metric_attributes!(("flow", "PsyncPcr")));
@ -219,3 +226,57 @@ pub async fn call_psync_api(
.await?;
Ok(payment_data)
}
pub async fn retrieve_revenue_recovery_process_tracker(
state: SessionState,
id: id_type::GlobalPaymentId,
) -> RouterResponse<revenue_recovery::RevenueRecoveryResponse> {
let db = &*state.store;
let task = EXECUTE_WORKFLOW;
let runner = storage::ProcessTrackerRunner::PassiveRecoveryWorkflow;
let process_tracker_id = id.get_execute_revenue_recovery_id(task, runner);
let process_tracker = db
.find_process_by_id(&process_tracker_id)
.await
.to_not_found_response(errors::ApiErrorResponse::ResourceIdNotFound)
.attach_printable("error retrieving the process tracker id")?
.get_required_value("Process Tracker")
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
message: "Entry For the following id doesn't exists".to_owned(),
})?;
let tracking_data = process_tracker
.tracking_data
.clone()
.parse_value::<pcr::PcrWorkflowTrackingData>("PCRWorkflowTrackingData")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("unable to deserialize Pcr Workflow Tracking Data")?;
let psync_task = PSYNC_WORKFLOW;
let process_tracker_id_for_psync = tracking_data
.payment_attempt_id
.get_psync_revenue_recovery_id(psync_task, runner);
let process_tracker_for_psync = db
.find_process_by_id(&process_tracker_id_for_psync)
.await
.map_err(|e| {
logger::error!("Error while retreiving psync task : {:?}", e);
})
.ok()
.flatten();
let schedule_time_for_psync = process_tracker_for_psync.and_then(|pt| pt.schedule_time);
let response = revenue_recovery::RevenueRecoveryResponse {
id: process_tracker.id,
name: process_tracker.name,
schedule_time_for_payment: process_tracker.schedule_time,
schedule_time_for_psync,
status: process_tracker.status,
business_status: process_tracker.business_status,
};
Ok(ApplicationResponse::Json(response))
}

View File

@ -1,8 +1,6 @@
use common_enums::AttemptStatus;
use crate::{
core::passive_churn_recovery::types::PcrAttemptStatus, types::transformers::ForeignFrom,
};
use crate::{core::revenue_recovery::types::PcrAttemptStatus, types::transformers::ForeignFrom};
impl ForeignFrom<AttemptStatus> for PcrAttemptStatus {
fn foreign_from(s: AttemptStatus) -> Self {

View File

@ -19,14 +19,14 @@ use time::PrimitiveDateTime;
use crate::{
core::{
errors::{self, RouterResult},
passive_churn_recovery::{self as core_pcr},
payments::{self, operations::Operation},
revenue_recovery::{self as core_pcr},
},
db::StorageInterface,
logger,
routes::SessionState,
types::{api::payments as api_types, storage, transformers::ForeignInto},
workflows::passive_churn_recovery_workflow::get_schedule_time_to_retry_mit_payments,
workflows::revenue_recovery::get_schedule_time_to_retry_mit_payments,
};
type RecoveryResult<T> = error_stack::Result<T, errors::RecoveryError>;
@ -87,7 +87,7 @@ impl Decision {
intent_status: enums::IntentStatus,
called_connector: enums::PaymentConnectorTransmission,
active_attempt_id: Option<id_type::GlobalAttemptId>,
pcr_data: &storage::passive_churn_recovery::PcrPaymentData,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
payment_id: &id_type::GlobalPaymentId,
) -> RecoveryResult<Self> {
Ok(match (intent_status, called_connector, active_attempt_id) {
@ -132,7 +132,7 @@ impl Action {
merchant_id: &id_type::MerchantId,
payment_intent: &PaymentIntent,
process: &storage::ProcessTracker,
pcr_data: &storage::passive_churn_recovery::PcrPaymentData,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
revenue_recovery_metadata: &PaymentRevenueRecoveryMetadata,
) -> RecoveryResult<Self> {
let db = &*state.store;
@ -171,7 +171,7 @@ impl Action {
state: &SessionState,
payment_intent: &PaymentIntent,
execute_task_process: &storage::ProcessTracker,
pcr_data: &storage::passive_churn_recovery::PcrPaymentData,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
revenue_recovery_metadata: &mut PaymentRevenueRecoveryMetadata,
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
@ -290,7 +290,7 @@ impl Action {
async fn call_proxy_api(
state: &SessionState,
payment_intent: &PaymentIntent,
pcr_data: &storage::passive_churn_recovery::PcrPaymentData,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
revenue_recovery: &PaymentRevenueRecoveryMetadata,
) -> RouterResult<PaymentConfirmData<api_types::Authorize>> {
let operation = payments::operations::proxy_payments_intent::PaymentProxyIntent;
@ -349,7 +349,7 @@ async fn call_proxy_api(
pub async fn update_payment_intent_api(
state: &SessionState,
global_payment_id: id_type::GlobalPaymentId,
pcr_data: &storage::passive_churn_recovery::PcrPaymentData,
pcr_data: &storage::revenue_recovery::PcrPaymentData,
update_req: PaymentsUpdateIntentRequest,
) -> RouterResult<PaymentIntentData<api_types::PaymentUpdateIntent>> {
// TODO : Use api handler instead of calling payments_intent_operation_core

View File

@ -15,8 +15,8 @@ use crate::{
db::StorageInterface,
routes::{app::ReqState, metrics, SessionState},
services::{self, connector_integration_interface},
types::{api, domain, storage::passive_churn_recovery as storage_churn_recovery},
workflows::passive_churn_recovery_workflow,
types::{api, domain, storage::revenue_recovery as storage_churn_recovery},
workflows::revenue_recovery as revenue_recovery_flow,
};
#[allow(clippy::too_many_arguments)]
@ -466,22 +466,21 @@ impl RevenueRecoveryAttempt {
.and_then(|feature_metadata| feature_metadata.get_retry_count())
.unwrap_or(0);
let schedule_time =
passive_churn_recovery_workflow::get_schedule_time_to_retry_mit_payments(
db,
&merchant_id,
(total_retry_count + 1).into(),
)
.await
.map_or_else(
|| {
Err(
report!(errors::RevenueRecoveryError::ScheduleTimeFetchFailed)
.attach_printable("Failed to get schedule time for pcr workflow"),
)
},
Ok, // Simply returns `time` wrapped in `Ok`
)?;
let schedule_time = revenue_recovery_flow::get_schedule_time_to_retry_mit_payments(
db,
&merchant_id,
(total_retry_count + 1).into(),
)
.await
.map_or_else(
|| {
Err(
report!(errors::RevenueRecoveryError::ScheduleTimeFetchFailed)
.attach_printable("Failed to get schedule time for pcr workflow"),
)
},
Ok, // Simply returns `time` wrapped in `Ok`
)?;
let payment_attempt_id = payment_attempt_id
.ok_or(report!(

View File

@ -202,6 +202,11 @@ pub fn mk_app(
.service(routes::WebhookEvents::server(state.clone()))
.service(routes::FeatureMatrix::server(state.clone()));
}
#[cfg(feature = "v2")]
{
server_app = server_app.service(routes::ProcessTracker::server(state.clone()));
}
}
#[cfg(all(feature = "payouts", feature = "v1"))]

View File

@ -65,6 +65,9 @@ pub mod recovery_webhooks;
pub mod relay;
#[cfg(feature = "olap")]
pub mod process_tracker;
#[cfg(feature = "dummy_connector")]
pub use self::app::DummyConnector;
#[cfg(feature = "v2")]
@ -75,7 +78,8 @@ pub use self::app::{
ApiKeys, AppState, ApplePayCertificatesMigration, Cache, Cards, Configs, ConnectorOnboarding,
Customers, Disputes, EphemeralKey, FeatureMatrix, Files, Forex, Gsm, Health, Hypersense,
Mandates, MerchantAccount, MerchantConnectorAccount, PaymentLink, PaymentMethods, Payments,
Poll, Profile, ProfileNew, Refunds, Relay, RelayWebhooks, SessionState, User, Webhooks,
Poll, ProcessTracker, Profile, ProfileNew, Refunds, Relay, RelayWebhooks, SessionState, User,
Webhooks,
};
#[cfg(feature = "olap")]
pub use self::app::{Blocklist, Organization, Routing, Verify, WebhookEvents};

View File

@ -2496,3 +2496,19 @@ impl FeatureMatrix {
.service(web::resource("").route(web::get().to(feature_matrix::fetch_feature_matrix)))
}
}
#[cfg(feature = "olap")]
pub struct ProcessTracker;
#[cfg(all(feature = "olap", feature = "v2"))]
impl ProcessTracker {
pub fn server(state: AppState) -> Scope {
use super::process_tracker::revenue_recovery;
web::scope("/v2/process_tracker/revenue_recovery_workflow")
.app_data(web::Data::new(state.clone()))
.service(
web::resource("/{revenue_recovery_id}")
.route(web::get().to(revenue_recovery::revenue_recovery_pt_retrieve_api)),
)
}
}

View File

@ -42,6 +42,7 @@ pub enum ApiIdentifier {
CardNetworkTokenization,
Hypersense,
PaymentMethodSession,
ProcessTracker,
}
impl From<Flow> for ApiIdentifier {
@ -333,6 +334,8 @@ impl From<Flow> for ApiIdentifier {
| Flow::PaymentMethodSessionUpdateSavedPaymentMethod
| Flow::PaymentMethodSessionDeleteSavedPaymentMethod
| Flow::PaymentMethodSessionUpdate => Self::PaymentMethodSession,
Flow::RevenueRecoveryRetrieve => Self::ProcessTracker,
}
}
}

View File

@ -0,0 +1,2 @@
#[cfg(feature = "v2")]
pub mod revenue_recovery;

View File

@ -0,0 +1,39 @@
use actix_web::{web, HttpRequest, HttpResponse};
use api_models::process_tracker::revenue_recovery as revenue_recovery_api;
use router_env::Flow;
use crate::{
core::{api_locking, revenue_recovery},
routes::AppState,
services::{api, authentication as auth, authorization::permissions::Permission},
};
pub async fn revenue_recovery_pt_retrieve_api(
state: web::Data<AppState>,
req: HttpRequest,
path: web::Path<common_utils::id_type::GlobalPaymentId>,
) -> HttpResponse {
let flow = Flow::RevenueRecoveryRetrieve;
let id = path.into_inner();
let payload = revenue_recovery_api::RevenueRecoveryId {
revenue_recovery_id: id,
};
Box::pin(api::server_wrap(
flow,
state,
&req,
payload,
|state, _: (), id, _| {
revenue_recovery::retrieve_revenue_recovery_process_tracker(
state,
id.revenue_recovery_id,
)
},
&auth::JWTAuth {
permission: Permission::ProfileRevenueRecoveryRead,
},
api_locking::LockAction::NotApplicable,
))
.await
}

View File

@ -167,11 +167,12 @@ pub static OPERATIONS: [Resource; 8] = [
pub static CONNECTORS: [Resource; 2] = [Resource::Connector, Resource::Account];
pub static WORKFLOWS: [Resource; 4] = [
pub static WORKFLOWS: [Resource; 5] = [
Resource::Routing,
Resource::ThreeDsDecisionManager,
Resource::SurchargeDecisionManager,
Resource::Account,
Resource::RevenueRecovery,
];
pub static ANALYTICS: [Resource; 3] = [Resource::Analytics, Resource::Report, Resource::Account];

View File

@ -95,6 +95,10 @@ generate_permissions! {
scopes: [Read, Write],
entities: [Merchant]
},
RevenueRecovery: {
scopes: [Read],
entities: [Profile]
}
]
}
@ -109,6 +113,7 @@ pub fn get_resource_name(resource: Resource, entity_type: EntityType) -> &'stati
(Resource::ApiKey, _) => "Api Keys",
(Resource::Connector, _) => "Payment Processors, Payout Processors, Fraud & Risk Managers",
(Resource::Routing, _) => "Routing",
(Resource::RevenueRecovery, _) => "Revenue Recovery",
(Resource::ThreeDsDecisionManager, _) => "3DS Decision Manager",
(Resource::SurchargeDecisionManager, _) => "Surcharge Decision Manager",
(Resource::Analytics, _) => "Analytics",

View File

@ -28,14 +28,14 @@ pub mod mandate;
pub mod merchant_account;
pub mod merchant_connector_account;
pub mod merchant_key_store;
#[cfg(feature = "v2")]
pub mod passive_churn_recovery;
pub mod payment_attempt;
pub mod payment_link;
pub mod payment_method;
pub mod payout_attempt;
pub mod payouts;
pub mod refund;
#[cfg(feature = "v2")]
pub mod revenue_recovery;
pub mod reverse_lookup;
pub mod role;
pub mod routing_algorithm;

View File

@ -10,4 +10,4 @@ pub mod refund_router;
pub mod tokenized_data;
pub mod passive_churn_recovery_workflow;
pub mod revenue_recovery;

View File

@ -15,14 +15,14 @@ use scheduler::{types::process_data, utils as scheduler_utils};
#[cfg(feature = "v2")]
use crate::{
core::{
passive_churn_recovery::{self as pcr},
payments,
revenue_recovery::{self as pcr},
},
db::StorageInterface,
errors::StorageError,
types::{
api::{self as api_types},
storage::passive_churn_recovery as pcr_storage_types,
storage::revenue_recovery as pcr_storage_types,
},
};
use crate::{routes::SessionState, types::storage};