feat(router): separate straight through algorithm in separate column in payment attempt (#863)

This commit is contained in:
ItsMeShashank
2023-04-12 18:27:17 +05:30
committed by GitHub
parent 1b94d25f66
commit 01f86c499d
15 changed files with 104 additions and 121 deletions

10
Cargo.lock generated
View File

@ -2784,7 +2784,7 @@ dependencies = [
[[package]]
name = "opentelemetry"
version = "0.18.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
@ -2793,7 +2793,7 @@ dependencies = [
[[package]]
name = "opentelemetry-otlp"
version = "0.11.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [
"async-trait",
"futures",
@ -2810,7 +2810,7 @@ dependencies = [
[[package]]
name = "opentelemetry-proto"
version = "0.1.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [
"futures",
"futures-util",
@ -2822,7 +2822,7 @@ dependencies = [
[[package]]
name = "opentelemetry_api"
version = "0.18.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [
"fnv",
"futures-channel",
@ -2837,7 +2837,7 @@ dependencies = [
[[package]]
name = "opentelemetry_sdk"
version = "0.18.0"
source = "git+https://github.com/open-telemetry/opentelemetry-rust?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
source = "git+https://github.com/open-telemetry/opentelemetry-rust/?rev=44b90202fd744598db8b0ace5b8f0bad7ec45658#44b90202fd744598db8b0ace5b8f0bad7ec45658"
dependencies = [
"async-trait",
"crossbeam-channel",

View File

@ -828,7 +828,7 @@ pub async fn list_payments(
) -> RouterResponse<api::PaymentListResponse> {
use futures::stream::StreamExt;
use crate::types::transformers::ForeignTryFrom;
use crate::types::transformers::ForeignFrom;
helpers::validate_payment_list_request(&constraints)?;
let merchant_id = &merchant.merchant_id;
@ -859,11 +859,7 @@ pub async fn list_payments(
.collect::<Vec<(storage::PaymentIntent, storage::PaymentAttempt)>>()
.await;
let data: Vec<api::PaymentsResponse> = pi
.into_iter()
.map(ForeignTryFrom::foreign_try_from)
.collect::<Result<_, _>>()
.change_context(errors::ApiErrorResponse::InternalServerError)?;
let data: Vec<api::PaymentsResponse> = pi.into_iter().map(ForeignFrom::foreign_from).collect();
Ok(services::ApplicationResponse::Json(
api::PaymentListResponse {
@ -912,24 +908,12 @@ pub fn update_straight_through_routing<F>(
where
F: Send + Clone,
{
let mut routing_data: storage::RoutingData = payment_data
.payment_attempt
.connector
let _: api::RoutingAlgorithm = request_straight_through
.clone()
.unwrap_or_else(|| serde_json::json!({}))
.parse_value("RoutingData")
.attach_printable("Invalid routing data format in payment attempt")?;
let request_straight_through: api::RoutingAlgorithm = request_straight_through
.parse_value("RoutingAlgorithm")
.attach_printable("Invalid straight through routing rules format")?;
routing_data.algorithm = Some(request_straight_through);
let encoded_routing_data = Encode::<storage::RoutingData>::encode_to_value(&routing_data)
.attach_printable("Unable to serialize routing data to serde value")?;
payment_data.payment_attempt.connector = Some(encoded_routing_data);
payment_data.payment_attempt.straight_through_algorithm = Some(request_straight_through);
Ok(())
}
@ -987,14 +971,17 @@ pub fn connector_selection<F>(
where
F: Send + Clone,
{
let mut routing_data: storage::RoutingData = payment_data
.payment_attempt
.connector
.clone()
.unwrap_or_else(|| serde_json::json!({}))
.parse_value("RoutingData")
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Invalid routing data format in payment attempt")?;
let mut routing_data = storage::RoutingData {
routed_through: payment_data.payment_attempt.connector.clone(),
algorithm: payment_data
.payment_attempt
.straight_through_algorithm
.clone()
.map(|val| val.parse_value("RoutingAlgorithm"))
.transpose()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Invalid straight through algorithm format in payment attempt")?,
};
let request_straight_through: Option<api::RoutingAlgorithm> = request_straight_through
.map(|val| val.parse_value("RoutingAlgorithm"))
@ -1009,11 +996,15 @@ where
&mut routing_data,
)?;
let encoded_routing_data = Encode::<storage::RoutingData>::encode_to_value(&routing_data)
let encoded_algorithm = routing_data
.algorithm
.map(|algo| Encode::<api::RoutingAlgorithm>::encode_to_value(&algo))
.transpose()
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unable to serialize routing data to serde value")?;
.attach_printable("Unable to serialize routing algorithm to serde value")?;
payment_data.payment_attempt.connector = Some(encoded_routing_data);
payment_data.payment_attempt.connector = routing_data.routed_through;
payment_data.payment_attempt.straight_through_algorithm = encoded_algorithm;
Ok(decided_connector)
}

View File

@ -483,14 +483,9 @@ where
Op: std::fmt::Debug,
{
if check_if_operation_confirm(operation) {
let routed_through: storage::RoutedThroughData = payment_attempt
let connector_name = payment_attempt
.connector
.clone()
.parse_value("RoutedThroughData")
.change_context(errors::ApiErrorResponse::InternalServerError)?;
let connector_name = routed_through
.routed_through
.ok_or(errors::ApiErrorResponse::InternalServerError)?;
let schedule_time = payment_sync::get_sync_process_schedule_time(

View File

@ -331,6 +331,10 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
};
let connector = payment_data.payment_attempt.connector.clone();
let straight_through_algorithm = payment_data
.payment_attempt
.straight_through_algorithm
.clone();
let payment_token = payment_data.token.clone();
let payment_method_type = payment_data.payment_attempt.payment_method_type.clone();
let payment_experience = payment_data.payment_attempt.payment_experience.clone();
@ -362,6 +366,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
payment_method_type,
payment_experience,
business_sub_label,
straight_through_algorithm,
},
storage_scheme,
)

View File

@ -23,7 +23,6 @@ use crate::{
storage::{
self,
enums::{self, IntentStatus},
PaymentAttemptExt,
},
transformers::ForeignInto,
},
@ -139,8 +138,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::PaymentsRequest> for Pa
})?;
connector_response = db
.insert_connector_response(
Self::make_connector_response(&payment_attempt)
.change_context(errors::ApiErrorResponse::InternalServerError)?,
Self::make_connector_response(&payment_attempt),
storage_scheme,
)
.await
@ -316,6 +314,10 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
let payment_token = payment_data.token.clone();
let connector = payment_data.payment_attempt.connector.clone();
let straight_through_algorithm = payment_data
.payment_attempt
.straight_through_algorithm
.clone();
payment_data.payment_attempt = db
.update_payment_attempt_with_attempt_id(
@ -323,6 +325,7 @@ impl<F: Clone> UpdateTracker<F, PaymentData<F>, api::PaymentsRequest> for Paymen
storage::PaymentAttemptUpdate::UpdateTrackers {
payment_token,
connector,
straight_through_algorithm,
},
storage_scheme,
)
@ -532,18 +535,18 @@ impl PaymentCreate {
#[instrument(skip_all)]
pub fn make_connector_response(
payment_attempt: &storage::PaymentAttempt,
) -> CustomResult<storage::ConnectorResponseNew, errors::ParsingError> {
Ok(storage::ConnectorResponseNew {
) -> storage::ConnectorResponseNew {
storage::ConnectorResponseNew {
payment_id: payment_attempt.payment_id.clone(),
merchant_id: payment_attempt.merchant_id.clone(),
attempt_id: payment_attempt.attempt_id.clone(),
created_at: payment_attempt.created_at,
modified_at: payment_attempt.modified_at,
connector_name: payment_attempt.get_routed_through_connector()?,
connector_name: payment_attempt.connector.clone(),
connector_transaction_id: None,
authentication_data: None,
encoded_data: None,
})
}
}
}

View File

@ -124,8 +124,7 @@ impl<F: Send + Clone> GetTracker<F, PaymentData<F>, api::VerifyRequest> for Paym
connector_response = match db
.insert_connector_response(
PaymentCreate::make_connector_response(&payment_attempt)
.change_context(errors::ApiErrorResponse::InternalServerError)?,
PaymentCreate::make_connector_response(&payment_attempt),
storage_scheme,
)
.await

View File

@ -15,8 +15,8 @@ use crate::{
services::{self, RedirectForm},
types::{
self, api,
storage::{self, enums, PaymentAttemptExt},
transformers::{ForeignInto, ForeignTryFrom},
storage::{self, enums},
transformers::{ForeignFrom, ForeignInto},
},
utils::{OptionExt, ValueExt},
};
@ -278,9 +278,7 @@ where
})
}
let mut response: api::PaymentsResponse = Default::default();
let routed_through = payment_attempt
.get_routed_through_connector()
.change_context(errors::ApiErrorResponse::InternalServerError)?;
let routed_through = payment_attempt.connector.clone();
let connector_label = routed_through.as_ref().map(|connector_name| {
helpers::get_connector_label(
@ -417,15 +415,11 @@ where
})
}
impl ForeignTryFrom<(storage::PaymentIntent, storage::PaymentAttempt)> for api::PaymentsResponse {
type Error = error_stack::Report<errors::ParsingError>;
fn foreign_try_from(
item: (storage::PaymentIntent, storage::PaymentAttempt),
) -> Result<Self, Self::Error> {
impl ForeignFrom<(storage::PaymentIntent, storage::PaymentAttempt)> for api::PaymentsResponse {
fn foreign_from(item: (storage::PaymentIntent, storage::PaymentAttempt)) -> Self {
let pi = item.0;
let pa = item.1;
Ok(Self {
Self {
payment_id: Some(pi.payment_id),
merchant_id: Some(pi.merchant_id),
status: pi.status.foreign_into(),
@ -437,11 +431,11 @@ impl ForeignTryFrom<(storage::PaymentIntent, storage::PaymentAttempt)> for api::
description: pi.description,
metadata: pi.metadata,
customer_id: pi.customer_id,
connector: pa.get_routed_through_connector()?,
connector: pa.connector,
payment_method: pa.payment_method.map(ForeignInto::foreign_into),
payment_method_type: pa.payment_method_type.map(ForeignInto::foreign_into),
..Default::default()
})
}
}
}

View File

@ -18,7 +18,7 @@ use crate::{
types::{
self,
api::{self, refunds},
storage::{self, enums, PaymentAttemptExt, ProcessTrackerExt},
storage::{self, enums, ProcessTrackerExt},
transformers::{ForeignFrom, ForeignInto},
},
utils::{self, OptionExt},
@ -120,8 +120,8 @@ pub async fn trigger_refund_to_gateway(
creds_identifier: Option<String>,
) -> RouterResult<storage::Refund> {
let routed_through = payment_attempt
.get_routed_through_connector()
.change_context(errors::ApiErrorResponse::InternalServerError)?
.connector
.clone()
.ok_or(errors::ApiErrorResponse::InternalServerError)
.into_report()
.attach_printable("Failed to retrieve connector from payment attempt")?;
@ -552,8 +552,8 @@ pub async fn validate_and_create_refund(
.change_context(errors::ApiErrorResponse::MaximumRefundCount)?;
let connector = payment_attempt
.get_routed_through_connector()
.change_context(errors::ApiErrorResponse::InternalServerError)?
.connector
.clone()
.ok_or(errors::ApiErrorResponse::InternalServerError)
.into_report()
.attach_printable("No connector populated in payment attempt")?;

View File

@ -264,6 +264,7 @@ impl PaymentAttemptInterface for MockDb {
payment_method_type: payment_attempt.payment_method_type,
payment_method_data: payment_attempt.payment_method_data,
business_sub_label: payment_attempt.business_sub_label,
straight_through_algorithm: payment_attempt.straight_through_algorithm,
};
payment_attempts.push(payment_attempt.clone());
Ok(payment_attempt)
@ -396,6 +397,9 @@ mod storage {
payment_method_type: payment_attempt.payment_method_type.clone(),
payment_method_data: payment_attempt.payment_method_data.clone(),
business_sub_label: payment_attempt.business_sub_label.clone(),
straight_through_algorithm: payment_attempt
.straight_through_algorithm
.clone(),
};
let field = format!("pa_{}", created_attempt.attempt_id);

View File

@ -9,7 +9,7 @@ use crate::{
scheduler::{consumer, process_data, utils},
types::{
api,
storage::{self, enums, PaymentAttemptExt, ProcessTrackerExt},
storage::{self, enums, ProcessTrackerExt},
},
utils::{OptionExt, ValueExt},
};
@ -64,8 +64,7 @@ impl ProcessTrackerWorkflow for PaymentsSyncWorkflow {
_ => {
let connector = payment_data
.payment_attempt
.get_routed_through_connector()
.map_err(errors::ProcessTrackerError::EParsingError)?
.connector
.ok_or(errors::ProcessTrackerError::MissingRequiredField)?;
retry_sync_task(

View File

@ -1,43 +1,13 @@
use error_stack::ResultExt;
pub use storage_models::payment_attempt::{
PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate, PaymentAttemptUpdateInternal,
};
use crate::{
core::errors::{self, CustomResult},
utils::ValueExt,
};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RoutingData {
pub routed_through: Option<String>,
pub algorithm: Option<api_models::admin::RoutingAlgorithm>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RoutedThroughData {
pub routed_through: Option<String>,
}
pub trait PaymentAttemptExt {
fn get_routed_through_connector(&self) -> CustomResult<Option<String>, errors::ParsingError>;
}
impl PaymentAttemptExt for PaymentAttempt {
fn get_routed_through_connector(&self) -> CustomResult<Option<String>, errors::ParsingError> {
if let Some(ref val) = self.connector {
let data: RoutedThroughData = val
.clone()
.parse_value("RoutedThroughData")
.attach_printable("Failed to read routed_through connector from payment attempt")?;
Ok(data.routed_through)
} else {
Ok(None)
}
}
}
#[cfg(feature = "kv_store")]
impl crate::utils::storage_partitioning::KvStorePartition for PaymentAttempt {}
@ -67,9 +37,7 @@ mod tests {
let connector = types::Connector::Dummy.to_string();
let payment_attempt = PaymentAttemptNew {
payment_id: payment_id.clone(),
connector: Some(serde_json::json!({
"routed_through": connector,
})),
connector: Some(connector),
created_at: current_time.into(),
modified_at: current_time.into(),
..PaymentAttemptNew::default()
@ -102,9 +70,7 @@ mod tests {
let payment_attempt = PaymentAttemptNew {
payment_id: payment_id.clone(),
merchant_id: merchant_id.clone(),
connector: Some(serde_json::json!({
"routed_through": connector,
})),
connector: Some(connector),
created_at: current_time.into(),
modified_at: current_time.into(),
attempt_id: attempt_id.clone(),
@ -146,9 +112,7 @@ mod tests {
let payment_attempt = PaymentAttemptNew {
payment_id: uuid.clone(),
merchant_id: "1".to_string(),
connector: Some(serde_json::json!({
"routed_through": connector,
})),
connector: Some(connector),
created_at: current_time.into(),
modified_at: current_time.into(),
// Adding a mandate_id

View File

@ -15,7 +15,7 @@ pub struct PaymentAttempt {
pub amount: i64,
pub currency: Option<storage_enums::Currency>,
pub save_to_locker: Option<bool>,
pub connector: Option<serde_json::Value>,
pub connector: Option<String>,
pub error_message: Option<String>,
pub offer_amount: Option<i64>,
pub surcharge_amount: Option<i64>,
@ -45,6 +45,7 @@ pub struct PaymentAttempt {
pub payment_method_type: Option<storage_enums::PaymentMethodType>,
pub payment_method_data: Option<serde_json::Value>,
pub business_sub_label: Option<String>,
pub straight_through_algorithm: Option<serde_json::Value>,
}
#[derive(
@ -60,7 +61,7 @@ pub struct PaymentAttemptNew {
pub currency: Option<storage_enums::Currency>,
// pub auto_capture: Option<bool>,
pub save_to_locker: Option<bool>,
pub connector: Option<serde_json::Value>,
pub connector: Option<String>,
pub error_message: Option<String>,
pub offer_amount: Option<i64>,
pub surcharge_amount: Option<i64>,
@ -90,6 +91,7 @@ pub struct PaymentAttemptNew {
pub payment_method_type: Option<storage_enums::PaymentMethodType>,
pub payment_method_data: Option<serde_json::Value>,
pub business_sub_label: Option<String>,
pub straight_through_algorithm: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -108,7 +110,8 @@ pub enum PaymentAttemptUpdate {
},
UpdateTrackers {
payment_token: Option<String>,
connector: Option<serde_json::Value>,
connector: Option<String>,
straight_through_algorithm: Option<serde_json::Value>,
},
AuthenticationTypeUpdate {
authentication_type: storage_enums::AuthenticationType,
@ -120,12 +123,13 @@ pub enum PaymentAttemptUpdate {
authentication_type: Option<storage_enums::AuthenticationType>,
payment_method: Option<storage_enums::PaymentMethod>,
browser_info: Option<serde_json::Value>,
connector: Option<serde_json::Value>,
connector: Option<String>,
payment_token: Option<String>,
payment_method_data: Option<serde_json::Value>,
payment_method_type: Option<storage_enums::PaymentMethodType>,
payment_experience: Option<storage_enums::PaymentExperience>,
business_sub_label: Option<String>,
straight_through_algorithm: Option<serde_json::Value>,
},
VoidUpdate {
status: storage_enums::AttemptStatus,
@ -133,7 +137,7 @@ pub enum PaymentAttemptUpdate {
},
ResponseUpdate {
status: storage_enums::AttemptStatus,
connector: Option<serde_json::Value>,
connector: Option<String>,
connector_transaction_id: Option<String>,
authentication_type: Option<storage_enums::AuthenticationType>,
payment_method_id: Option<Option<String>>,
@ -145,7 +149,7 @@ pub enum PaymentAttemptUpdate {
},
UnresolvedResponseUpdate {
status: storage_enums::AttemptStatus,
connector: Option<serde_json::Value>,
connector: Option<String>,
connector_transaction_id: Option<String>,
payment_method_id: Option<Option<String>>,
error_code: Option<Option<String>>,
@ -155,7 +159,7 @@ pub enum PaymentAttemptUpdate {
status: storage_enums::AttemptStatus,
},
ErrorUpdate {
connector: Option<serde_json::Value>,
connector: Option<String>,
status: storage_enums::AttemptStatus,
error_code: Option<Option<String>>,
error_message: Option<Option<String>>,
@ -169,7 +173,7 @@ pub struct PaymentAttemptUpdateInternal {
currency: Option<storage_enums::Currency>,
status: Option<storage_enums::AttemptStatus>,
connector_transaction_id: Option<String>,
connector: Option<serde_json::Value>,
connector: Option<String>,
authentication_type: Option<storage_enums::AuthenticationType>,
payment_method: Option<storage_enums::PaymentMethod>,
error_message: Option<Option<String>>,
@ -185,6 +189,7 @@ pub struct PaymentAttemptUpdateInternal {
payment_method_type: Option<storage_enums::PaymentMethodType>,
payment_experience: Option<storage_enums::PaymentExperience>,
business_sub_label: Option<String>,
straight_through_algorithm: Option<serde_json::Value>,
}
impl PaymentAttemptUpdate {
@ -262,6 +267,7 @@ impl From<PaymentAttemptUpdate> for PaymentAttemptUpdateInternal {
payment_method_type,
payment_experience,
business_sub_label,
straight_through_algorithm,
} => Self {
amount: Some(amount),
currency: Some(currency),
@ -276,6 +282,7 @@ impl From<PaymentAttemptUpdate> for PaymentAttemptUpdateInternal {
payment_method_type,
payment_experience,
business_sub_label,
straight_through_algorithm,
..Default::default()
},
PaymentAttemptUpdate::VoidUpdate {
@ -331,9 +338,11 @@ impl From<PaymentAttemptUpdate> for PaymentAttemptUpdateInternal {
PaymentAttemptUpdate::UpdateTrackers {
payment_token,
connector,
straight_through_algorithm,
} => Self {
payment_token,
connector,
straight_through_algorithm,
..Default::default()
},
PaymentAttemptUpdate::UnresolvedResponseUpdate {

View File

@ -262,7 +262,7 @@ diesel::table! {
amount -> Int8,
currency -> Nullable<Currency>,
save_to_locker -> Nullable<Bool>,
connector -> Nullable<Jsonb>,
connector -> Nullable<Varchar>,
error_message -> Nullable<Text>,
offer_amount -> Nullable<Int8>,
surcharge_amount -> Nullable<Int8>,
@ -288,6 +288,7 @@ diesel::table! {
payment_method_type -> Nullable<Varchar>,
payment_method_data -> Nullable<Jsonb>,
business_sub_label -> Nullable<Varchar>,
straight_through_algorithm -> Nullable<Jsonb>,
}
}

View File

@ -0,0 +1,9 @@
-- This file should undo anything in `up.sql`
ALTER TABLE payment_attempt
ALTER COLUMN connector TYPE JSONB
USING jsonb_build_object(
'routed_through', connector,
'algorithm', straight_through_algorithm
);
ALTER TABLE payment_attempt DROP COLUMN straight_through_algorithm;

View File

@ -0,0 +1,10 @@
-- Your SQL goes here
ALTER TABLE payment_attempt
ADD COLUMN straight_through_algorithm JSONB;
UPDATE payment_attempt SET straight_through_algorithm = connector->'algorithm'
WHERE connector->>'algorithm' IS NOT NULL;
ALTER TABLE payment_attempt
ALTER COLUMN connector TYPE VARCHAR(64)
USING connector->>'routed_through';