mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 04:04:55 +08:00
feat(ucs): add event logging for UCS operations (#9058)
Co-authored-by: Kanika Chaudhary <kanika.c@juspay.in> Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -20,7 +20,7 @@ use crate::{
|
|||||||
unified_connector_service::{
|
unified_connector_service::{
|
||||||
build_unified_connector_service_auth_metadata,
|
build_unified_connector_service_auth_metadata,
|
||||||
handle_unified_connector_service_response_for_payment_authorize,
|
handle_unified_connector_service_response_for_payment_authorize,
|
||||||
handle_unified_connector_service_response_for_payment_repeat,
|
handle_unified_connector_service_response_for_payment_repeat, ucs_logging_wrapper,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
logger,
|
logger,
|
||||||
@ -523,20 +523,20 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData> for types::PaymentsAu
|
|||||||
merchant_context: &domain::MerchantContext,
|
merchant_context: &domain::MerchantContext,
|
||||||
) -> RouterResult<()> {
|
) -> RouterResult<()> {
|
||||||
if self.request.mandate_id.is_some() {
|
if self.request.mandate_id.is_some() {
|
||||||
call_unified_connector_service_repeat_payment(
|
Box::pin(call_unified_connector_service_repeat_payment(
|
||||||
self,
|
self,
|
||||||
state,
|
state,
|
||||||
merchant_connector_account,
|
merchant_connector_account,
|
||||||
merchant_context,
|
merchant_context,
|
||||||
)
|
))
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
call_unified_connector_service_authorize(
|
Box::pin(call_unified_connector_service_authorize(
|
||||||
self,
|
self,
|
||||||
state,
|
state,
|
||||||
merchant_connector_account,
|
merchant_connector_account,
|
||||||
merchant_context,
|
merchant_context,
|
||||||
)
|
))
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -854,33 +854,46 @@ async fn call_unified_connector_service_authorize(
|
|||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct request metadata")?;
|
.attach_printable("Failed to construct request metadata")?;
|
||||||
|
|
||||||
let response = client
|
let updated_router_data = Box::pin(ucs_logging_wrapper(
|
||||||
.payment_authorize(
|
router_data.clone(),
|
||||||
payment_authorize_request,
|
state,
|
||||||
connector_auth_metadata,
|
payment_authorize_request,
|
||||||
None,
|
|mut router_data, payment_authorize_request| async move {
|
||||||
state.get_grpc_headers(),
|
let response = client
|
||||||
)
|
.payment_authorize(
|
||||||
.await
|
payment_authorize_request,
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
connector_auth_metadata,
|
||||||
.attach_printable("Failed to authorize payment")?;
|
None,
|
||||||
|
state.get_grpc_headers(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to authorize payment")?;
|
||||||
|
|
||||||
let payment_authorize_response = response.into_inner();
|
let payment_authorize_response = response.into_inner();
|
||||||
|
|
||||||
let (status, router_data_response, status_code) =
|
let (status, router_data_response, status_code) =
|
||||||
handle_unified_connector_service_response_for_payment_authorize(
|
handle_unified_connector_service_response_for_payment_authorize(
|
||||||
payment_authorize_response.clone(),
|
payment_authorize_response.clone(),
|
||||||
)
|
)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to deserialize UCS response")?;
|
.attach_printable("Failed to deserialize UCS response")?;
|
||||||
|
|
||||||
router_data.status = status;
|
router_data.status = status;
|
||||||
router_data.response = router_data_response;
|
router_data.response = router_data_response;
|
||||||
router_data.raw_connector_response = payment_authorize_response
|
router_data.raw_connector_response = payment_authorize_response
|
||||||
.raw_connector_response
|
.raw_connector_response
|
||||||
.map(Secret::new);
|
.clone()
|
||||||
router_data.connector_http_status_code = Some(status_code);
|
.map(Secret::new);
|
||||||
|
router_data.connector_http_status_code = Some(status_code);
|
||||||
|
|
||||||
|
Ok((router_data, payment_authorize_response))
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Copy back the updated data
|
||||||
|
*router_data = updated_router_data;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -903,40 +916,53 @@ async fn call_unified_connector_service_repeat_payment(
|
|||||||
.attach_printable("Failed to fetch Unified Connector Service client")?;
|
.attach_printable("Failed to fetch Unified Connector Service client")?;
|
||||||
|
|
||||||
let payment_repeat_request =
|
let payment_repeat_request =
|
||||||
payments_grpc::PaymentServiceRepeatEverythingRequest::foreign_try_from(router_data)
|
payments_grpc::PaymentServiceRepeatEverythingRequest::foreign_try_from(&*router_data)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct Payment Authorize Request")?;
|
.attach_printable("Failed to construct Payment Repeat Request")?;
|
||||||
|
|
||||||
let connector_auth_metadata =
|
let connector_auth_metadata =
|
||||||
build_unified_connector_service_auth_metadata(merchant_connector_account, merchant_context)
|
build_unified_connector_service_auth_metadata(merchant_connector_account, merchant_context)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct request metadata")?;
|
.attach_printable("Failed to construct request metadata")?;
|
||||||
|
|
||||||
let response = client
|
let updated_router_data = Box::pin(ucs_logging_wrapper(
|
||||||
.payment_repeat(
|
router_data.clone(),
|
||||||
payment_repeat_request,
|
state,
|
||||||
connector_auth_metadata,
|
payment_repeat_request,
|
||||||
state.get_grpc_headers(),
|
|mut router_data, payment_repeat_request| async move {
|
||||||
)
|
let response = client
|
||||||
.await
|
.payment_repeat(
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
payment_repeat_request,
|
||||||
.attach_printable("Failed to authorize payment")?;
|
connector_auth_metadata.clone(),
|
||||||
|
state.get_grpc_headers(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to repeat payment")?;
|
||||||
|
|
||||||
let payment_repeat_response = response.into_inner();
|
let payment_repeat_response = response.into_inner();
|
||||||
|
|
||||||
let (status, router_data_response, status_code) =
|
let (status, router_data_response, status_code) =
|
||||||
handle_unified_connector_service_response_for_payment_repeat(
|
handle_unified_connector_service_response_for_payment_repeat(
|
||||||
payment_repeat_response.clone(),
|
payment_repeat_response.clone(),
|
||||||
)
|
)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to deserialize UCS response")?;
|
.attach_printable("Failed to deserialize UCS response")?;
|
||||||
|
|
||||||
router_data.status = status;
|
router_data.status = status;
|
||||||
router_data.response = router_data_response;
|
router_data.response = router_data_response;
|
||||||
router_data.raw_connector_response = payment_repeat_response
|
router_data.raw_connector_response = payment_repeat_response
|
||||||
.raw_connector_response
|
.raw_connector_response
|
||||||
.map(Secret::new);
|
.clone()
|
||||||
router_data.connector_http_status_code = Some(status_code);
|
.map(Secret::new);
|
||||||
|
router_data.connector_http_status_code = Some(status_code);
|
||||||
|
|
||||||
|
Ok((router_data, payment_repeat_response))
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Copy back the updated data
|
||||||
|
*router_data = updated_router_data;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,7 @@ use crate::{
|
|||||||
payments::{
|
payments::{
|
||||||
self, access_token, customers, helpers, tokenization, transformers, PaymentData,
|
self, access_token, customers, helpers, tokenization, transformers, PaymentData,
|
||||||
},
|
},
|
||||||
unified_connector_service,
|
unified_connector_service::{self, ucs_logging_wrapper},
|
||||||
},
|
},
|
||||||
logger,
|
logger,
|
||||||
routes::{metrics, SessionState},
|
routes::{metrics, SessionState},
|
||||||
@ -394,33 +394,45 @@ impl Feature<api::ExternalVaultProxy, types::ExternalVaultProxyPaymentsData>
|
|||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct external vault proxy metadata")?;
|
.attach_printable("Failed to construct external vault proxy metadata")?;
|
||||||
|
|
||||||
let response = client
|
let updated_router_data = Box::pin(ucs_logging_wrapper(
|
||||||
.payment_authorize(
|
self.clone(),
|
||||||
payment_authorize_request,
|
state,
|
||||||
connector_auth_metadata,
|
payment_authorize_request.clone(),
|
||||||
Some(external_vault_proxy_metadata),
|
|mut router_data, payment_authorize_request| async move {
|
||||||
state.get_grpc_headers(),
|
let response = client
|
||||||
)
|
.payment_authorize(
|
||||||
.await
|
payment_authorize_request,
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
connector_auth_metadata,
|
||||||
.attach_printable("Failed to authorize payment")?;
|
Some(external_vault_proxy_metadata),
|
||||||
|
state.get_grpc_headers(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to authorize payment")?;
|
||||||
|
|
||||||
let payment_authorize_response = response.into_inner();
|
let payment_authorize_response = response.into_inner();
|
||||||
|
|
||||||
let (status, router_data_response, status_code) =
|
let (status, router_data_response, status_code) =
|
||||||
unified_connector_service::handle_unified_connector_service_response_for_payment_authorize(
|
unified_connector_service::handle_unified_connector_service_response_for_payment_authorize(
|
||||||
payment_authorize_response.clone(),
|
payment_authorize_response.clone(),
|
||||||
)
|
)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to deserialize UCS response")?;
|
.attach_printable("Failed to deserialize UCS response")?;
|
||||||
|
|
||||||
self.status = status;
|
router_data.status = status;
|
||||||
self.response = router_data_response;
|
router_data.response = router_data_response;
|
||||||
self.raw_connector_response = payment_authorize_response
|
router_data.raw_connector_response = payment_authorize_response
|
||||||
.raw_connector_response
|
.raw_connector_response
|
||||||
.map(masking::Secret::new);
|
.clone()
|
||||||
self.connector_http_status_code = Some(status_code);
|
.map(masking::Secret::new);
|
||||||
|
router_data.connector_http_status_code = Some(status_code);
|
||||||
|
|
||||||
|
Ok((router_data, payment_authorize_response))
|
||||||
|
}
|
||||||
|
)).await?;
|
||||||
|
|
||||||
|
// Copy back the updated data
|
||||||
|
*self = updated_router_data;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -13,7 +13,7 @@ use crate::{
|
|||||||
payments::{self, access_token, helpers, transformers, PaymentData},
|
payments::{self, access_token, helpers, transformers, PaymentData},
|
||||||
unified_connector_service::{
|
unified_connector_service::{
|
||||||
build_unified_connector_service_auth_metadata,
|
build_unified_connector_service_auth_metadata,
|
||||||
handle_unified_connector_service_response_for_payment_get,
|
handle_unified_connector_service_response_for_payment_get, ucs_logging_wrapper,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
routes::SessionState,
|
routes::SessionState,
|
||||||
@ -233,7 +233,7 @@ impl Feature<api::PSync, types::PaymentsSyncData>
|
|||||||
.ok_or(ApiErrorResponse::InternalServerError)
|
.ok_or(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to fetch Unified Connector Service client")?;
|
.attach_printable("Failed to fetch Unified Connector Service client")?;
|
||||||
|
|
||||||
let payment_get_request = payments_grpc::PaymentServiceGetRequest::foreign_try_from(self)
|
let payment_get_request = payments_grpc::PaymentServiceGetRequest::foreign_try_from(&*self)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct Payment Get Request")?;
|
.attach_printable("Failed to construct Payment Get Request")?;
|
||||||
|
|
||||||
@ -244,28 +244,45 @@ impl Feature<api::PSync, types::PaymentsSyncData>
|
|||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct request metadata")?;
|
.attach_printable("Failed to construct request metadata")?;
|
||||||
|
|
||||||
let response = client
|
let updated_router_data = Box::pin(ucs_logging_wrapper(
|
||||||
.payment_get(
|
self.clone(),
|
||||||
payment_get_request,
|
state,
|
||||||
connector_auth_metadata,
|
payment_get_request,
|
||||||
state.get_grpc_headers(),
|
|mut router_data, payment_get_request| async move {
|
||||||
)
|
let response = client
|
||||||
.await
|
.payment_get(
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
payment_get_request,
|
||||||
.attach_printable("Failed to get payment")?;
|
connector_auth_metadata,
|
||||||
|
state.get_grpc_headers(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to get payment")?;
|
||||||
|
|
||||||
let payment_get_response = response.into_inner();
|
let payment_get_response = response.into_inner();
|
||||||
|
|
||||||
let (status, router_data_response, status_code) =
|
let (status, router_data_response, status_code) =
|
||||||
handle_unified_connector_service_response_for_payment_get(payment_get_response.clone())
|
handle_unified_connector_service_response_for_payment_get(
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
payment_get_response.clone(),
|
||||||
.attach_printable("Failed to deserialize UCS response")?;
|
)
|
||||||
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to deserialize UCS response")?;
|
||||||
|
|
||||||
self.status = status;
|
router_data.status = status;
|
||||||
self.response = router_data_response;
|
router_data.response = router_data_response;
|
||||||
self.raw_connector_response = payment_get_response.raw_connector_response.map(Secret::new);
|
router_data.raw_connector_response = payment_get_response
|
||||||
self.connector_http_status_code = Some(status_code);
|
.raw_connector_response
|
||||||
|
.clone()
|
||||||
|
.map(Secret::new);
|
||||||
|
router_data.connector_http_status_code = Some(status_code);
|
||||||
|
|
||||||
|
Ok((router_data, payment_get_response))
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Copy back the updated data
|
||||||
|
*self = updated_router_data;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,7 +14,7 @@ use crate::{
|
|||||||
},
|
},
|
||||||
unified_connector_service::{
|
unified_connector_service::{
|
||||||
build_unified_connector_service_auth_metadata,
|
build_unified_connector_service_auth_metadata,
|
||||||
handle_unified_connector_service_response_for_payment_register,
|
handle_unified_connector_service_response_for_payment_register, ucs_logging_wrapper,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
routes::SessionState,
|
routes::SessionState,
|
||||||
@ -272,7 +272,7 @@ impl Feature<api::SetupMandate, types::SetupMandateRequestData> for types::Setup
|
|||||||
.attach_printable("Failed to fetch Unified Connector Service client")?;
|
.attach_printable("Failed to fetch Unified Connector Service client")?;
|
||||||
|
|
||||||
let payment_register_request =
|
let payment_register_request =
|
||||||
payments_grpc::PaymentServiceRegisterRequest::foreign_try_from(self)
|
payments_grpc::PaymentServiceRegisterRequest::foreign_try_from(&*self)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct Payment Setup Mandate Request")?;
|
.attach_printable("Failed to construct Payment Setup Mandate Request")?;
|
||||||
|
|
||||||
@ -283,33 +283,41 @@ impl Feature<api::SetupMandate, types::SetupMandateRequestData> for types::Setup
|
|||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to construct request metadata")?;
|
.attach_printable("Failed to construct request metadata")?;
|
||||||
|
|
||||||
let response = client
|
let updated_router_data = Box::pin(ucs_logging_wrapper(
|
||||||
.payment_setup_mandate(
|
self.clone(),
|
||||||
payment_register_request,
|
state,
|
||||||
connector_auth_metadata,
|
payment_register_request,
|
||||||
state.get_grpc_headers(),
|
|mut router_data, payment_register_request| async move {
|
||||||
)
|
let response = client
|
||||||
.await
|
.payment_setup_mandate(
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
payment_register_request,
|
||||||
.attach_printable("Failed to Setup Mandate payment")?;
|
connector_auth_metadata,
|
||||||
|
state.get_grpc_headers(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to Setup Mandate payment")?;
|
||||||
|
|
||||||
let payment_register_response = response.into_inner();
|
let payment_register_response = response.into_inner();
|
||||||
|
|
||||||
let (status, router_data_response, status_code) =
|
let (status, router_data_response, status_code) =
|
||||||
handle_unified_connector_service_response_for_payment_register(
|
handle_unified_connector_service_response_for_payment_register(
|
||||||
payment_register_response.clone(),
|
payment_register_response.clone(),
|
||||||
)
|
)
|
||||||
.change_context(ApiErrorResponse::InternalServerError)
|
.change_context(ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to deserialize UCS response")?;
|
.attach_printable("Failed to deserialize UCS response")?;
|
||||||
|
|
||||||
self.status = status;
|
router_data.status = status;
|
||||||
self.response = router_data_response;
|
router_data.response = router_data_response;
|
||||||
self.connector_http_status_code = Some(status_code);
|
router_data.connector_http_status_code = Some(status_code);
|
||||||
// UCS does not return raw connector response for setup mandate right now
|
|
||||||
// self.raw_connector_response = payment_register_response
|
|
||||||
// .raw_connector_response
|
|
||||||
// .map(Secret::new);
|
|
||||||
|
|
||||||
|
Ok((router_data, payment_register_response))
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Copy back the updated data
|
||||||
|
*self = updated_router_data;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
use std::str::FromStr;
|
use std::{str::FromStr, time::Instant};
|
||||||
|
|
||||||
use api_models::admin;
|
use api_models::admin;
|
||||||
#[cfg(feature = "v2")]
|
#[cfg(feature = "v2")]
|
||||||
@ -23,7 +23,7 @@ use hyperswitch_domain_models::{
|
|||||||
router_response_types::PaymentsResponseData,
|
router_response_types::PaymentsResponseData,
|
||||||
};
|
};
|
||||||
use masking::{ExposeInterface, PeekInterface, Secret};
|
use masking::{ExposeInterface, PeekInterface, Secret};
|
||||||
use router_env::logger;
|
use router_env::{instrument, logger, tracing};
|
||||||
use unified_connector_service_cards::CardNumber;
|
use unified_connector_service_cards::CardNumber;
|
||||||
use unified_connector_service_client::payments::{
|
use unified_connector_service_client::payments::{
|
||||||
self as payments_grpc, payment_method::PaymentMethod, CardDetails, CardPaymentMethodType,
|
self as payments_grpc, payment_method::PaymentMethod, CardDetails, CardPaymentMethodType,
|
||||||
@ -44,6 +44,7 @@ use crate::{
|
|||||||
},
|
},
|
||||||
utils::get_flow_name,
|
utils::get_flow_name,
|
||||||
},
|
},
|
||||||
|
events::connector_api_logs::ConnectorEvent,
|
||||||
routes::SessionState,
|
routes::SessionState,
|
||||||
types::transformers::ForeignTryFrom,
|
types::transformers::ForeignTryFrom,
|
||||||
utils,
|
utils,
|
||||||
@ -804,3 +805,120 @@ pub fn extract_webhook_content_from_ucs_response(
|
|||||||
) -> Option<&unified_connector_service_client::payments::WebhookResponseContent> {
|
) -> Option<&unified_connector_service_client::payments::WebhookResponseContent> {
|
||||||
transform_data.webhook_content.as_ref()
|
transform_data.webhook_content.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// UCS Event Logging Wrapper Function
|
||||||
|
/// This function wraps UCS calls with comprehensive event logging.
|
||||||
|
/// It logs the actual gRPC request/response data, timing, and error information.
|
||||||
|
#[instrument(skip_all, fields(connector_name, flow_type, payment_id))]
|
||||||
|
pub async fn ucs_logging_wrapper<T, F, Fut, Req, Resp, GrpcReq, GrpcResp>(
|
||||||
|
router_data: RouterData<T, Req, Resp>,
|
||||||
|
state: &SessionState,
|
||||||
|
grpc_request: GrpcReq,
|
||||||
|
handler: F,
|
||||||
|
) -> RouterResult<RouterData<T, Req, Resp>>
|
||||||
|
where
|
||||||
|
T: std::fmt::Debug + Clone + Send + 'static,
|
||||||
|
Req: std::fmt::Debug + Clone + Send + Sync + 'static,
|
||||||
|
Resp: std::fmt::Debug + Clone + Send + Sync + 'static,
|
||||||
|
GrpcReq: serde::Serialize,
|
||||||
|
GrpcResp: serde::Serialize,
|
||||||
|
F: FnOnce(RouterData<T, Req, Resp>, GrpcReq) -> Fut + Send,
|
||||||
|
Fut: std::future::Future<Output = RouterResult<(RouterData<T, Req, Resp>, GrpcResp)>> + Send,
|
||||||
|
{
|
||||||
|
tracing::Span::current().record("connector_name", &router_data.connector);
|
||||||
|
tracing::Span::current().record("flow_type", std::any::type_name::<T>());
|
||||||
|
tracing::Span::current().record("payment_id", &router_data.payment_id);
|
||||||
|
|
||||||
|
// Capture request data for logging
|
||||||
|
let connector_name = router_data.connector.clone();
|
||||||
|
let payment_id = router_data.payment_id.clone();
|
||||||
|
let merchant_id = router_data.merchant_id.clone();
|
||||||
|
let refund_id = router_data.refund_id.clone();
|
||||||
|
let dispute_id = router_data.dispute_id.clone();
|
||||||
|
|
||||||
|
// Log the actual gRPC request with masking
|
||||||
|
let grpc_request_body = masking::masked_serialize(&grpc_request)
|
||||||
|
.unwrap_or_else(|_| serde_json::json!({"error": "failed_to_serialize_grpc_request"}));
|
||||||
|
|
||||||
|
// Update connector call count metrics for UCS operations
|
||||||
|
crate::routes::metrics::CONNECTOR_CALL_COUNT.add(
|
||||||
|
1,
|
||||||
|
router_env::metric_attributes!(
|
||||||
|
("connector", connector_name.clone()),
|
||||||
|
(
|
||||||
|
"flow",
|
||||||
|
std::any::type_name::<T>()
|
||||||
|
.split("::")
|
||||||
|
.last()
|
||||||
|
.unwrap_or_default()
|
||||||
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Execute UCS function and measure timing
|
||||||
|
let start_time = Instant::now();
|
||||||
|
let result = handler(router_data, grpc_request).await;
|
||||||
|
let external_latency = start_time.elapsed().as_millis();
|
||||||
|
|
||||||
|
// Create and emit connector event after UCS call
|
||||||
|
let (status_code, response_body, router_result) = match result {
|
||||||
|
Ok((updated_router_data, grpc_response)) => {
|
||||||
|
let status = updated_router_data
|
||||||
|
.connector_http_status_code
|
||||||
|
.unwrap_or(200);
|
||||||
|
|
||||||
|
// Log the actual gRPC response
|
||||||
|
let grpc_response_body = serde_json::to_value(&grpc_response).unwrap_or_else(
|
||||||
|
|_| serde_json::json!({"error": "failed_to_serialize_grpc_response"}),
|
||||||
|
);
|
||||||
|
|
||||||
|
(status, Some(grpc_response_body), Ok(updated_router_data))
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
// Update error metrics for UCS calls
|
||||||
|
crate::routes::metrics::CONNECTOR_ERROR_RESPONSE_COUNT.add(
|
||||||
|
1,
|
||||||
|
router_env::metric_attributes!(("connector", connector_name.clone(),)),
|
||||||
|
);
|
||||||
|
|
||||||
|
let error_body = serde_json::json!({
|
||||||
|
"error": error.to_string(),
|
||||||
|
"error_type": "ucs_call_failed"
|
||||||
|
});
|
||||||
|
(500, Some(error_body), Err(error))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut connector_event = ConnectorEvent::new(
|
||||||
|
state.tenant.tenant_id.clone(),
|
||||||
|
connector_name,
|
||||||
|
std::any::type_name::<T>(),
|
||||||
|
grpc_request_body,
|
||||||
|
"grpc://unified-connector-service".to_string(),
|
||||||
|
common_utils::request::Method::Post,
|
||||||
|
payment_id,
|
||||||
|
merchant_id,
|
||||||
|
state.request_id.as_ref(),
|
||||||
|
external_latency,
|
||||||
|
refund_id,
|
||||||
|
dispute_id,
|
||||||
|
status_code,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set response body based on status code
|
||||||
|
if let Some(body) = response_body {
|
||||||
|
match status_code {
|
||||||
|
400..=599 => {
|
||||||
|
connector_event.set_error_response_body(&body);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
connector_event.set_response_body(&body);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit event
|
||||||
|
state.event_handler.log_event(&connector_event);
|
||||||
|
|
||||||
|
router_result
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user