refactor(outgoing_webhooks): raise errors in the analytics pipeline in case of API client errors or non-2xx responses (#4894)

This commit is contained in:
Sanchith Hegde
2024-06-06 23:15:45 +05:30
committed by GitHub
parent b4dbe841f8
commit 9da92027ef

View File

@ -274,199 +274,17 @@ async fn trigger_webhook_to_merchant(
);
logger::debug!(outgoing_webhook_response=?response);
let update_event_if_client_error =
|state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: String,
event_id: String,
error_message: String| async move {
let is_webhook_notified = false;
let response_to_store = OutgoingWebhookResponseContent {
body: None,
headers: None,
status_code: None,
error_message: Some(error_message),
};
let event_update = domain::EventUpdate::UpdateResponse {
is_webhook_notified,
response: Some(
domain_types::encrypt(
response_to_store
.encode_to_string_of_json()
.change_context(
errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed,
)
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook response content")?,
),
};
state
.store
.update_event_by_merchant_id_event_id(
&merchant_id,
&event_id,
event_update,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
};
let api_client_error_handler =
|state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: String,
event_id: String,
client_error: error_stack::Report<errors::ApiClientError>,
delivery_attempt: enums::WebhookDeliveryAttempt| async move {
// Not including detailed error message in response information since it contains too
// much of diagnostic information to be exposed to the merchant.
update_event_if_client_error(
state,
merchant_key_store,
merchant_id,
event_id,
"Unable to send request to merchant server".to_string(),
)
.await?;
let error =
client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed);
logger::error!(
?error,
?delivery_attempt,
"An error occurred when sending webhook to merchant"
);
Ok::<_, error_stack::Report<errors::WebhooksFlowError>>(())
};
let update_event_in_storage = |state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: String,
event_id: String,
response: reqwest::Response| async move {
let status_code = response.status();
let is_webhook_notified = status_code.is_success();
let response_headers = response
.headers()
.iter()
.map(|(name, value)| {
(
name.as_str().to_owned(),
value
.to_str()
.map(|s| Secret::from(String::from(s)))
.unwrap_or_else(|error| {
logger::warn!(
"Response header {} contains non-UTF-8 characters: {error:?}",
name.as_str()
);
Secret::from(String::from("Non-UTF-8 header value"))
}),
)
})
.collect::<Vec<_>>();
let response_body = response
.text()
.await
.map(Secret::from)
.unwrap_or_else(|error| {
logger::warn!("Response contains non-UTF-8 characters: {error:?}");
Secret::from(String::from("Non-UTF-8 response body"))
});
let response_to_store = OutgoingWebhookResponseContent {
body: Some(response_body),
headers: Some(response_headers),
status_code: Some(status_code.as_u16()),
error_message: None,
};
let event_update = domain::EventUpdate::UpdateResponse {
is_webhook_notified,
response: Some(
domain_types::encrypt(
response_to_store
.encode_to_string_of_json()
.change_context(
errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed,
)
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook response content")?,
),
};
state
.store
.update_event_by_merchant_id_event_id(
&merchant_id,
&event_id,
event_update,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
};
let increment_webhook_outgoing_received_count = |merchant_id: String| {
metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id)],
)
};
let success_response_handler =
|state: SessionState,
merchant_id: String,
process_tracker: Option<storage::ProcessTracker>,
business_status: &'static str| async move {
increment_webhook_outgoing_received_count(merchant_id);
match process_tracker {
Some(process_tracker) => state
.store
.as_scheduler()
.finish_process_with_business_status(process_tracker, business_status.into())
.await
.change_context(
errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed,
),
None => Ok(()),
}
};
let error_response_handler = |merchant_id: String,
delivery_attempt: enums::WebhookDeliveryAttempt,
status_code: u16,
log_message: &'static str| {
metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id)],
);
let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant);
logger::warn!(?error, ?delivery_attempt, ?status_code, %log_message);
};
match delivery_attempt {
enums::WebhookDeliveryAttempt::InitialAttempt => match response {
Err(client_error) => {
api_client_error_handler(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
&business_profile.merchant_id,
&event_id,
client_error,
delivery_attempt,
ScheduleWebhookRetry::NoSchedule,
)
.await?
}
@ -475,8 +293,8 @@ async fn trigger_webhook_to_merchant(
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
&business_profile.merchant_id,
&event_id,
response,
)
.await?;
@ -484,18 +302,21 @@ async fn trigger_webhook_to_merchant(
if status_code.is_success() {
success_response_handler(
state.clone(),
business_profile.merchant_id,
&business_profile.merchant_id,
process_tracker,
"INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL",
)
.await?;
} else {
error_response_handler(
business_profile.merchant_id,
state.clone(),
&business_profile.merchant_id,
delivery_attempt,
status_code.as_u16(),
"Ignoring error when sending webhook to merchant",
);
ScheduleWebhookRetry::NoSchedule,
)
.await?;
}
}
},
@ -509,30 +330,21 @@ async fn trigger_webhook_to_merchant(
api_client_error_handler(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
&business_profile.merchant_id,
&event_id,
client_error,
delivery_attempt,
ScheduleWebhookRetry::WithProcessTracker(process_tracker),
)
.await?;
// Schedule a retry attempt for webhook delivery
outgoing_webhook_retry::retry_webhook_delivery_task(
&*state.store,
&business_profile.merchant_id,
process_tracker,
)
.await
.change_context(
errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed,
)?;
}
Ok(response) => {
let status_code = response.status();
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
&business_profile.merchant_id,
&event_id,
response,
)
.await?;
@ -540,28 +352,21 @@ async fn trigger_webhook_to_merchant(
if status_code.is_success() {
success_response_handler(
state.clone(),
business_profile.merchant_id,
&business_profile.merchant_id,
Some(process_tracker),
"COMPLETED_BY_PT",
)
.await?;
} else {
error_response_handler(
business_profile.merchant_id.clone(),
state.clone(),
&business_profile.merchant_id,
delivery_attempt,
status_code.as_u16(),
"An error occurred when sending webhook to merchant",
);
// Schedule a retry attempt for webhook delivery
outgoing_webhook_retry::retry_webhook_delivery_task(
&*state.store,
&business_profile.merchant_id,
process_tracker,
ScheduleWebhookRetry::WithProcessTracker(process_tracker),
)
.await
.change_context(
errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed,
)?;
.await?;
}
}
}
@ -571,10 +376,11 @@ async fn trigger_webhook_to_merchant(
api_client_error_handler(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
&business_profile.merchant_id,
&event_id,
client_error,
delivery_attempt,
ScheduleWebhookRetry::NoSchedule,
)
.await?
}
@ -583,21 +389,24 @@ async fn trigger_webhook_to_merchant(
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
&business_profile.merchant_id,
&event_id,
response,
)
.await?;
if status_code.is_success() {
increment_webhook_outgoing_received_count(business_profile.merchant_id.clone());
increment_webhook_outgoing_received_count(&business_profile.merchant_id);
} else {
error_response_handler(
business_profile.merchant_id,
state,
&business_profile.merchant_id,
delivery_attempt,
status_code.as_u16(),
"Ignoring error when sending webhook to merchant",
);
ScheduleWebhookRetry::NoSchedule,
)
.await?;
}
}
},
@ -773,3 +582,229 @@ pub(crate) fn get_outgoing_webhook_request(
),
}
}
#[derive(Debug)]
enum ScheduleWebhookRetry {
WithProcessTracker(storage::ProcessTracker),
NoSchedule,
}
async fn update_event_if_client_error(
state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: &str,
event_id: &str,
error_message: String,
) -> CustomResult<domain::Event, errors::WebhooksFlowError> {
let is_webhook_notified = false;
let response_to_store = OutgoingWebhookResponseContent {
body: None,
headers: None,
status_code: None,
error_message: Some(error_message),
};
let event_update = domain::EventUpdate::UpdateResponse {
is_webhook_notified,
response: Some(
domain_types::encrypt(
response_to_store
.encode_to_string_of_json()
.change_context(
errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed,
)
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook response content")?,
),
};
state
.store
.update_event_by_merchant_id_event_id(
merchant_id,
event_id,
event_update,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
}
async fn api_client_error_handler(
state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: &str,
event_id: &str,
client_error: error_stack::Report<errors::ApiClientError>,
delivery_attempt: enums::WebhookDeliveryAttempt,
schedule_webhook_retry: ScheduleWebhookRetry,
) -> CustomResult<(), errors::WebhooksFlowError> {
// Not including detailed error message in response information since it contains too
// much of diagnostic information to be exposed to the merchant.
update_event_if_client_error(
state.clone(),
merchant_key_store,
merchant_id,
event_id,
"Unable to send request to merchant server".to_string(),
)
.await?;
let error = client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed);
logger::error!(
?error,
?delivery_attempt,
"An error occurred when sending webhook to merchant"
);
if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry {
// Schedule a retry attempt for webhook delivery
outgoing_webhook_retry::retry_webhook_delivery_task(
&*state.store,
merchant_id,
process_tracker,
)
.await
.change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?;
}
Err(error)
}
async fn update_event_in_storage(
state: SessionState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: &str,
event_id: &str,
response: reqwest::Response,
) -> CustomResult<domain::Event, errors::WebhooksFlowError> {
let status_code = response.status();
let is_webhook_notified = status_code.is_success();
let response_headers = response
.headers()
.iter()
.map(|(name, value)| {
(
name.as_str().to_owned(),
value
.to_str()
.map(|s| Secret::from(String::from(s)))
.unwrap_or_else(|error| {
logger::warn!(
"Response header {} contains non-UTF-8 characters: {error:?}",
name.as_str()
);
Secret::from(String::from("Non-UTF-8 header value"))
}),
)
})
.collect::<Vec<_>>();
let response_body = response
.text()
.await
.map(Secret::from)
.unwrap_or_else(|error| {
logger::warn!("Response contains non-UTF-8 characters: {error:?}");
Secret::from(String::from("Non-UTF-8 response body"))
});
let response_to_store = OutgoingWebhookResponseContent {
body: Some(response_body),
headers: Some(response_headers),
status_code: Some(status_code.as_u16()),
error_message: None,
};
let event_update = domain::EventUpdate::UpdateResponse {
is_webhook_notified,
response: Some(
domain_types::encrypt(
response_to_store
.encode_to_string_of_json()
.change_context(
errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed,
)
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook response content")?,
),
};
state
.store
.update_event_by_merchant_id_event_id(
merchant_id,
event_id,
event_update,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
}
fn increment_webhook_outgoing_received_count(merchant_id: &str) {
metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id.to_owned())],
)
}
async fn success_response_handler(
state: SessionState,
merchant_id: &str,
process_tracker: Option<storage::ProcessTracker>,
business_status: &'static str,
) -> CustomResult<(), errors::WebhooksFlowError> {
increment_webhook_outgoing_received_count(merchant_id);
match process_tracker {
Some(process_tracker) => state
.store
.as_scheduler()
.finish_process_with_business_status(process_tracker, business_status.into())
.await
.change_context(
errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed,
),
None => Ok(()),
}
}
async fn error_response_handler(
state: SessionState,
merchant_id: &str,
delivery_attempt: enums::WebhookDeliveryAttempt,
status_code: u16,
log_message: &'static str,
schedule_webhook_retry: ScheduleWebhookRetry,
) -> CustomResult<(), errors::WebhooksFlowError> {
metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id.to_owned())],
);
let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant);
logger::warn!(?error, ?delivery_attempt, ?status_code, %log_message);
if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry {
// Schedule a retry attempt for webhook delivery
outgoing_webhook_retry::retry_webhook_delivery_task(
&*state.store,
merchant_id,
process_tracker,
)
.await
.change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?;
}
Err(error)
}