fix: resolve TODO comments in storage models crate (#151)

This commit is contained in:
Nishant Joshi
2022-12-15 19:10:02 +05:30
committed by GitHub
parent d36ae26460
commit bc2f13ab07
13 changed files with 78 additions and 102 deletions

View File

@ -24,12 +24,20 @@ pub mod validation;
/// Date-time utilities. /// Date-time utilities.
pub mod date_time { pub mod date_time {
use time::{OffsetDateTime, PrimitiveDateTime}; use time::{OffsetDateTime, PrimitiveDateTime};
/// Struct to represent milliseconds in time sensitive data fields
#[derive(Debug)]
pub struct Milliseconds(i32);
/// Create a new [`PrimitiveDateTime`] with the current date and time in UTC. /// Create a new [`PrimitiveDateTime`] with the current date and time in UTC.
pub fn now() -> PrimitiveDateTime { pub fn now() -> PrimitiveDateTime {
let utc_date_time = OffsetDateTime::now_utc(); let utc_date_time = OffsetDateTime::now_utc();
PrimitiveDateTime::new(utc_date_time.date(), utc_date_time.time()) PrimitiveDateTime::new(utc_date_time.date(), utc_date_time.time())
} }
/// Convert from OffsetDateTime to PrimitiveDateTime
pub fn convert_to_pdt(offset_time: OffsetDateTime) -> PrimitiveDateTime {
PrimitiveDateTime::new(offset_time.date(), offset_time.time())
}
} }
/// Generate a nanoid with the given prefix and length /// Generate a nanoid with the given prefix and length

View File

@ -37,7 +37,6 @@ Most fields are under `Option`. To simplify dealing with `Option`, use `expose_o
``` ```
<!-- FIXME: this table should either be generated by a script or smoke test should be introduced checking it agrees with actual structure -->
## Files Tree Layout ## Files Tree Layout

View File

@ -31,7 +31,7 @@ pub trait ProcessTrackerInterface {
&self, &self,
task_ids: Vec<String>, task_ids: Vec<String>,
task_update: storage::ProcessTrackerUpdate, task_update: storage::ProcessTrackerUpdate,
) -> CustomResult<Vec<storage::ProcessTracker>, errors::StorageError>; ) -> CustomResult<usize, errors::StorageError>;
async fn update_process_tracker( async fn update_process_tracker(
&self, &self,
this: storage::ProcessTracker, this: storage::ProcessTracker,
@ -136,7 +136,7 @@ impl ProcessTrackerInterface for Store {
&self, &self,
task_ids: Vec<String>, task_ids: Vec<String>,
task_update: storage::ProcessTrackerUpdate, task_update: storage::ProcessTrackerUpdate,
) -> CustomResult<Vec<storage::ProcessTracker>, errors::StorageError> { ) -> CustomResult<usize, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await; let conn = pg_connection(&self.master_pool).await;
storage::ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update) storage::ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update)
.await .await
@ -224,7 +224,7 @@ impl ProcessTrackerInterface for MockDb {
&self, &self,
_task_ids: Vec<String>, _task_ids: Vec<String>,
_task_update: storage::ProcessTrackerUpdate, _task_update: storage::ProcessTrackerUpdate,
) -> CustomResult<Vec<storage::ProcessTracker>, errors::StorageError> { ) -> CustomResult<usize, errors::StorageError> {
todo!() todo!()
} }
} }

View File

@ -132,7 +132,7 @@ pub async fn fetch_consumer_tasks(
) -> CustomResult<Vec<storage::ProcessTracker>, errors::ProcessTrackerError> { ) -> CustomResult<Vec<storage::ProcessTracker>, errors::ProcessTrackerError> {
let batches = pt_utils::get_batches(redis_conn, stream_name, group_name, consumer_name).await?; let batches = pt_utils::get_batches(redis_conn, stream_name, group_name, consumer_name).await?;
let tasks = batches.into_iter().fold(Vec::new(), |mut acc, batch| { let mut tasks = batches.into_iter().fold(Vec::new(), |mut acc, batch| {
acc.extend_from_slice( acc.extend_from_slice(
batch batch
.trackers .trackers
@ -148,18 +148,19 @@ pub async fn fetch_consumer_tasks(
.map(|task| task.id.to_owned()) .map(|task| task.id.to_owned())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let updated_tasks = db db.process_tracker_update_process_status_by_ids(
.process_tracker_update_process_status_by_ids( task_ids,
task_ids, storage::ProcessTrackerUpdate::StatusUpdate {
storage::ProcessTrackerUpdate::StatusUpdate { status: enums::ProcessTrackerStatus::ProcessStarted,
status: enums::ProcessTrackerStatus::ProcessStarted, business_status: None,
business_status: None, },
}, )
) .await
.await .change_context(errors::ProcessTrackerError::ProcessFetchingFailed)?;
.change_context(errors::ProcessTrackerError::ProcessFetchingFailed)?; tasks
.iter_mut()
Ok(updated_tasks) .for_each(|x| x.status = enums::ProcessTrackerStatus::ProcessStarted);
Ok(tasks)
} }
// Accept flow_options if required // Accept flow_options if required

View File

@ -49,7 +49,7 @@ pub async fn update_status_and_append(
.collect(); .collect();
match flow { match flow {
SchedulerFlow::Producer => { SchedulerFlow::Producer => {
let res = state state
.store .store
.process_tracker_update_process_status_by_ids( .process_tracker_update_process_status_by_ids(
process_ids, process_ids,
@ -58,18 +58,13 @@ pub async fn update_status_and_append(
business_status: None, business_status: None,
}, },
) )
.await; .await.map_or_else(|error| {
match res {
Ok(trackers) => {
let count = trackers.len();
logger::debug!("Updated status of {count} processes");
Ok(())
}
Err(error) => {
logger::error!(error=%error.current_context(),"Error while updating process status"); logger::error!(error=%error.current_context(),"Error while updating process status");
Err(error.change_context(errors::ProcessTrackerError::ProcessUpdateFailed)) Err(error.change_context(errors::ProcessTrackerError::ProcessUpdateFailed))
} }, |count| {
} logger::debug!("Updated status of {count} processes");
Ok(())
})
} }
SchedulerFlow::Cleaner => { SchedulerFlow::Cleaner => {
let res = state let res = state

View File

@ -1,7 +1,7 @@
use error_stack::ResultExt; use error_stack::ResultExt;
use serde::Serialize; use serde::Serialize;
pub use storage_models::process_tracker::{ pub use storage_models::process_tracker::{
Milliseconds, ProcessData, ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate, ProcessData, ProcessTracker, ProcessTrackerNew, ProcessTrackerUpdate,
ProcessTrackerUpdateInternal, SchedulerOptions, ProcessTrackerUpdateInternal, SchedulerOptions,
}; };
use time::PrimitiveDateTime; use time::PrimitiveDateTime;

View File

@ -22,7 +22,6 @@ pub fn sample() -> () {
## Files Tree Layout ## Files Tree Layout
<!-- FIXME: this table should either be generated by a script or smoke test should be introduced checking it agrees with actual structure -->
<!-- FIXME: fill missing --> <!-- FIXME: fill missing -->
```text ```text

View File

@ -1,4 +1,4 @@
use common_utils::{consts, custom_serde, generate_id}; use common_utils::{consts, custom_serde, date_time, generate_id};
use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
use masking::Secret; use masking::Secret;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -115,17 +115,12 @@ impl From<AddressUpdate> for AddressUpdateInternal {
last_name, last_name,
phone_number, phone_number,
country_code, country_code,
modified_at: convert_to_pdt(OffsetDateTime::now_utc()), modified_at: date_time::convert_to_pdt(OffsetDateTime::now_utc()),
}, },
} }
} }
} }
// TODO: Create utils for this since cane be reused outside address
fn convert_to_pdt(offset_time: OffsetDateTime) -> PrimitiveDateTime {
PrimitiveDateTime::new(offset_time.date(), offset_time.time())
}
impl Default for AddressNew { impl Default for AddressNew {
fn default() -> Self { fn default() -> Self {
Self { Self {

View File

@ -25,7 +25,7 @@ pub struct PaymentIntent {
pub statement_descriptor_suffix: Option<String>, pub statement_descriptor_suffix: Option<String>,
pub created_at: PrimitiveDateTime, pub created_at: PrimitiveDateTime,
pub modified_at: PrimitiveDateTime, pub modified_at: PrimitiveDateTime,
pub last_synced: Option<PrimitiveDateTime>, // FIXME: this is optional pub last_synced: Option<PrimitiveDateTime>,
pub setup_future_usage: Option<storage_enums::FutureUsage>, pub setup_future_usage: Option<storage_enums::FutureUsage>,
pub off_session: Option<bool>, pub off_session: Option<bool>,
pub client_secret: Option<String>, pub client_secret: Option<String>,

View File

@ -142,12 +142,9 @@ impl From<ProcessTrackerUpdate> for ProcessTrackerUpdateInternal {
} }
} }
// TODO: Move this to a utility module?
pub struct Milliseconds(i32);
#[allow(dead_code)] #[allow(dead_code)]
pub struct SchedulerOptions { pub struct SchedulerOptions {
looper_interval: Milliseconds, looper_interval: common_utils::date_time::Milliseconds,
db_name: String, db_name: String,
cache_name: String, cache_name: String,
schema_name: String, schema_name: String,

View File

@ -1,12 +1,6 @@
use async_bb8_diesel::AsyncRunQueryDsl; use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
use diesel::{ use error_stack::IntoReport;
associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods, QueryDsl, use router_env::tracing::{self, instrument};
};
use error_stack::{IntoReport, ResultExt};
use router_env::{
logger::debug,
tracing::{self, instrument},
};
use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery}; use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery};
use crate::{ use crate::{
@ -121,29 +115,29 @@ impl PaymentAttempt {
.await .await
} }
// FIXME: Use generics
#[instrument(skip(conn))]
pub async fn find_last_successful_attempt_by_payment_id_merchant_id( pub async fn find_last_successful_attempt_by_payment_id_merchant_id(
conn: &PgPooledConn, conn: &PgPooledConn,
payment_id: &str, payment_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Self, errors::DatabaseError> { ) -> CustomResult<Self, errors::DatabaseError> {
let query = Self::table() // perform ordering on the application level instead of database level
.filter( generics::generic_filter::<<Self as HasTable>::Table, _, Self>(
dsl::payment_id conn,
.eq(payment_id.to_owned()) dsl::payment_id
.and(dsl::merchant_id.eq(merchant_id.to_owned())) .eq(payment_id.to_owned())
.and(dsl::status.eq(enums::AttemptStatus::Charged)), .and(dsl::merchant_id.eq(merchant_id.to_owned()))
) .and(dsl::status.eq(enums::AttemptStatus::Charged)),
.order(dsl::created_at.desc()); None,
debug!(query = %debug_query::<Pg, _>(&query).to_string()); )
.await?
query .into_iter()
.get_result_async(conn) .fold(
.await Err(errors::DatabaseError::NotFound).into_report(),
.into_report() |acc, cur| match acc {
.change_context(errors::DatabaseError::NotFound) Ok(value) if value.created_at > cur.created_at => Ok(value),
.attach_printable("Error while finding last successful payment attempt") _ => Ok(cur),
},
)
} }
#[instrument(skip(conn))] #[instrument(skip(conn))]

View File

@ -1,12 +1,5 @@
use async_bb8_diesel::AsyncRunQueryDsl; use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
use diesel::{ use router_env::tracing::{self, instrument};
associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods, QueryDsl,
};
use error_stack::{IntoReport, ResultExt};
use router_env::{
logger::debug,
tracing::{self, instrument},
};
use time::PrimitiveDateTime; use time::PrimitiveDateTime;
use super::generics::{self, ExecuteQuery}; use super::generics::{self, ExecuteQuery};
@ -57,14 +50,12 @@ impl ProcessTracker {
conn: &PgPooledConn, conn: &PgPooledConn,
task_ids: Vec<String>, task_ids: Vec<String>,
task_update: ProcessTrackerUpdate, task_update: ProcessTrackerUpdate,
) -> CustomResult<Vec<Self>, errors::DatabaseError> { ) -> CustomResult<usize, errors::DatabaseError> {
// TODO: Possible optimization: Instead of returning updated values from database, update generics::generic_update::<<Self as HasTable>::Table, _, _, _>(
// the values in code and return them, if database query executed successfully.
generics::generic_update_with_results::<<Self as HasTable>::Table, _, _, Self, _>(
conn, conn,
dsl::id.eq_any(task_ids), dsl::id.eq_any(task_ids),
ProcessTrackerUpdateInternal::from(task_update), ProcessTrackerUpdateInternal::from(task_update),
ExecuteQuery::new(), ExecuteQuery::<Self>::new(),
) )
.await .await
} }
@ -99,29 +90,26 @@ impl ProcessTracker {
.await .await
} }
// FIXME with generics #[instrument(skip(conn))]
#[instrument(skip(pool))]
pub async fn find_processes_to_clean( pub async fn find_processes_to_clean(
pool: &PgPooledConn, conn: &PgPooledConn,
time_lower_limit: PrimitiveDateTime, time_lower_limit: PrimitiveDateTime,
time_upper_limit: PrimitiveDateTime, time_upper_limit: PrimitiveDateTime,
runner: &str, runner: &str,
limit: i64, limit: u64,
) -> CustomResult<Vec<Self>, errors::DatabaseError> { ) -> CustomResult<Vec<Self>, errors::DatabaseError> {
let query = Self::table() let mut x: Vec<Self> = generics::generic_filter::<<Self as HasTable>::Table, _, _>(
.filter(dsl::schedule_time.between(time_lower_limit, time_upper_limit)) conn,
.filter(dsl::status.eq(enums::ProcessTrackerStatus::ProcessStarted)) dsl::schedule_time
.filter(dsl::runner.eq(runner.to_owned())) .between(time_lower_limit, time_upper_limit)
.order(dsl::schedule_time.asc()) .and(dsl::status.eq(enums::ProcessTrackerStatus::ProcessStarted))
.limit(limit); .and(dsl::runner.eq(runner.to_owned())),
debug!(query = %debug_query::<Pg, _>(&query).to_string()); None,
)
query .await?;
.get_results_async(pool) x.sort_by(|a, b| a.schedule_time.cmp(&b.schedule_time));
.await x.truncate(limit as usize);
.into_report() Ok(x)
.change_context(errors::DatabaseError::NotFound)
.attach_printable_lazy(|| "Error finding processes to clean")
} }
#[instrument(skip(conn))] #[instrument(skip(conn))]

View File

@ -9,7 +9,7 @@ use crate::{
CustomResult, PgPooledConn, CustomResult, PgPooledConn,
}; };
// FIXME: Find by partition key // FIXME: Find by partition key : Review
impl RefundNew { impl RefundNew {
#[instrument(skip(conn))] #[instrument(skip(conn))]