diff --git a/api-reference/v2/openapi_spec_v2.json b/api-reference/v2/openapi_spec_v2.json index ca5b4e8b5b..919c3ac457 100644 --- a/api-reference/v2/openapi_spec_v2.json +++ b/api-reference/v2/openapi_spec_v2.json @@ -15707,7 +15707,9 @@ "type": "object", "required": [ "id", - "status" + "status", + "amount", + "created_at" ], "properties": { "id": { @@ -15717,6 +15719,20 @@ "status": { "$ref": "#/components/schemas/AttemptStatus" }, + "amount": { + "type": "integer", + "format": "int64", + "description": "The amount of the payment attempt", + "example": 6540 + }, + "error_details": { + "allOf": [ + { + "$ref": "#/components/schemas/RecordAttemptErrorDetails" + } + ], + "nullable": true + }, "payment_intent_feature_metadata": { "allOf": [ { @@ -15732,6 +15748,11 @@ } ], "nullable": true + }, + "created_at": { + "type": "string", + "format": "date-time", + "description": "attempt created at timestamp" } } }, @@ -18118,6 +18139,12 @@ "description": "Invoice Next billing time", "nullable": true }, + "invoice_billing_started_at_time": { + "type": "string", + "format": "date-time", + "description": "Invoice Next billing time", + "nullable": true + }, "first_payment_attempt_pg_error_code": { "type": "string", "description": "First Payment Attempt Payment Gateway Error Code", @@ -21917,6 +21944,39 @@ "disabled" ] }, + "RecordAttemptErrorDetails": { + "type": "object", + "description": "Error details for the payment", + "required": [ + "code", + "message" + ], + "properties": { + "code": { + "type": "string", + "description": "error code sent by billing connector." + }, + "message": { + "type": "string", + "description": "error message sent by billing connector." + }, + "network_advice_code": { + "type": "string", + "description": "This field can be returned for both approved and refused Mastercard payments.\nThis code provides additional information about the type of transaction or the reason why the payment failed.\nIf the payment failed, the network advice code gives guidance on if and when you can retry the payment.", + "nullable": true + }, + "network_decline_code": { + "type": "string", + "description": "For card errors resulting from a card issuer decline, a brand specific 2, 3, or 4 digit code which indicates the reason the authorization failed.", + "nullable": true + }, + "network_error_message": { + "type": "string", + "description": "A string indicating how to proceed with an network error if payment gateway provide one. This is used to understand the network error code better.", + "nullable": true + } + } + }, "RecurringDetails": { "oneOf": [ { diff --git a/config/config.example.toml b/config/config.example.toml index 43dd174a76..de0e954639 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -996,6 +996,7 @@ payout_analytics_topic = "topic" # Kafka topic to be used for Payouts an consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events authentication_analytics_topic = "topic" # Kafka topic to be used for Authentication events routing_logs_topic = "topic" # Kafka topic to be used for Routing events +revenue_recovery_topic = "topic" # Kafka topic to be used for revenue recovery events # File storage configuration [file_storage] diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 4428b50801..23ff9f16e7 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -94,6 +94,7 @@ consolidated_events_topic = "topic" # Kafka topic to be used for Consolidat authentication_analytics_topic = "topic" # Kafka topic to be used for Authentication events fraud_check_analytics_topic = "topic" # Kafka topic to be used for Fraud Check events routing_logs_topic = "topic" # Kafka topic to be used for Routing events +revenue_recovery_topic = "topic" # Kafka topic to be used for Revenue Recovery Events # File storage configuration [file_storage] diff --git a/config/development.toml b/config/development.toml index e92b49d52b..95c9f9d932 100644 --- a/config/development.toml +++ b/config/development.toml @@ -1099,6 +1099,7 @@ payout_analytics_topic = "hyperswitch-payout-events" consolidated_events_topic = "hyperswitch-consolidated-events" authentication_analytics_topic = "hyperswitch-authentication-events" routing_logs_topic = "hyperswitch-routing-api-events" +revenue_recovery_topic = "hyperswitch-revenue-recovery-events" [debit_routing_config] supported_currencies = "USD" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index fa4d9794b1..8debb95f52 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -957,6 +957,7 @@ payout_analytics_topic = "hyperswitch-payout-events" consolidated_events_topic = "hyperswitch-consolidated-events" authentication_analytics_topic = "hyperswitch-authentication-events" routing_logs_topic = "hyperswitch-routing-api-events" +revenue_recovery_topic = "hyperswitch-revenue-recovery-events" [analytics] source = "sqlx" diff --git a/config/vector.yaml b/config/vector.yaml index 4b801935ae..89e33da57b 100644 --- a/config/vector.yaml +++ b/config/vector.yaml @@ -1,15 +1,5 @@ acknowledgements: enabled: true -enrichment_tables: - sdk_map: - type: file - file: - path: /etc/vector/config/sdk_map.csv - encoding: - type: csv - schema: - publishable_key: string - merchant_id: string api: @@ -28,6 +18,15 @@ sources: - hyperswitch-dispute-events decoding: codec: json + + revenue_events: + type: kafka + bootstrap_servers: kafka0:29092 + group_id: hyperswitch_revenue_recovery + topics: + - hyperswitch-revenue-recovery-events + decoding: + codec: json sessionized_kafka_tx_events: type: kafka @@ -98,19 +97,6 @@ transforms: key_field: "{{ .payment_id }}{{ .merchant_id }}" threshold: 1000 window_secs: 60 - - amend_sdk_logs: - type: remap - inputs: - - sdk_transformed - source: | - .before_transform = now() - - merchant_id = .merchant_id - row = get_enrichment_table_record!("sdk_map", { "publishable_key" : merchant_id }, case_sensitive: true) - .merchant_id = row.merchant_id - - .after_transform = now() @@ -256,7 +242,54 @@ sinks: - "path" - "source_type" inputs: - - "amend_sdk_logs" + - "sdk_transformed" bootstrap_servers: kafka0:29092 topic: hyper-sdk-logs key_field: ".merchant_id" + + revenue_recovery_s3: + type: aws_s3 + inputs: + - revenue_events + bucket: BUCKET_NAME + region: us-east-1 + content_type: json + batch: + max_events: 1000 + timeout_secs: 86400 + encoding: + codec: csv + csv: + fields: + - merchant_id + - invoice_id + - invoice_amount + - invoice_currency + - invoice_due_date + - invoice_date + - billing_city + - billing_country + - billing_state + - attempt_id + - attempt_amount + - attempt_currency + - attempt_status + - pg_error_code + - network_advice_code + - network_error_code + - first_pg_error_code + - first_network_advice_code + - first_network_error_code + - attempt_created_at + - payment_method_type + - payment_method_subtype + - card_network + - card_issuer + - retry_count + - payment_gateway + key_prefix: "{{ .merchant_id }}/" + timezone: "Europe/London" + + filename_append_uuid: false + filename_time_format: "%Y/%m/%+" + filename_extension: csv diff --git a/crates/api_models/src/payments.rs b/crates/api_models/src/payments.rs index 12256b5b8c..3700b7faad 100644 --- a/crates/api_models/src/payments.rs +++ b/crates/api_models/src/payments.rs @@ -1562,11 +1562,14 @@ pub struct PaymentAttemptResponse { /// The global identifier for the payment attempt #[schema(value_type = String)] pub id: id_type::GlobalAttemptId, + /// /// The status of the attempt #[schema(value_type = AttemptStatus, example = "charged")] pub status: enums::AttemptStatus, + /// Amount related information for this payment and attempt pub amount: PaymentAttemptAmountDetails, + /// Name of the connector that was used for the payment attempt. #[schema(example = "stripe")] pub connector: Option, @@ -1624,6 +1627,7 @@ pub struct PaymentAttemptResponse { /// Value passed in X-CLIENT-SOURCE header during payments confirm request by the client pub client_source: Option, + /// Value passed in X-CLIENT-VERSION header during payments confirm request by the client pub client_version: Option, @@ -1643,12 +1647,21 @@ pub struct PaymentAttemptRecordResponse { /// The status of the attempt #[schema(value_type = AttemptStatus, example = "charged")] pub status: enums::AttemptStatus, + /// The amount of the payment attempt + #[schema(value_type = i64, example = 6540)] + pub amount: MinorUnit, + /// Error details for the payment attempt, if any. + /// This includes fields like error code, network advice code, and network decline code. + pub error_details: Option, /// Additional data that might be required by hyperswitch based on the requested features by the merchants. #[schema(value_type = Option)] pub payment_intent_feature_metadata: Option, /// Additional data that might be required by hyperswitch, to enable some specific features. pub payment_attempt_feature_metadata: Option, + /// attempt created at timestamp + pub created_at: PrimitiveDateTime, } + #[cfg(feature = "v2")] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, ToSchema)] pub struct PaymentAttemptFeatureMetadata { @@ -8803,6 +8816,9 @@ pub struct PaymentRevenueRecoveryMetadata { /// Invoice Next billing time #[serde(default, with = "common_utils::custom_serde::iso8601::option")] pub invoice_next_billing_time: Option, + /// Invoice Next billing time + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub invoice_billing_started_at_time: Option, /// First Payment Attempt Payment Gateway Error Code #[schema(value_type = Option, example = "card_declined")] pub first_payment_attempt_pg_error_code: Option, @@ -8832,6 +8848,15 @@ pub struct BillingConnectorAdditionalCardInfo { pub card_issuer: Option, } +#[cfg(feature = "v2")] +impl BillingConnectorPaymentMethodDetails { + pub fn get_billing_connector_card_info(&self) -> Option<&BillingConnectorAdditionalCardInfo> { + match self { + Self::Card(card_details) => Some(card_details), + } + } +} + #[cfg(feature = "v2")] impl PaymentRevenueRecoveryMetadata { pub fn set_payment_transmission_field_for_api_request( @@ -8942,6 +8967,11 @@ pub struct PaymentsAttemptRecordRequest { #[serde(default, with = "common_utils::custom_serde::iso8601::option")] pub invoice_next_billing_time: Option, + /// Next Billing time of the Invoice + #[schema(example = "2022-09-10T10:11:12Z")] + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub invoice_billing_started_at_time: Option, + /// source where the payment was triggered by #[schema(value_type = TriggeredBy, example = "internal" )] pub triggered_by: common_enums::TriggeredBy, diff --git a/crates/diesel_models/src/types.rs b/crates/diesel_models/src/types.rs index e3efd3dae2..8f795d67ea 100644 --- a/crates/diesel_models/src/types.rs +++ b/crates/diesel_models/src/types.rs @@ -174,6 +174,8 @@ pub struct PaymentRevenueRecoveryMetadata { pub connector: common_enums::connector_enums::Connector, /// Time at which next invoice will be created pub invoice_next_billing_time: Option, + /// Time at which invoice started + pub invoice_billing_started_at_time: Option, /// Extra Payment Method Details that are needed to be stored pub billing_connector_payment_method_details: Option, /// First Payment Attempt Payment Gateway Error Code diff --git a/crates/hyperswitch_connectors/src/connectors/chargebee/transformers.rs b/crates/hyperswitch_connectors/src/connectors/chargebee/transformers.rs index e0c84b4b45..5a3ec2f96f 100644 --- a/crates/hyperswitch_connectors/src/connectors/chargebee/transformers.rs +++ b/crates/hyperswitch_connectors/src/connectors/chargebee/transformers.rs @@ -280,6 +280,8 @@ pub struct ChargebeeWebhookContent { #[derive(Serialize, Deserialize, Debug)] pub struct ChargebeeSubscriptionData { + #[serde(default, with = "common_utils::custom_serde::timestamp::option")] + pub current_term_start: Option, #[serde(default, with = "common_utils::custom_serde::timestamp::option")] pub next_billing_at: Option, } @@ -488,6 +490,11 @@ impl TryFrom for revenue_recovery::RevenueRecoveryAttemptD .subscription .as_ref() .and_then(|subscription| subscription.next_billing_at); + let invoice_billing_started_at_time = item + .content + .subscription + .as_ref() + .and_then(|subscription| subscription.current_term_start); Ok(Self { amount, currency, @@ -507,6 +514,7 @@ impl TryFrom for revenue_recovery::RevenueRecoveryAttemptD network_error_message: None, retry_count, invoice_next_billing_time, + invoice_billing_started_at_time, card_network: Some(payment_method_details.card.brand), card_isin: Some(payment_method_details.card.iin), // This field is none because it is specific to stripebilling. @@ -576,6 +584,11 @@ impl TryFrom for revenue_recovery::RevenueRecoveryInvoiceD .subscription .as_ref() .and_then(|subscription| subscription.next_billing_at); + let billing_started_at = item + .content + .subscription + .as_ref() + .and_then(|subscription| subscription.current_term_start); Ok(Self { amount: item.content.invoice.total, currency: item.content.invoice.currency_code, @@ -583,6 +596,7 @@ impl TryFrom for revenue_recovery::RevenueRecoveryInvoiceD billing_address: Some(api_models::payments::Address::from(item.content.invoice)), retry_count, next_billing_at: invoice_next_billing_time, + billing_started_at, }) } } diff --git a/crates/hyperswitch_connectors/src/connectors/stripebilling/transformers.rs b/crates/hyperswitch_connectors/src/connectors/stripebilling/transformers.rs index ca0875ba4c..ae80cb48ea 100644 --- a/crates/hyperswitch_connectors/src/connectors/stripebilling/transformers.rs +++ b/crates/hyperswitch_connectors/src/connectors/stripebilling/transformers.rs @@ -402,6 +402,13 @@ impl TryFrom for revenue_recovery::RevenueRecoveryInvo .data .first() .map(|linedata| linedata.period.end); + let billing_started_at = item + .data + .object + .lines + .data + .first() + .map(|linedata| linedata.period.start); Ok(Self { amount: item.data.object.amount, currency: item.data.object.currency, @@ -413,6 +420,7 @@ impl TryFrom for revenue_recovery::RevenueRecoveryInvo .map(api_models::payments::Address::from), retry_count: Some(item.data.object.attempt_count), next_billing_at, + billing_started_at, }) } } diff --git a/crates/hyperswitch_domain_models/src/lib.rs b/crates/hyperswitch_domain_models/src/lib.rs index 1f9dee6863..c7df1e3749 100644 --- a/crates/hyperswitch_domain_models/src/lib.rs +++ b/crates/hyperswitch_domain_models/src/lib.rs @@ -335,6 +335,7 @@ impl ApiModelToDieselModelConvertor for PaymentReven first_payment_attempt_network_decline_code: from .first_payment_attempt_network_decline_code, first_payment_attempt_pg_error_code: from.first_payment_attempt_pg_error_code, + invoice_billing_started_at_time: from.invoice_billing_started_at_time, } } @@ -359,6 +360,7 @@ impl ApiModelToDieselModelConvertor for PaymentReven first_payment_attempt_network_decline_code: self .first_payment_attempt_network_decline_code, first_payment_attempt_pg_error_code: self.first_payment_attempt_pg_error_code, + invoice_billing_started_at_time: self.invoice_billing_started_at_time, } } } diff --git a/crates/hyperswitch_domain_models/src/payments.rs b/crates/hyperswitch_domain_models/src/payments.rs index 0445b2b9a2..e9b234eb46 100644 --- a/crates/hyperswitch_domain_models/src/payments.rs +++ b/crates/hyperswitch_domain_models/src/payments.rs @@ -744,6 +744,7 @@ impl PaymentIntent { network_error_message: None, retry_count: None, invoice_next_billing_time: None, + invoice_billing_started_at_time: None, card_isin: None, card_network: None, // No charge id is present here since it is an internal payment and we didn't call connector yet. @@ -1068,6 +1069,9 @@ where errors::api_error_response::ApiErrorResponse::InternalServerError })?, invoice_next_billing_time: self.revenue_recovery_data.invoice_next_billing_time, + invoice_billing_started_at_time: self + .revenue_recovery_data + .invoice_next_billing_time, billing_connector_payment_method_details, first_payment_attempt_network_advice_code: first_network_advice_code, first_payment_attempt_network_decline_code: first_network_decline_code, diff --git a/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs b/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs index 3baddfa98c..e16285e3df 100644 --- a/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs +++ b/crates/hyperswitch_domain_models/src/payments/payment_attempt.rs @@ -373,6 +373,19 @@ pub struct ErrorDetails { pub network_error_message: Option, } +#[cfg(feature = "v2")] +impl From for api_models::payments::RecordAttemptErrorDetails { + fn from(error_details: ErrorDetails) -> Self { + Self { + code: error_details.code, + message: error_details.message, + network_decline_code: error_details.network_decline_code, + network_advice_code: error_details.network_advice_code, + network_error_message: error_details.network_error_message, + } + } +} + /// Domain model for the payment attempt. /// Few fields which are related are grouped together for better readability and understandability. /// These fields will be flattened and stored in the database in individual columns diff --git a/crates/hyperswitch_domain_models/src/revenue_recovery.rs b/crates/hyperswitch_domain_models/src/revenue_recovery.rs index 0485644628..6481a81d53 100644 --- a/crates/hyperswitch_domain_models/src/revenue_recovery.rs +++ b/crates/hyperswitch_domain_models/src/revenue_recovery.rs @@ -3,8 +3,12 @@ use common_enums::enums as common_enums; use common_utils::{id_type, types as util_types}; use time::PrimitiveDateTime; -use crate::router_response_types::revenue_recovery::{ - BillingConnectorInvoiceSyncResponse, BillingConnectorPaymentsSyncResponse, +use crate::{ + payments, + router_response_types::revenue_recovery::{ + BillingConnectorInvoiceSyncResponse, BillingConnectorPaymentsSyncResponse, + }, + ApiModelToDieselModelConvertor, }; /// Recovery payload is unified struct constructed from billing connectors @@ -48,6 +52,8 @@ pub struct RevenueRecoveryAttemptData { pub retry_count: Option, /// Time when next invoice will be generated which will be equal to the end time of the current invoice pub invoice_next_billing_time: Option, + /// Time at which the invoice created + pub invoice_billing_started_at_time: Option, /// card network type pub card_network: Option, /// card isin @@ -71,6 +77,8 @@ pub struct RevenueRecoveryInvoiceData { pub retry_count: Option, /// Ending date of the invoice or the Next billing time of the Subscription pub next_billing_at: Option, + /// Invoice Starting Time + pub billing_started_at: Option, } /// type of action that needs to taken after consuming recovery payload @@ -89,17 +97,30 @@ pub enum RecoveryAction { /// Invalid event has been received. InvalidAction, } -#[derive(Clone)] + +#[derive(Clone, Debug)] pub struct RecoveryPaymentIntent { pub payment_id: id_type::GlobalPaymentId, pub status: common_enums::IntentStatus, pub feature_metadata: Option, + pub merchant_id: id_type::MerchantId, + pub merchant_reference_id: Option, + pub invoice_amount: util_types::MinorUnit, + pub invoice_currency: common_enums::Currency, + pub created_at: Option, + pub billing_address: Option, } +#[derive(Clone, Debug)] pub struct RecoveryPaymentAttempt { pub attempt_id: id_type::GlobalAttemptId, pub attempt_status: common_enums::AttemptStatus, pub feature_metadata: Option, + pub amount: util_types::MinorUnit, + pub network_advice_code: Option, + pub network_decline_code: Option, + pub error_code: Option, + pub created_at: PrimitiveDateTime, } impl RecoveryPaymentAttempt { @@ -231,6 +252,7 @@ impl From<&BillingConnectorInvoiceSyncResponse> for RevenueRecoveryInvoiceData { billing_address: data.billing_address.clone(), retry_count: data.retry_count, next_billing_at: data.ends_at, + billing_started_at: data.created_at, } } } @@ -281,6 +303,7 @@ impl card_network: billing_connector_payment_details.card_network.clone(), card_isin: billing_connector_payment_details.card_isin.clone(), charge_id: billing_connector_payment_details.charge_id.clone(), + invoice_billing_started_at_time: invoice_details.billing_started_at, } } } @@ -313,3 +336,61 @@ impl From<&RevenueRecoveryAttemptData> for Option for RecoveryPaymentIntent { + fn from(payment_intent: &payments::PaymentIntent) -> Self { + Self { + payment_id: payment_intent.id.clone(), + status: payment_intent.status, + feature_metadata: payment_intent + .feature_metadata + .clone() + .map(|feature_metadata| feature_metadata.convert_back()), + merchant_reference_id: payment_intent.merchant_reference_id.clone(), + invoice_amount: payment_intent.amount_details.order_amount, + invoice_currency: payment_intent.amount_details.currency, + billing_address: payment_intent + .billing_address + .clone() + .map(|address| api_payments::Address::from(address.into_inner())), + merchant_id: payment_intent.merchant_id.clone(), + created_at: Some(payment_intent.created_at), + } + } +} + +impl From<&payments::payment_attempt::PaymentAttempt> for RecoveryPaymentAttempt { + fn from(payment_attempt: &payments::payment_attempt::PaymentAttempt) -> Self { + Self { + attempt_id: payment_attempt.id.clone(), + attempt_status: payment_attempt.status, + feature_metadata: payment_attempt + .feature_metadata + .clone() + .map( + |feature_metadata| api_payments::PaymentAttemptFeatureMetadata { + revenue_recovery: feature_metadata.revenue_recovery.map(|recovery| { + api_payments::PaymentAttemptRevenueRecoveryData { + attempt_triggered_by: recovery.attempt_triggered_by, + charge_id: recovery.charge_id, + } + }), + }, + ), + amount: payment_attempt.amount_details.get_net_amount(), + network_advice_code: payment_attempt + .error + .clone() + .and_then(|error| error.network_advice_code), + network_decline_code: payment_attempt + .error + .clone() + .and_then(|error| error.network_decline_code), + error_code: payment_attempt + .error + .as_ref() + .map(|error| error.code.clone()), + created_at: payment_attempt.created_at, + } + } +} diff --git a/crates/openapi/src/openapi_v2.rs b/crates/openapi/src/openapi_v2.rs index 4b88ec4c1a..81e8f10dcd 100644 --- a/crates/openapi/src/openapi_v2.rs +++ b/crates/openapi/src/openapi_v2.rs @@ -360,6 +360,7 @@ Never share your secret api keys. Keep them guarded and secure. api_models::payments::PaymentAttemptFeatureMetadata, api_models::payments::PaymentAttemptRevenueRecoveryData, api_models::payments::BillingConnectorPaymentMethodDetails, + api_models::payments::RecordAttemptErrorDetails, api_models::payments::BillingConnectorAdditionalCardInfo, api_models::payments::AliPayRedirection, api_models::payments::MomoRedirection, diff --git a/crates/router/src/core/payments/transformers.rs b/crates/router/src/core/payments/transformers.rs index 36db7dc1b8..6f5689ed1c 100644 --- a/crates/router/src/core/payments/transformers.rs +++ b/crates/router/src/core/payments/transformers.rs @@ -2146,8 +2146,9 @@ where let payment_attempt = self.payment_attempt; let payment_intent = self.payment_intent; let response = api_models::payments::PaymentAttemptRecordResponse { - id: payment_attempt.get_id().to_owned(), + id: payment_attempt.id.clone(), status: payment_attempt.status, + amount: payment_attempt.amount_details.get_net_amount(), payment_intent_feature_metadata: payment_intent .feature_metadata .as_ref() @@ -2156,6 +2157,10 @@ where .feature_metadata .as_ref() .map(api_models::payments::PaymentAttemptFeatureMetadata::foreign_from), + error_details: payment_attempt + .error + .map(api_models::payments::RecordAttemptErrorDetails::from), + created_at: payment_attempt.created_at, }; Ok(services::ApplicationResponse::JsonWithHeaders(( response, @@ -5217,6 +5222,8 @@ impl ForeignFrom<&diesel_models::types::FeatureMetadata> for api_models::payment first_payment_attempt_pg_error_code: payment_revenue_recovery_metadata .first_payment_attempt_pg_error_code .clone(), + invoice_billing_started_at_time: payment_revenue_recovery_metadata + .invoice_billing_started_at_time, } }); let apple_pay_details = feature_metadata diff --git a/crates/router/src/core/revenue_recovery/types.rs b/crates/router/src/core/revenue_recovery/types.rs index 65bdbaa628..c043641dab 100644 --- a/crates/router/src/core/revenue_recovery/types.rs +++ b/crates/router/src/core/revenue_recovery/types.rs @@ -34,6 +34,7 @@ use crate::{ errors::{self, RouterResult}, payments::{self, helpers, operations::Operation}, revenue_recovery::{self as revenue_recovery_core}, + webhooks::recovery_incoming as recovery_incoming_flow, }, db::StorageInterface, logger, @@ -101,6 +102,23 @@ impl RevenueRecoveryPaymentsAttemptStatus { ) -> Result<(), errors::ProcessTrackerError> { let db = &*state.store; + let recovery_payment_intent = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentIntent::from( + payment_intent, + ); + + let recovery_payment_attempt = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( + &payment_attempt, + ); + + let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new( + &recovery_payment_intent, + &recovery_payment_attempt, + ); + + let retry_count = process_tracker.retry_count; + match self { Self::Succeeded => { // finish psync task as the payment was a success @@ -110,6 +128,19 @@ impl RevenueRecoveryPaymentsAttemptStatus { business_status::PSYNC_WORKFLOW_COMPLETE, ) .await?; + // publish events to kafka + if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + state, + &recovery_payment_tuple, + Some(retry_count+1) + ) + .await{ + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka: {:?}", + e + ); + }; + // Record a successful transaction back to Billing Connector // TODO: Add support for retrying failed outgoing recordback webhooks record_back_to_billing_connector( @@ -130,6 +161,17 @@ impl RevenueRecoveryPaymentsAttemptStatus { business_status::PSYNC_WORKFLOW_COMPLETE, ) .await?; + // publish events to kafka + if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + state, + &recovery_payment_tuple, + Some(retry_count+1) + ) + .await{ + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka : {:?}", e + ); + }; // get a reschedule time let schedule_time = get_schedule_time_to_retry_mit_payments( @@ -291,13 +333,66 @@ impl Action { revenue_recovery_metadata, ) .await; + let recovery_payment_intent = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentIntent::from( + payment_intent, + ); + // handle proxy api's response match response { Ok(payment_data) => match payment_data.payment_attempt.status.foreign_into() { - RevenueRecoveryPaymentsAttemptStatus::Succeeded => Ok(Self::SuccessfulPayment( - payment_data.payment_attempt.clone(), - )), + RevenueRecoveryPaymentsAttemptStatus::Succeeded => { + let recovery_payment_attempt = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( + &payment_data.payment_attempt, + ); + + let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new( + &recovery_payment_intent, + &recovery_payment_attempt, + ); + + // publish events to kafka + if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + state, + &recovery_payment_tuple, + Some(process.retry_count+1) + ) + .await{ + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka: {:?}", + e + ); + }; + + Ok(Self::SuccessfulPayment( + payment_data.payment_attempt.clone(), + )) + } RevenueRecoveryPaymentsAttemptStatus::Failed => { + let recovery_payment_attempt = + hyperswitch_domain_models::revenue_recovery::RecoveryPaymentAttempt::from( + &payment_data.payment_attempt, + ); + + let recovery_payment_tuple = recovery_incoming_flow::RecoveryPaymentTuple::new( + &recovery_payment_intent, + &recovery_payment_attempt, + ); + + // publish events to kafka + if let Err(e) = recovery_incoming_flow::RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + state, + &recovery_payment_tuple, + Some(process.retry_count+1) + ) + .await{ + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka: {:?}", + e + ); + }; + Self::decide_retry_failure_action( db, merchant_id, diff --git a/crates/router/src/core/webhooks/recovery_incoming.rs b/crates/router/src/core/webhooks/recovery_incoming.rs index b46b0651e6..fe5613af88 100644 --- a/crates/router/src/core/webhooks/recovery_incoming.rs +++ b/crates/router/src/core/webhooks/recovery_incoming.rs @@ -13,7 +13,9 @@ use hyperswitch_domain_models::{ router_response_types::revenue_recovery as revenue_recovery_response, types as router_types, }; use hyperswitch_interfaces::webhooks as interface_webhooks; +use masking::{PeekInterface, Secret}; use router_env::{instrument, tracing}; +use services::kafka; use crate::{ core::{ @@ -133,6 +135,25 @@ pub async fn recovery_incoming_webhook_flow( ) .await?; + // Publish event to Kafka + if let Some(ref attempt) = recovery_attempt_from_payment_attempt { + // Passing `merchant_context` here + let recovery_payment_tuple = + &RecoveryPaymentTuple::new(&recovery_intent_from_payment_attempt, attempt); + if let Err(e) = RecoveryPaymentTuple::publish_revenue_recovery_event_to_kafka( + &state, + recovery_payment_tuple, + None, + ) + .await + { + router_env::logger::error!( + "Failed to publish revenue recovery event to kafka : {:?}", + e + ); + }; + } + let attempt_triggered_by = recovery_attempt_from_payment_attempt .as_ref() .and_then(|attempt| attempt.get_attempt_triggered_by()); @@ -342,10 +363,20 @@ impl RevenueRecoveryInvoice { let payment_id = payments_response.id.clone(); let status = payments_response.status; let feature_metadata = payments_response.feature_metadata; + let merchant_id = merchant_context.get_merchant_account().get_id().clone(); + let revenue_recovery_invoice_data = &self.0; Ok(Some(revenue_recovery::RecoveryPaymentIntent { payment_id, status, feature_metadata, + merchant_id, + merchant_reference_id: Some( + revenue_recovery_invoice_data.merchant_reference_id.clone(), + ), + invoice_amount: revenue_recovery_invoice_data.amount, + invoice_currency: revenue_recovery_invoice_data.currency, + created_at: revenue_recovery_invoice_data.billing_started_at, + billing_address: revenue_recovery_invoice_data.billing_address.clone(), })) } Err(err) @@ -402,10 +433,21 @@ impl RevenueRecoveryInvoice { .change_context(errors::RevenueRecoveryError::PaymentIntentCreateFailed) .attach_printable("expected json response")?; + let merchant_id = merchant_context.get_merchant_account().get_id().clone(); + let revenue_recovery_invoice_data = &self.0; + Ok(revenue_recovery::RecoveryPaymentIntent { payment_id: response.id, status: response.status, feature_metadata: response.feature_metadata, + merchant_id, + merchant_reference_id: Some( + revenue_recovery_invoice_data.merchant_reference_id.clone(), + ), + invoice_amount: revenue_recovery_invoice_data.amount, + invoice_currency: revenue_recovery_invoice_data.currency, + created_at: revenue_recovery_invoice_data.billing_started_at, + billing_address: revenue_recovery_invoice_data.billing_address.clone(), }) } } @@ -511,10 +553,18 @@ impl RevenueRecoveryAttempt { }) }); let payment_attempt = - final_attempt.map(|attempt_res| revenue_recovery::RecoveryPaymentAttempt { - attempt_id: attempt_res.id.to_owned(), - attempt_status: attempt_res.status.to_owned(), - feature_metadata: attempt_res.feature_metadata.to_owned(), + final_attempt.map(|res| revenue_recovery::RecoveryPaymentAttempt { + attempt_id: res.id.to_owned(), + attempt_status: res.status.to_owned(), + feature_metadata: res.feature_metadata.to_owned(), + amount: res.amount.net_amount, + network_advice_code: res.error.clone().and_then(|e| e.network_advice_code), // Placeholder, to be populated if available + network_decline_code: res + .error + .clone() + .and_then(|e| e.network_decline_code), // Placeholder, to be populated if available + error_code: res.error.clone().map(|error| error.code), + created_at: res.created_at, }); // If we have an attempt, combine it with payment_intent in a tuple. let res_with_payment_intent_and_attempt = @@ -579,11 +629,31 @@ impl RevenueRecoveryAttempt { attempt_id: attempt_response.id.clone(), attempt_status: attempt_response.status, feature_metadata: attempt_response.payment_attempt_feature_metadata, + amount: attempt_response.amount, + network_advice_code: attempt_response + .error_details + .clone() + .and_then(|error| error.network_decline_code), // Placeholder, to be populated if available + network_decline_code: attempt_response + .error_details + .clone() + .and_then(|error| error.network_decline_code), // Placeholder, to be populated if available + error_code: attempt_response + .error_details + .clone() + .map(|error| error.code), + created_at: attempt_response.created_at, }, revenue_recovery::RecoveryPaymentIntent { payment_id: payment_intent.payment_id.clone(), status: attempt_response.status.into(), // Using status from attempt_response feature_metadata: attempt_response.payment_intent_feature_metadata, // Using feature_metadata from attempt_response + merchant_id: payment_intent.merchant_id.clone(), + merchant_reference_id: payment_intent.merchant_reference_id.clone(), + invoice_amount: payment_intent.invoice_amount, + invoice_currency: payment_intent.invoice_currency, + created_at: payment_intent.created_at, + billing_address: payment_intent.billing_address.clone(), }, )) } @@ -665,6 +735,8 @@ impl RevenueRecoveryAttempt { connector_customer_id: revenue_recovery_attempt_data.connector_customer_id.clone(), retry_count: revenue_recovery_attempt_data.retry_count, invoice_next_billing_time: revenue_recovery_attempt_data.invoice_next_billing_time, + invoice_billing_started_at_time: revenue_recovery_attempt_data + .invoice_billing_started_at_time, triggered_by, card_network: revenue_recovery_attempt_data.card_network.clone(), card_issuer, @@ -1180,3 +1252,104 @@ impl BillingConnectorInvoiceSyncFlowRouterData { self.0 } } + +#[derive(Clone, Debug)] +pub struct RecoveryPaymentTuple( + revenue_recovery::RecoveryPaymentIntent, + revenue_recovery::RecoveryPaymentAttempt, +); + +impl RecoveryPaymentTuple { + pub fn new( + payment_intent: &revenue_recovery::RecoveryPaymentIntent, + payment_attempt: &revenue_recovery::RecoveryPaymentAttempt, + ) -> Self { + Self(payment_intent.clone(), payment_attempt.clone()) + } + + pub async fn publish_revenue_recovery_event_to_kafka( + state: &SessionState, + recovery_payment_tuple: &Self, + retry_count: Option, + ) -> CustomResult<(), errors::RevenueRecoveryError> { + let recovery_payment_intent = &recovery_payment_tuple.0; + let recovery_payment_attempt = &recovery_payment_tuple.1; + let revenue_recovery_feature_metadata = recovery_payment_intent + .feature_metadata + .as_ref() + .and_then(|metadata| metadata.revenue_recovery.as_ref()); + + let billing_city = recovery_payment_intent + .billing_address + .as_ref() + .and_then(|billing_address| billing_address.address.as_ref()) + .and_then(|address| address.city.clone()) + .map(Secret::new); + + let billing_state = recovery_payment_intent + .billing_address + .as_ref() + .and_then(|billing_address| billing_address.address.as_ref()) + .and_then(|address| address.state.clone()); + + let billing_country = recovery_payment_intent + .billing_address + .as_ref() + .and_then(|billing_address| billing_address.address.as_ref()) + .and_then(|address| address.country); + + let card_info = revenue_recovery_feature_metadata.and_then(|metadata| { + metadata + .billing_connector_payment_method_details + .as_ref() + .and_then(|details| details.get_billing_connector_card_info()) + }); + + #[allow(clippy::as_conversions)] + let retry_count = Some(retry_count.unwrap_or_else(|| { + revenue_recovery_feature_metadata + .map(|data| data.total_retry_count as i32) + .unwrap_or(0) + })); + + let event = kafka::revenue_recovery::RevenueRecovery { + merchant_id: &recovery_payment_intent.merchant_id, + invoice_amount: recovery_payment_intent.invoice_amount, + invoice_currency: &recovery_payment_intent.invoice_currency, + invoice_date: revenue_recovery_feature_metadata.and_then(|data| { + data.invoice_billing_started_at_time + .map(|time| time.assume_utc()) + }), + invoice_due_date: revenue_recovery_feature_metadata + .and_then(|data| data.invoice_next_billing_time.map(|time| time.assume_utc())), + billing_city, + billing_country: billing_country.as_ref(), + billing_state, + attempt_amount: recovery_payment_attempt.amount, + attempt_currency: &recovery_payment_intent.invoice_currency.clone(), + attempt_status: &recovery_payment_attempt.attempt_status.clone(), + pg_error_code: recovery_payment_attempt.error_code.clone(), + network_advice_code: recovery_payment_attempt.network_advice_code.clone(), + network_error_code: recovery_payment_attempt.network_decline_code.clone(), + first_pg_error_code: revenue_recovery_feature_metadata + .and_then(|data| data.first_payment_attempt_pg_error_code.clone()), + first_network_advice_code: revenue_recovery_feature_metadata + .and_then(|data| data.first_payment_attempt_network_advice_code.clone()), + first_network_error_code: revenue_recovery_feature_metadata + .and_then(|data| data.first_payment_attempt_network_decline_code.clone()), + attempt_created_at: recovery_payment_attempt.created_at.assume_utc(), + payment_method_type: revenue_recovery_feature_metadata + .map(|data| &data.payment_method_type), + payment_method_subtype: revenue_recovery_feature_metadata + .map(|data| &data.payment_method_subtype), + card_network: card_info + .as_ref() + .and_then(|info| info.card_network.as_ref()), + card_issuer: card_info.and_then(|data| data.card_issuer.clone()), + retry_count, + payment_gateway: revenue_recovery_feature_metadata.map(|data| data.connector), + }; + state.event_handler.log_event(&event); + Ok(()) + } +} diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 7d77df6871..b7bf1b9f0f 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -39,6 +39,7 @@ pub enum EventType { Consolidated, Authentication, RoutingApiLogs, + RevenueRecovery, } #[derive(Debug, Default, Deserialize, Clone)] diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 2da6cc66ff..8a7613f344 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -28,6 +28,7 @@ mod payment_intent; mod payment_intent_event; mod refund; mod refund_event; +pub mod revenue_recovery; use diesel_models::{authentication::Authentication, refund::Refund}; use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use serde::Serialize; @@ -162,6 +163,7 @@ pub struct KafkaSettings { consolidated_events_topic: String, authentication_analytics_topic: String, routing_logs_topic: String, + revenue_recovery_topic: String, } impl KafkaSettings { @@ -277,6 +279,7 @@ pub struct KafkaProducer { authentication_analytics_topic: String, ckh_database_name: Option, routing_logs_topic: String, + revenue_recovery_topic: String, } struct RdKafkaProducer(ThreadedProducer); @@ -327,6 +330,7 @@ impl KafkaProducer { authentication_analytics_topic: conf.authentication_analytics_topic.clone(), ckh_database_name: None, routing_logs_topic: conf.routing_logs_topic.clone(), + revenue_recovery_topic: conf.revenue_recovery_topic.clone(), }) } @@ -665,6 +669,7 @@ impl KafkaProducer { EventType::Consolidated => &self.consolidated_events_topic, EventType::Authentication => &self.authentication_analytics_topic, EventType::RoutingApiLogs => &self.routing_logs_topic, + EventType::RevenueRecovery => &self.revenue_recovery_topic, } } } diff --git a/crates/router/src/services/kafka/revenue_recovery.rs b/crates/router/src/services/kafka/revenue_recovery.rs new file mode 100644 index 0000000000..93294dca4c --- /dev/null +++ b/crates/router/src/services/kafka/revenue_recovery.rs @@ -0,0 +1,43 @@ +use common_utils::{id_type, types::MinorUnit}; +use masking::Secret; +use time::OffsetDateTime; +#[derive(serde::Serialize, Debug)] +pub struct RevenueRecovery<'a> { + pub merchant_id: &'a id_type::MerchantId, + pub invoice_amount: MinorUnit, + pub invoice_currency: &'a common_enums::Currency, + #[serde(default, with = "time::serde::timestamp::nanoseconds::option")] + pub invoice_due_date: Option, + #[serde(with = "time::serde::timestamp::nanoseconds::option")] + pub invoice_date: Option, + pub billing_country: Option<&'a common_enums::CountryAlpha2>, + pub billing_state: Option>, + pub billing_city: Option>, + pub attempt_amount: MinorUnit, + pub attempt_currency: &'a common_enums::Currency, + pub attempt_status: &'a common_enums::AttemptStatus, + pub pg_error_code: Option, + pub network_advice_code: Option, + pub network_error_code: Option, + pub first_pg_error_code: Option, + pub first_network_advice_code: Option, + pub first_network_error_code: Option, + #[serde(default, with = "time::serde::timestamp::nanoseconds")] + pub attempt_created_at: OffsetDateTime, + pub payment_method_type: Option<&'a common_enums::PaymentMethod>, + pub payment_method_subtype: Option<&'a common_enums::PaymentMethodType>, + pub card_network: Option<&'a common_enums::CardNetwork>, + pub card_issuer: Option, + pub retry_count: Option, + pub payment_gateway: Option, +} + +impl super::KafkaMessage for RevenueRecovery<'_> { + fn key(&self) -> String { + self.merchant_id.get_string_repr().to_string() + } + + fn event_type(&self) -> crate::events::EventType { + crate::events::EventType::RevenueRecovery + } +} diff --git a/crates/storage_impl/src/errors.rs b/crates/storage_impl/src/errors.rs index 2260a0528f..89326d9a71 100644 --- a/crates/storage_impl/src/errors.rs +++ b/crates/storage_impl/src/errors.rs @@ -289,7 +289,6 @@ pub enum RecoveryError { #[error("Failed to fetch billing connector account id")] BillingMerchantConnectorAccountIdNotFound, } - #[derive(Debug, Clone, thiserror::Error)] pub enum HealthCheckDecisionEngineError { #[error("Failed to establish Decision Engine connection")]