mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-30 01:27:31 +08:00
feat(events): add incoming webhook payload to api events logger (#2852)
Co-authored-by: Sampras lopes <lsampras@pm.me>
This commit is contained in:
@ -1,16 +1,17 @@
|
||||
pub mod types;
|
||||
pub mod utils;
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::{str::FromStr, time::Instant};
|
||||
|
||||
use actix_web::FromRequest;
|
||||
use api_models::{
|
||||
payments::HeaderPayload,
|
||||
webhooks::{self, WebhookResponseTracker},
|
||||
};
|
||||
use common_utils::errors::ReportSwitchExt;
|
||||
use common_utils::{errors::ReportSwitchExt, events::ApiEventsType};
|
||||
use error_stack::{report, IntoReport, ResultExt};
|
||||
use masking::ExposeInterface;
|
||||
use router_env::{instrument, tracing};
|
||||
use router_env::{instrument, tracing, tracing_actix_web::RequestId};
|
||||
|
||||
use super::{errors::StorageErrorExt, metrics};
|
||||
#[cfg(feature = "stripe")]
|
||||
@ -24,9 +25,10 @@ use crate::{
|
||||
payments, refunds,
|
||||
},
|
||||
db::StorageInterface,
|
||||
events::api_logs::ApiEvent,
|
||||
logger,
|
||||
routes::{lock_utils, metrics::request::add_attributes, AppState},
|
||||
services,
|
||||
routes::{app::AppStateInfo, lock_utils, metrics::request::add_attributes, AppState},
|
||||
services::{self, authentication as auth},
|
||||
types::{
|
||||
self as router_types,
|
||||
api::{self, mandates::MandateResponseExt},
|
||||
@ -860,6 +862,7 @@ pub async fn trigger_webhook_to_merchant<W: types::OutgoingWebhookType>(
|
||||
}
|
||||
|
||||
pub async fn webhooks_wrapper<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetrieve>(
|
||||
flow: &impl router_env::types::FlowMetric,
|
||||
state: AppState,
|
||||
req: &actix_web::HttpRequest,
|
||||
merchant_account: domain::MerchantAccount,
|
||||
@ -867,21 +870,64 @@ pub async fn webhooks_wrapper<W: types::OutgoingWebhookType, Ctx: PaymentMethodR
|
||||
connector_name_or_mca_id: &str,
|
||||
body: actix_web::web::Bytes,
|
||||
) -> RouterResponse<serde_json::Value> {
|
||||
let (application_response, _webhooks_response_tracker) = Box::pin(webhooks_core::<W, Ctx>(
|
||||
state,
|
||||
req,
|
||||
merchant_account,
|
||||
key_store,
|
||||
connector_name_or_mca_id,
|
||||
body,
|
||||
))
|
||||
.await?;
|
||||
let start_instant = Instant::now();
|
||||
let (application_response, webhooks_response_tracker, serialized_req) =
|
||||
Box::pin(webhooks_core::<W, Ctx>(
|
||||
state.clone(),
|
||||
req,
|
||||
merchant_account.clone(),
|
||||
key_store,
|
||||
connector_name_or_mca_id,
|
||||
body.clone(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
let request_duration = Instant::now()
|
||||
.saturating_duration_since(start_instant)
|
||||
.as_millis();
|
||||
|
||||
let request_id = RequestId::extract(req)
|
||||
.await
|
||||
.into_report()
|
||||
.attach_printable("Unable to extract request id from request")
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)?;
|
||||
let auth_type = auth::AuthenticationType::WebhookAuth {
|
||||
merchant_id: merchant_account.merchant_id.clone(),
|
||||
};
|
||||
let status_code = 200;
|
||||
let api_event = ApiEventsType::Webhooks {
|
||||
connector: connector_name_or_mca_id.to_string(),
|
||||
payment_id: webhooks_response_tracker.get_payment_id(),
|
||||
};
|
||||
let response_value = serde_json::to_value(&webhooks_response_tracker)
|
||||
.into_report()
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Could not convert webhook effect to string")?;
|
||||
|
||||
let api_event = ApiEvent::new(
|
||||
flow,
|
||||
&request_id,
|
||||
request_duration,
|
||||
status_code,
|
||||
serialized_req,
|
||||
Some(response_value),
|
||||
None,
|
||||
auth_type,
|
||||
api_event,
|
||||
req,
|
||||
);
|
||||
match api_event.clone().try_into() {
|
||||
Ok(event) => {
|
||||
state.event_handler().log_event(event);
|
||||
}
|
||||
Err(err) => {
|
||||
logger::error!(error=?err, event=?api_event, "Error Logging API Event");
|
||||
}
|
||||
}
|
||||
Ok(application_response)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
|
||||
pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetrieve>(
|
||||
state: AppState,
|
||||
req: &actix_web::HttpRequest,
|
||||
@ -892,6 +938,7 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
|
||||
) -> errors::RouterResult<(
|
||||
services::ApplicationResponse<serde_json::Value>,
|
||||
WebhookResponseTracker,
|
||||
serde_json::Value,
|
||||
)> {
|
||||
metrics::WEBHOOK_INCOMING_COUNT.add(
|
||||
&metrics::CONTEXT,
|
||||
@ -973,7 +1020,11 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
|
||||
.switch()
|
||||
.attach_printable("Failed while early return in case of event type parsing")?;
|
||||
|
||||
return Ok((response, WebhookResponseTracker::NoEffect));
|
||||
return Ok((
|
||||
response,
|
||||
WebhookResponseTracker::NoEffect,
|
||||
serde_json::Value::Null,
|
||||
));
|
||||
}
|
||||
};
|
||||
logger::info!(event_type=?event_type);
|
||||
@ -996,6 +1047,7 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
|
||||
logger::info!(process_webhook=?process_webhook_further);
|
||||
|
||||
let flow_type: api::WebhookFlow = event_type.to_owned().into();
|
||||
let mut event_object: Box<dyn masking::ErasedMaskSerialize> = Box::new(serde_json::Value::Null);
|
||||
let webhook_effect = if process_webhook_further
|
||||
&& !matches!(flow_type, api::WebhookFlow::ReturnResponse)
|
||||
{
|
||||
@ -1072,14 +1124,21 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
|
||||
|
||||
logger::info!(source_verified=?source_verified);
|
||||
|
||||
let event_object = connector
|
||||
event_object = connector
|
||||
.get_webhook_resource_object(&request_details)
|
||||
.switch()
|
||||
.attach_printable("Could not find resource object in incoming webhook body")?;
|
||||
|
||||
let webhook_details = api::IncomingWebhookDetails {
|
||||
object_reference_id: object_ref_id,
|
||||
resource_object: Encode::<serde_json::Value>::encode_to_vec(&event_object)
|
||||
resource_object: event_object
|
||||
.raw_serialize()
|
||||
.and_then(|ref val| serde_json::to_vec(val))
|
||||
.into_report()
|
||||
.change_context(errors::ParsingError::EncodeError("byte-vec"))
|
||||
.attach_printable_lazy(|| {
|
||||
"Unable to convert webhook paylaod to a value".to_string()
|
||||
})
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable(
|
||||
"There was an issue when encoding the incoming webhook body to bytes",
|
||||
@ -1184,7 +1243,12 @@ pub async fn webhooks_core<W: types::OutgoingWebhookType, Ctx: PaymentMethodRetr
|
||||
.switch()
|
||||
.attach_printable("Could not get incoming webhook api response from connector")?;
|
||||
|
||||
Ok((response, webhook_effect))
|
||||
let serialized_request = event_object
|
||||
.masked_serialize()
|
||||
.into_report()
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||
.attach_printable("Could not convert webhook effect to string")?;
|
||||
Ok((response, webhook_effect, serialized_request))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
Reference in New Issue
Block a user