From bc2f13ab071d24cc09a9cf71aaf48349c6748314 Mon Sep 17 00:00:00 2001 From: Nishant Joshi Date: Thu, 15 Dec 2022 19:10:02 +0530 Subject: [PATCH] fix: resolve `TODO` comments in storage models crate (#151) --- crates/common_utils/src/lib.rs | 8 +++ crates/masking/README.md | 1 - crates/router/src/db/process_tracker.rs | 6 +-- crates/router/src/scheduler/consumer.rs | 27 +++++----- crates/router/src/scheduler/utils.rs | 17 +++--- .../src/types/storage/process_tracker.rs | 2 +- crates/router_env/README.md | 1 - crates/storage_models/src/address.rs | 9 +--- crates/storage_models/src/payment_intent.rs | 2 +- crates/storage_models/src/process_tracker.rs | 5 +- .../src/query/payment_attempt.rs | 48 ++++++++--------- .../src/query/process_tracker.rs | 52 +++++++------------ crates/storage_models/src/query/refund.rs | 2 +- 13 files changed, 78 insertions(+), 102 deletions(-) diff --git a/crates/common_utils/src/lib.rs b/crates/common_utils/src/lib.rs index f43102215e..494552bcf9 100644 --- a/crates/common_utils/src/lib.rs +++ b/crates/common_utils/src/lib.rs @@ -24,12 +24,20 @@ pub mod validation; /// Date-time utilities. pub mod date_time { use time::{OffsetDateTime, PrimitiveDateTime}; + /// Struct to represent milliseconds in time sensitive data fields + #[derive(Debug)] + pub struct Milliseconds(i32); /// Create a new [`PrimitiveDateTime`] with the current date and time in UTC. pub fn now() -> PrimitiveDateTime { let utc_date_time = OffsetDateTime::now_utc(); PrimitiveDateTime::new(utc_date_time.date(), utc_date_time.time()) } + + /// Convert from OffsetDateTime to PrimitiveDateTime + pub fn convert_to_pdt(offset_time: OffsetDateTime) -> PrimitiveDateTime { + PrimitiveDateTime::new(offset_time.date(), offset_time.time()) + } } /// Generate a nanoid with the given prefix and length diff --git a/crates/masking/README.md b/crates/masking/README.md index 3cee5d1208..1b99cf634a 100644 --- a/crates/masking/README.md +++ b/crates/masking/README.md @@ -37,7 +37,6 @@ Most fields are under `Option`. To simplify dealing with `Option`, use `expose_o ``` - ## Files Tree Layout diff --git a/crates/router/src/db/process_tracker.rs b/crates/router/src/db/process_tracker.rs index 88cf4a61b8..396b511d59 100644 --- a/crates/router/src/db/process_tracker.rs +++ b/crates/router/src/db/process_tracker.rs @@ -31,7 +31,7 @@ pub trait ProcessTrackerInterface { &self, task_ids: Vec, task_update: storage::ProcessTrackerUpdate, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult; async fn update_process_tracker( &self, this: storage::ProcessTracker, @@ -136,7 +136,7 @@ impl ProcessTrackerInterface for Store { &self, task_ids: Vec, task_update: storage::ProcessTrackerUpdate, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; storage::ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update) .await @@ -224,7 +224,7 @@ impl ProcessTrackerInterface for MockDb { &self, _task_ids: Vec, _task_update: storage::ProcessTrackerUpdate, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult { todo!() } } diff --git a/crates/router/src/scheduler/consumer.rs b/crates/router/src/scheduler/consumer.rs index 92cf572356..e73bf61eec 100644 --- a/crates/router/src/scheduler/consumer.rs +++ b/crates/router/src/scheduler/consumer.rs @@ -132,7 +132,7 @@ pub async fn fetch_consumer_tasks( ) -> CustomResult, errors::ProcessTrackerError> { let batches = pt_utils::get_batches(redis_conn, stream_name, group_name, consumer_name).await?; - let tasks = batches.into_iter().fold(Vec::new(), |mut acc, batch| { + let mut tasks = batches.into_iter().fold(Vec::new(), |mut acc, batch| { acc.extend_from_slice( batch .trackers @@ -148,18 +148,19 @@ pub async fn fetch_consumer_tasks( .map(|task| task.id.to_owned()) .collect::>(); - let updated_tasks = db - .process_tracker_update_process_status_by_ids( - task_ids, - storage::ProcessTrackerUpdate::StatusUpdate { - status: enums::ProcessTrackerStatus::ProcessStarted, - business_status: None, - }, - ) - .await - .change_context(errors::ProcessTrackerError::ProcessFetchingFailed)?; - - Ok(updated_tasks) + db.process_tracker_update_process_status_by_ids( + task_ids, + storage::ProcessTrackerUpdate::StatusUpdate { + status: enums::ProcessTrackerStatus::ProcessStarted, + business_status: None, + }, + ) + .await + .change_context(errors::ProcessTrackerError::ProcessFetchingFailed)?; + tasks + .iter_mut() + .for_each(|x| x.status = enums::ProcessTrackerStatus::ProcessStarted); + Ok(tasks) } // Accept flow_options if required diff --git a/crates/router/src/scheduler/utils.rs b/crates/router/src/scheduler/utils.rs index 2d94752e43..a2b5fea194 100644 --- a/crates/router/src/scheduler/utils.rs +++ b/crates/router/src/scheduler/utils.rs @@ -49,7 +49,7 @@ pub async fn update_status_and_append( .collect(); match flow { SchedulerFlow::Producer => { - let res = state + state .store .process_tracker_update_process_status_by_ids( process_ids, @@ -58,18 +58,13 @@ pub async fn update_status_and_append( business_status: None, }, ) - .await; - match res { - Ok(trackers) => { - let count = trackers.len(); - logger::debug!("Updated status of {count} processes"); - Ok(()) - } - Err(error) => { + .await.map_or_else(|error| { logger::error!(error=%error.current_context(),"Error while updating process status"); Err(error.change_context(errors::ProcessTrackerError::ProcessUpdateFailed)) - } - } + }, |count| { + logger::debug!("Updated status of {count} processes"); + Ok(()) + }) } SchedulerFlow::Cleaner => { let res = state diff --git a/crates/router/src/types/storage/process_tracker.rs b/crates/router/src/types/storage/process_tracker.rs index 883c69d69c..98b2fa2b20 100644 --- a/crates/router/src/types/storage/process_tracker.rs +++ b/crates/router/src/types/storage/process_tracker.rs @@ -1,7 +1,7 @@ use error_stack::ResultExt; use serde::Serialize; pub use storage_models::process_tracker::{ - Milliseconds, ProcessData, ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate, + ProcessData, ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate, ProcessTrackerUpdateInternal, SchedulerOptions, }; use time::PrimitiveDateTime; diff --git a/crates/router_env/README.md b/crates/router_env/README.md index 22606bdf98..1812281241 100644 --- a/crates/router_env/README.md +++ b/crates/router_env/README.md @@ -22,7 +22,6 @@ pub fn sample() -> () { ## Files Tree Layout - ```text diff --git a/crates/storage_models/src/address.rs b/crates/storage_models/src/address.rs index d45e74fa3d..d404115ee6 100644 --- a/crates/storage_models/src/address.rs +++ b/crates/storage_models/src/address.rs @@ -1,4 +1,4 @@ -use common_utils::{consts, custom_serde, generate_id}; +use common_utils::{consts, custom_serde, date_time, generate_id}; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use masking::Secret; use serde::{Deserialize, Serialize}; @@ -115,17 +115,12 @@ impl From for AddressUpdateInternal { last_name, phone_number, country_code, - modified_at: convert_to_pdt(OffsetDateTime::now_utc()), + modified_at: date_time::convert_to_pdt(OffsetDateTime::now_utc()), }, } } } -// TODO: Create utils for this since cane be reused outside address -fn convert_to_pdt(offset_time: OffsetDateTime) -> PrimitiveDateTime { - PrimitiveDateTime::new(offset_time.date(), offset_time.time()) -} - impl Default for AddressNew { fn default() -> Self { Self { diff --git a/crates/storage_models/src/payment_intent.rs b/crates/storage_models/src/payment_intent.rs index 3424fbff51..e13bdebb36 100644 --- a/crates/storage_models/src/payment_intent.rs +++ b/crates/storage_models/src/payment_intent.rs @@ -25,7 +25,7 @@ pub struct PaymentIntent { pub statement_descriptor_suffix: Option, pub created_at: PrimitiveDateTime, pub modified_at: PrimitiveDateTime, - pub last_synced: Option, // FIXME: this is optional + pub last_synced: Option, pub setup_future_usage: Option, pub off_session: Option, pub client_secret: Option, diff --git a/crates/storage_models/src/process_tracker.rs b/crates/storage_models/src/process_tracker.rs index 40c5d402b1..52ce2a1a92 100644 --- a/crates/storage_models/src/process_tracker.rs +++ b/crates/storage_models/src/process_tracker.rs @@ -142,12 +142,9 @@ impl From for ProcessTrackerUpdateInternal { } } -// TODO: Move this to a utility module? -pub struct Milliseconds(i32); - #[allow(dead_code)] pub struct SchedulerOptions { - looper_interval: Milliseconds, + looper_interval: common_utils::date_time::Milliseconds, db_name: String, cache_name: String, schema_name: String, diff --git a/crates/storage_models/src/query/payment_attempt.rs b/crates/storage_models/src/query/payment_attempt.rs index 8fb4389e0a..51ae4446ab 100644 --- a/crates/storage_models/src/query/payment_attempt.rs +++ b/crates/storage_models/src/query/payment_attempt.rs @@ -1,12 +1,6 @@ -use async_bb8_diesel::AsyncRunQueryDsl; -use diesel::{ - associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods, QueryDsl, -}; -use error_stack::{IntoReport, ResultExt}; -use router_env::{ - logger::debug, - tracing::{self, instrument}, -}; +use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; +use error_stack::IntoReport; +use router_env::tracing::{self, instrument}; use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery}; use crate::{ @@ -121,29 +115,29 @@ impl PaymentAttempt { .await } - // FIXME: Use generics - #[instrument(skip(conn))] pub async fn find_last_successful_attempt_by_payment_id_merchant_id( conn: &PgPooledConn, payment_id: &str, merchant_id: &str, ) -> CustomResult { - let query = Self::table() - .filter( - dsl::payment_id - .eq(payment_id.to_owned()) - .and(dsl::merchant_id.eq(merchant_id.to_owned())) - .and(dsl::status.eq(enums::AttemptStatus::Charged)), - ) - .order(dsl::created_at.desc()); - debug!(query = %debug_query::(&query).to_string()); - - query - .get_result_async(conn) - .await - .into_report() - .change_context(errors::DatabaseError::NotFound) - .attach_printable("Error while finding last successful payment attempt") + // perform ordering on the application level instead of database level + generics::generic_filter::<::Table, _, Self>( + conn, + dsl::payment_id + .eq(payment_id.to_owned()) + .and(dsl::merchant_id.eq(merchant_id.to_owned())) + .and(dsl::status.eq(enums::AttemptStatus::Charged)), + None, + ) + .await? + .into_iter() + .fold( + Err(errors::DatabaseError::NotFound).into_report(), + |acc, cur| match acc { + Ok(value) if value.created_at > cur.created_at => Ok(value), + _ => Ok(cur), + }, + ) } #[instrument(skip(conn))] diff --git a/crates/storage_models/src/query/process_tracker.rs b/crates/storage_models/src/query/process_tracker.rs index 4962f45db6..b4e8901f88 100644 --- a/crates/storage_models/src/query/process_tracker.rs +++ b/crates/storage_models/src/query/process_tracker.rs @@ -1,12 +1,5 @@ -use async_bb8_diesel::AsyncRunQueryDsl; -use diesel::{ - associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods, QueryDsl, -}; -use error_stack::{IntoReport, ResultExt}; -use router_env::{ - logger::debug, - tracing::{self, instrument}, -}; +use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; +use router_env::tracing::{self, instrument}; use time::PrimitiveDateTime; use super::generics::{self, ExecuteQuery}; @@ -57,14 +50,12 @@ impl ProcessTracker { conn: &PgPooledConn, task_ids: Vec, task_update: ProcessTrackerUpdate, - ) -> CustomResult, errors::DatabaseError> { - // TODO: Possible optimization: Instead of returning updated values from database, update - // the values in code and return them, if database query executed successfully. - generics::generic_update_with_results::<::Table, _, _, Self, _>( + ) -> CustomResult { + generics::generic_update::<::Table, _, _, _>( conn, dsl::id.eq_any(task_ids), ProcessTrackerUpdateInternal::from(task_update), - ExecuteQuery::new(), + ExecuteQuery::::new(), ) .await } @@ -99,29 +90,26 @@ impl ProcessTracker { .await } - // FIXME with generics - #[instrument(skip(pool))] + #[instrument(skip(conn))] pub async fn find_processes_to_clean( - pool: &PgPooledConn, + conn: &PgPooledConn, time_lower_limit: PrimitiveDateTime, time_upper_limit: PrimitiveDateTime, runner: &str, - limit: i64, + limit: u64, ) -> CustomResult, errors::DatabaseError> { - let query = Self::table() - .filter(dsl::schedule_time.between(time_lower_limit, time_upper_limit)) - .filter(dsl::status.eq(enums::ProcessTrackerStatus::ProcessStarted)) - .filter(dsl::runner.eq(runner.to_owned())) - .order(dsl::schedule_time.asc()) - .limit(limit); - debug!(query = %debug_query::(&query).to_string()); - - query - .get_results_async(pool) - .await - .into_report() - .change_context(errors::DatabaseError::NotFound) - .attach_printable_lazy(|| "Error finding processes to clean") + let mut x: Vec = generics::generic_filter::<::Table, _, _>( + conn, + dsl::schedule_time + .between(time_lower_limit, time_upper_limit) + .and(dsl::status.eq(enums::ProcessTrackerStatus::ProcessStarted)) + .and(dsl::runner.eq(runner.to_owned())), + None, + ) + .await?; + x.sort_by(|a, b| a.schedule_time.cmp(&b.schedule_time)); + x.truncate(limit as usize); + Ok(x) } #[instrument(skip(conn))] diff --git a/crates/storage_models/src/query/refund.rs b/crates/storage_models/src/query/refund.rs index 2ab12e637b..371dd972a4 100644 --- a/crates/storage_models/src/query/refund.rs +++ b/crates/storage_models/src/query/refund.rs @@ -9,7 +9,7 @@ use crate::{ CustomResult, PgPooledConn, }; -// FIXME: Find by partition key +// FIXME: Find by partition key : Review impl RefundNew { #[instrument(skip(conn))]