mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 00:49:42 +08:00
feat(router): add /retrieve api for relay (#6918)
This commit is contained in:
@ -84,9 +84,20 @@ pub struct RelayRetrieveRequest {
|
||||
#[serde(default)]
|
||||
pub force_sync: bool,
|
||||
/// The unique identifier for the Relay
|
||||
pub id: String,
|
||||
pub id: common_utils::id_type::RelayId,
|
||||
}
|
||||
|
||||
#[derive(Debug, ToSchema, Clone, Deserialize, Serialize)]
|
||||
pub struct RelayRetrieveBody {
|
||||
/// The unique identifier for the Relay
|
||||
#[serde(default)]
|
||||
pub force_sync: bool,
|
||||
}
|
||||
|
||||
impl common_utils::events::ApiEventMetric for RelayRequest {}
|
||||
|
||||
impl common_utils::events::ApiEventMetric for RelayResponse {}
|
||||
|
||||
impl common_utils::events::ApiEventMetric for RelayRetrieveRequest {}
|
||||
|
||||
impl common_utils::events::ApiEventMetric for RelayRetrieveBody {}
|
||||
|
||||
@ -35,4 +35,15 @@ impl Relay {
|
||||
result => result,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn find_by_id(
|
||||
conn: &PgPooledConn,
|
||||
id: &common_utils::id_type::RelayId,
|
||||
) -> StorageResult<Self> {
|
||||
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
|
||||
conn,
|
||||
dsl::id.eq(id.to_owned()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ pub struct RelayNew {
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)]
|
||||
#[table_name = "relay"]
|
||||
#[diesel(table_name = relay)]
|
||||
pub struct RelayUpdateInternal {
|
||||
pub connector_reference_id: Option<String>,
|
||||
pub status: Option<storage_enums::RelayStatus>,
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
use api_models::relay as relay_models;
|
||||
use common_utils::{self, ext_traits::OptionExt, id_type};
|
||||
use common_enums::RelayStatus;
|
||||
use common_utils::{self, id_type};
|
||||
use error_stack::ResultExt;
|
||||
|
||||
use super::errors::{self, ConnectorErrorExt, RouterResponse, RouterResult, StorageErrorExt};
|
||||
@ -11,6 +12,7 @@ use crate::{
|
||||
api::{self},
|
||||
domain,
|
||||
},
|
||||
utils::OptionExt,
|
||||
};
|
||||
|
||||
pub mod utils;
|
||||
@ -27,11 +29,7 @@ pub async fn relay(
|
||||
let merchant_id = merchant_account.get_id();
|
||||
let connector_id = &req.connector_id;
|
||||
|
||||
let profile_id_from_auth_layer = profile_id_optional
|
||||
.get_required_value("ProfileId")
|
||||
.change_context(errors::ApiErrorResponse::MissingRequiredField {
|
||||
field_name: "profile id",
|
||||
})?;
|
||||
let profile_id_from_auth_layer = profile_id_optional.get_required_value("ProfileId")?;
|
||||
|
||||
let profile = db
|
||||
.find_business_profile_by_merchant_id_profile_id(
|
||||
@ -175,3 +173,158 @@ pub fn validate_relay_refund_data(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn relay_retrieve(
|
||||
state: SessionState,
|
||||
merchant_account: domain::MerchantAccount,
|
||||
profile_id_optional: Option<id_type::ProfileId>,
|
||||
key_store: domain::MerchantKeyStore,
|
||||
req: relay_models::RelayRetrieveRequest,
|
||||
) -> RouterResponse<relay_models::RelayResponse> {
|
||||
let db = state.store.as_ref();
|
||||
let key_manager_state = &(&state).into();
|
||||
let merchant_id = merchant_account.get_id();
|
||||
let relay_id = &req.id;
|
||||
|
||||
let profile_id_from_auth_layer = profile_id_optional.get_required_value("ProfileId")?;
|
||||
|
||||
db.find_business_profile_by_merchant_id_profile_id(
|
||||
key_manager_state,
|
||||
&key_store,
|
||||
merchant_id,
|
||||
&profile_id_from_auth_layer,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::ProfileNotFound {
|
||||
id: profile_id_from_auth_layer.get_string_repr().to_owned(),
|
||||
})?;
|
||||
|
||||
let relay_record_result = db
|
||||
.find_relay_by_id(key_manager_state, &key_store, relay_id)
|
||||
.await;
|
||||
|
||||
let relay_record = match relay_record_result {
|
||||
Err(error) => {
|
||||
if error.current_context().is_db_not_found() {
|
||||
Err(error).change_context(errors::ApiErrorResponse::GenericNotFoundError {
|
||||
message: "relay not found".to_string(),
|
||||
})?
|
||||
} else {
|
||||
Err(error)
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("error while fetch relay record")?
|
||||
}
|
||||
}
|
||||
Ok(relay) => relay,
|
||||
};
|
||||
|
||||
#[cfg(feature = "v1")]
|
||||
let connector_account = db
|
||||
.find_by_merchant_connector_account_merchant_id_merchant_connector_id(
|
||||
key_manager_state,
|
||||
merchant_id,
|
||||
&relay_record.connector_id,
|
||||
&key_store,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
|
||||
id: relay_record.connector_id.get_string_repr().to_string(),
|
||||
})?;
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
let connector_account = db
|
||||
.find_merchant_connector_account_by_id(
|
||||
key_manager_state,
|
||||
&relay_record.connector_id,
|
||||
&key_store,
|
||||
)
|
||||
.await
|
||||
.to_not_found_response(errors::ApiErrorResponse::MerchantConnectorAccountNotFound {
|
||||
id: relay_record.connector_id.get_string_repr().to_string(),
|
||||
})?;
|
||||
|
||||
let relay_response = match relay_record.relay_type {
|
||||
common_enums::RelayType::Refund => {
|
||||
if should_call_connector_for_relay_refund_status(&relay_record, req.force_sync) {
|
||||
let relay_response = sync_relay_refund_with_gateway(
|
||||
&state,
|
||||
&merchant_account,
|
||||
&relay_record,
|
||||
connector_account,
|
||||
)
|
||||
.await?;
|
||||
|
||||
db.update_relay(key_manager_state, &key_store, relay_record, relay_response)
|
||||
.await
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to update the relay record")?
|
||||
} else {
|
||||
relay_record
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let response = relay_models::RelayResponse::from(relay_response);
|
||||
|
||||
Ok(hyperswitch_domain_models::api::ApplicationResponse::Json(
|
||||
response,
|
||||
))
|
||||
}
|
||||
|
||||
fn should_call_connector_for_relay_refund_status(
|
||||
relay: &hyperswitch_domain_models::relay::Relay,
|
||||
force_sync: bool,
|
||||
) -> bool {
|
||||
// This allows refund sync at connector level if force_sync is enabled, or
|
||||
// check if the refund is in terminal state
|
||||
!matches!(relay.status, RelayStatus::Failure | RelayStatus::Success) && force_sync
|
||||
}
|
||||
|
||||
pub async fn sync_relay_refund_with_gateway(
|
||||
state: &SessionState,
|
||||
merchant_account: &domain::MerchantAccount,
|
||||
relay_record: &hyperswitch_domain_models::relay::Relay,
|
||||
connector_account: domain::MerchantConnectorAccount,
|
||||
) -> RouterResult<hyperswitch_domain_models::relay::RelayUpdate> {
|
||||
let connector_id = &relay_record.connector_id;
|
||||
let merchant_id = merchant_account.get_id();
|
||||
|
||||
let connector_data: api::ConnectorData = api::ConnectorData::get_connector_by_name(
|
||||
&state.conf.connectors,
|
||||
&connector_account.connector_name,
|
||||
api::GetToken::Connector,
|
||||
Some(connector_id.clone()),
|
||||
)
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Failed to get the connector")?;
|
||||
|
||||
let router_data = utils::construct_relay_refund_router_data(
|
||||
state,
|
||||
&connector_account.connector_name,
|
||||
merchant_id,
|
||||
&connector_account,
|
||||
relay_record,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let connector_integration: services::BoxedRefundConnectorIntegrationInterface<
|
||||
api::RSync,
|
||||
hyperswitch_domain_models::router_request_types::RefundsData,
|
||||
hyperswitch_domain_models::router_response_types::RefundsResponseData,
|
||||
> = connector_data.connector.get_connector_integration();
|
||||
|
||||
let router_data_res = services::execute_connector_processing_step(
|
||||
state,
|
||||
connector_integration,
|
||||
&router_data,
|
||||
payments::CallConnectorAction::Trigger,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.to_refund_failed_response()?;
|
||||
|
||||
let relay_response =
|
||||
hyperswitch_domain_models::relay::RelayUpdate::from(router_data_res.response);
|
||||
|
||||
Ok(relay_response)
|
||||
}
|
||||
|
||||
@ -28,6 +28,13 @@ pub trait RelayInterface {
|
||||
current_state: hyperswitch_domain_models::relay::Relay,
|
||||
relay_update: hyperswitch_domain_models::relay::RelayUpdate,
|
||||
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError>;
|
||||
|
||||
async fn find_relay_by_id(
|
||||
&self,
|
||||
key_manager_state: &KeyManagerState,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
relay_id: &common_utils::id_type::RelayId,
|
||||
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@ -79,6 +86,25 @@ impl RelayInterface for Store {
|
||||
.await
|
||||
.change_context(errors::StorageError::DecryptionError)
|
||||
}
|
||||
|
||||
async fn find_relay_by_id(
|
||||
&self,
|
||||
key_manager_state: &KeyManagerState,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
relay_id: &common_utils::id_type::RelayId,
|
||||
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
|
||||
let conn = connection::pg_connection_read(self).await?;
|
||||
diesel_models::relay::Relay::find_by_id(&conn, relay_id)
|
||||
.await
|
||||
.map_err(|error| report!(errors::StorageError::from(error)))?
|
||||
.convert(
|
||||
key_manager_state,
|
||||
merchant_key_store.key.get_inner(),
|
||||
merchant_key_store.merchant_id.clone().into(),
|
||||
)
|
||||
.await
|
||||
.change_context(errors::StorageError::DecryptionError)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@ -101,6 +127,15 @@ impl RelayInterface for MockDb {
|
||||
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
|
||||
Err(errors::StorageError::MockDbError)?
|
||||
}
|
||||
|
||||
async fn find_relay_by_id(
|
||||
&self,
|
||||
_key_manager_state: &KeyManagerState,
|
||||
_merchant_key_store: &domain::MerchantKeyStore,
|
||||
_relay_id: &common_utils::id_type::RelayId,
|
||||
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
|
||||
Err(errors::StorageError::MockDbError)?
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@ -132,4 +167,15 @@ impl RelayInterface for KafkaStore {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn find_relay_by_id(
|
||||
&self,
|
||||
key_manager_state: &KeyManagerState,
|
||||
merchant_key_store: &domain::MerchantKeyStore,
|
||||
relay_id: &common_utils::id_type::RelayId,
|
||||
) -> CustomResult<hyperswitch_domain_models::relay::Relay, errors::StorageError> {
|
||||
self.diesel_store
|
||||
.find_relay_by_id(key_manager_state, merchant_key_store, relay_id)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@ -598,6 +598,7 @@ impl Relay {
|
||||
web::scope("/relay")
|
||||
.app_data(web::Data::new(state))
|
||||
.service(web::resource("").route(web::post().to(relay::relay)))
|
||||
.service(web::resource("/{relay_id}").route(web::get().to(relay::relay_retrieve)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -166,7 +166,7 @@ impl From<Flow> for ApiIdentifier {
|
||||
| Flow::RefundsFilters
|
||||
| Flow::RefundsAggregate
|
||||
| Flow::RefundsManualUpdate => Self::Refunds,
|
||||
Flow::Relay => Self::Relay,
|
||||
Flow::Relay | Flow::RelayRetrieve => Self::Relay,
|
||||
|
||||
Flow::FrmFulfillment
|
||||
| Flow::IncomingWebhookReceive
|
||||
|
||||
@ -38,3 +38,39 @@ pub async fn relay(
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(flow = ?Flow::RelayRetrieve))]
|
||||
#[cfg(feature = "oltp")]
|
||||
pub async fn relay_retrieve(
|
||||
state: web::Data<app::AppState>,
|
||||
path: web::Path<common_utils::id_type::RelayId>,
|
||||
req: actix_web::HttpRequest,
|
||||
query_params: web::Query<api_models::relay::RelayRetrieveBody>,
|
||||
) -> impl Responder {
|
||||
let flow = Flow::RelayRetrieve;
|
||||
let relay_retrieve_request = api_models::relay::RelayRetrieveRequest {
|
||||
force_sync: query_params.force_sync,
|
||||
id: path.into_inner(),
|
||||
};
|
||||
Box::pin(api::server_wrap(
|
||||
flow,
|
||||
state,
|
||||
&req,
|
||||
relay_retrieve_request,
|
||||
|state, auth: auth::AuthenticationData, req, _| {
|
||||
relay::relay_retrieve(
|
||||
state,
|
||||
auth.merchant_account,
|
||||
#[cfg(feature = "v1")]
|
||||
auth.profile_id,
|
||||
#[cfg(feature = "v2")]
|
||||
Some(auth.profile.get_id().clone()),
|
||||
auth.key_store,
|
||||
req,
|
||||
)
|
||||
},
|
||||
&auth::HeaderAuth(auth::ApiKeyAuth),
|
||||
api_locking::LockAction::NotApplicable,
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
@ -533,6 +533,8 @@ pub enum Flow {
|
||||
VolumeSplitOnRoutingType,
|
||||
/// Relay flow
|
||||
Relay,
|
||||
/// Relay retrieve flow
|
||||
RelayRetrieve,
|
||||
}
|
||||
|
||||
/// Trait for providing generic behaviour to flow metric
|
||||
|
||||
Reference in New Issue
Block a user