feat(payout_link): return total_count in filtered payouts list API response (#5538)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Kashif
2024-08-14 20:03:10 +05:30
committed by GitHub
parent 92a07cf5e4
commit 34f648e29b
11 changed files with 349 additions and 37 deletions

View File

@ -14253,7 +14253,14 @@
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/components/schemas/PayoutCreateResponse" "$ref": "#/components/schemas/PayoutCreateResponse"
} },
"description": "The list of payouts response objects"
},
"total_count": {
"type": "integer",
"format": "int64",
"description": "The total number of available payouts for given constraints",
"nullable": true
} }
} }
}, },

View File

@ -19129,7 +19129,14 @@
"type": "array", "type": "array",
"items": { "items": {
"$ref": "#/components/schemas/PayoutCreateResponse" "$ref": "#/components/schemas/PayoutCreateResponse"
} },
"description": "The list of payouts response objects"
},
"total_count": {
"type": "integer",
"format": "int64",
"description": "The total number of available payouts for given constraints",
"nullable": true
} }
} }
}, },

View File

@ -715,8 +715,11 @@ pub struct PayoutListFilterConstraints {
pub struct PayoutListResponse { pub struct PayoutListResponse {
/// The number of payouts included in the list /// The number of payouts included in the list
pub size: usize, pub size: usize,
// The list of payouts response objects /// The list of payouts response objects
pub data: Vec<PayoutCreateResponse>, pub data: Vec<PayoutCreateResponse>,
/// The total number of available payouts for given constraints
#[serde(skip_serializing_if = "Option::is_none")]
pub total_count: Option<i64>,
} }
#[derive(Clone, Debug, serde::Serialize, ToSchema)] #[derive(Clone, Debug, serde::Serialize, ToSchema)]

View File

@ -147,7 +147,7 @@ impl PayoutAttempt {
Vec<enums::PayoutStatus>, Vec<enums::PayoutStatus>,
Vec<enums::PayoutType>, Vec<enums::PayoutType>,
)> { )> {
let active_attempts: Vec<String> = payouts let active_attempt_ids = payouts
.iter() .iter()
.map(|payout| { .map(|payout| {
format!( format!(
@ -156,11 +156,20 @@ impl PayoutAttempt {
payout.attempt_count.clone() payout.attempt_count.clone()
) )
}) })
.collect(); .collect::<Vec<String>>();
let active_payout_ids = payouts
.iter()
.map(|payout| payout.payout_id.clone())
.collect::<Vec<String>>();
let filter = <Self as HasTable>::table() let filter = <Self as HasTable>::table()
.filter(dsl::merchant_id.eq(merchant_id.to_owned())) .filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(dsl::payout_attempt_id.eq_any(active_attempts)); .filter(dsl::payout_attempt_id.eq_any(active_attempt_ids));
let payouts_filter = <Payouts as HasTable>::table()
.filter(payout_dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(payout_dsl::payout_id.eq_any(active_payout_ids));
let payout_status: Vec<enums::PayoutStatus> = payouts let payout_status: Vec<enums::PayoutStatus> = payouts
.iter() .iter()
@ -181,7 +190,8 @@ impl PayoutAttempt {
.flatten() .flatten()
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let filter_currency = <Payouts as HasTable>::table() let filter_currency = payouts_filter
.clone()
.select(payout_dsl::destination_currency) .select(payout_dsl::destination_currency)
.distinct() .distinct()
.get_results_async::<enums::Currency>(conn) .get_results_async::<enums::Currency>(conn)
@ -191,7 +201,8 @@ impl PayoutAttempt {
.into_iter() .into_iter()
.collect::<Vec<enums::Currency>>(); .collect::<Vec<enums::Currency>>();
let filter_payout_method = Payouts::table() let filter_payout_method = payouts_filter
.clone()
.select(payout_dsl::payout_type) .select(payout_dsl::payout_type)
.distinct() .distinct()
.get_results_async::<Option<enums::PayoutType>>(conn) .get_results_async::<Option<enums::PayoutType>>(conn)

View File

@ -1,11 +1,16 @@
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods}; use async_bb8_diesel::AsyncRunQueryDsl;
use error_stack::report; use diesel::{
associations::HasTable, debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods,
JoinOnDsl, QueryDsl,
};
use error_stack::{report, ResultExt};
use super::generics; use super::generics;
use crate::{ use crate::{
errors, enums, errors,
payouts::{Payouts, PayoutsNew, PayoutsUpdate, PayoutsUpdateInternal}, payouts::{Payouts, PayoutsNew, PayoutsUpdate, PayoutsUpdateInternal},
schema::payouts::dsl, query::generics::db_metrics,
schema::{payout_attempt, payouts::dsl},
PgPooledConn, StorageResult, PgPooledConn, StorageResult,
}; };
@ -87,4 +92,43 @@ impl Payouts {
) )
.await .await
} }
pub async fn get_total_count_of_payouts(
conn: &PgPooledConn,
merchant_id: &common_utils::id_type::MerchantId,
active_payout_ids: &[String],
connector: Option<Vec<String>>,
currency: Option<Vec<enums::Currency>>,
status: Option<Vec<enums::PayoutStatus>>,
payout_type: Option<Vec<enums::PayoutType>>,
) -> StorageResult<i64> {
let mut filter = <Self as HasTable>::table()
.inner_join(payout_attempt::table.on(payout_attempt::dsl::payout_id.eq(dsl::payout_id)))
.count()
.filter(dsl::merchant_id.eq(merchant_id.to_owned()))
.filter(dsl::payout_id.eq_any(active_payout_ids.to_owned()))
.into_boxed();
if let Some(connector) = connector {
filter = filter.filter(payout_attempt::dsl::connector.eq_any(connector));
}
if let Some(currency) = currency {
filter = filter.filter(dsl::destination_currency.eq_any(currency));
}
if let Some(status) = status {
filter = filter.filter(dsl::status.eq_any(status));
}
if let Some(payout_type) = payout_type {
filter = filter.filter(dsl::payout_type.eq_any(payout_type));
}
router_env::logger::debug!(query = %debug_query::<Pg, _>(&filter).to_string());
db_metrics::track_database_call::<<Self as HasTable>::Table, _, _>(
filter.get_result_async::<i64>(conn),
db_metrics::DatabaseOperation::Filter,
)
.await
.change_context(errors::DatabaseError::Others)
.attach_printable("Error filtering count of payouts")
}
} }

View File

@ -41,9 +41,9 @@ pub trait PayoutAttemptInterface {
async fn get_filters_for_payouts( async fn get_filters_for_payouts(
&self, &self,
payout: &[Payouts], _payout: &[Payouts],
merchant_id: &id_type::MerchantId, _merchant_id: &id_type::MerchantId,
storage_scheme: MerchantStorageScheme, _storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutListFilters, errors::StorageError>; ) -> error_stack::Result<PayoutListFilters, errors::StorageError>;
} }

View File

@ -61,10 +61,29 @@ pub trait PayoutsInterface {
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
async fn filter_payouts_by_time_range_constraints( async fn filter_payouts_by_time_range_constraints(
&self, &self,
merchant_id: &id_type::MerchantId, _merchant_id: &id_type::MerchantId,
time_range: &api_models::payments::TimeRange, _time_range: &api_models::payments::TimeRange,
storage_scheme: MerchantStorageScheme, _storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<Payouts>, errors::StorageError>; ) -> error_stack::Result<Vec<Payouts>, errors::StorageError>;
#[cfg(feature = "olap")]
#[allow(clippy::too_many_arguments)]
async fn get_total_count_of_filtered_payouts(
&self,
_merchant_id: &id_type::MerchantId,
_active_payout_ids: &[String],
_connector: Option<Vec<api_models::enums::PayoutConnectors>>,
_currency: Option<Vec<storage_enums::Currency>>,
_status: Option<Vec<storage_enums::PayoutStatus>>,
_payout_method: Option<Vec<storage_enums::PayoutType>>,
) -> error_stack::Result<i64, errors::StorageError>;
#[cfg(feature = "olap")]
async fn filter_active_payout_ids_by_constraints(
&self,
_merchant_id: &id_type::MerchantId,
_constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, errors::StorageError>;
} }
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]

View File

@ -862,6 +862,7 @@ pub async fn payouts_list_core(
api::PayoutListResponse { api::PayoutListResponse {
size: data.len(), size: data.len(),
data, data,
total_count: None,
}, },
)) ))
} }
@ -877,6 +878,7 @@ pub async fn payouts_filtered_list_core(
let limit = &filters.limit; let limit = &filters.limit;
validator::validate_payout_list_request_for_joins(*limit)?; validator::validate_payout_list_request_for_joins(*limit)?;
let db = state.store.as_ref(); let db = state.store.as_ref();
let constraints = filters.clone().into();
let list: Vec<( let list: Vec<(
storage::Payouts, storage::Payouts,
storage::PayoutAttempt, storage::PayoutAttempt,
@ -884,7 +886,7 @@ pub async fn payouts_filtered_list_core(
)> = db )> = db
.filter_payouts_and_attempts( .filter_payouts_and_attempts(
merchant_account.get_id(), merchant_account.get_id(),
&filters.clone().into(), &constraints,
merchant_account.storage_scheme, merchant_account.storage_scheme,
) )
.await .await
@ -915,10 +917,35 @@ pub async fn payouts_filtered_list_core(
.map(ForeignFrom::foreign_from) .map(ForeignFrom::foreign_from)
.collect(); .collect();
let active_payout_ids = db
.filter_active_payout_ids_by_constraints(merchant_account.get_id(), &constraints)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to filter active payout ids based on the constraints")?;
let total_count = db
.get_total_count_of_filtered_payouts(
merchant_account.get_id(),
&active_payout_ids,
filters.connector.clone(),
filters.currency.clone(),
filters.status.clone(),
filters.payout_method.clone(),
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable_lazy(|| {
format!(
"Failed to fetch total count of filtered payouts for the given constraints - {:?}",
filters
)
})?;
Ok(services::ApplicationResponse::Json( Ok(services::ApplicationResponse::Json(
api::PayoutListResponse { api::PayoutListResponse {
size: data.len(), size: data.len(),
data, data,
total_count: Some(total_count),
}, },
)) ))
} }

View File

@ -1975,6 +1975,39 @@ impl PayoutsInterface for KafkaStore {
.filter_payouts_by_time_range_constraints(merchant_id, time_range, storage_scheme) .filter_payouts_by_time_range_constraints(merchant_id, time_range, storage_scheme)
.await .await
} }
#[cfg(feature = "olap")]
async fn get_total_count_of_filtered_payouts(
&self,
merchant_id: &id_type::MerchantId,
active_payout_ids: &[String],
connector: Option<Vec<api_models::enums::PayoutConnectors>>,
currency: Option<Vec<enums::Currency>>,
status: Option<Vec<enums::PayoutStatus>>,
payout_method: Option<Vec<enums::PayoutType>>,
) -> CustomResult<i64, errors::DataStorageError> {
self.diesel_store
.get_total_count_of_filtered_payouts(
merchant_id,
active_payout_ids,
connector,
currency,
status,
payout_method,
)
.await
}
#[cfg(feature = "olap")]
async fn filter_active_payout_ids_by_constraints(
&self,
merchant_id: &id_type::MerchantId,
constraints: &hyperswitch_domain_models::payouts::PayoutFetchConstraints,
) -> CustomResult<Vec<String>, errors::DataStorageError> {
self.diesel_store
.filter_active_payout_ids_by_constraints(merchant_id, constraints)
.await
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@ -85,4 +85,28 @@ impl PayoutsInterface for MockDb {
// TODO: Implement function for `MockDb` // TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)? Err(StorageError::MockDbError)?
} }
#[cfg(feature = "olap")]
async fn get_total_count_of_filtered_payouts(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_active_payout_ids: &[String],
_connector: Option<Vec<api_models::enums::PayoutConnectors>>,
_currency: Option<Vec<storage_enums::Currency>>,
_status: Option<Vec<storage_enums::PayoutStatus>>,
_payout_method: Option<Vec<storage_enums::PayoutType>>,
) -> CustomResult<i64, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
#[cfg(feature = "olap")]
async fn filter_active_payout_ids_by_constraints(
&self,
_merchant_id: &common_utils::id_type::MerchantId,
_constraints: &hyperswitch_domain_models::payouts::PayoutFetchConstraints,
) -> CustomResult<Vec<String>, StorageError> {
// TODO: Implement function for `MockDb`
Err(StorageError::MockDbError)?
}
} }

View File

@ -1,5 +1,9 @@
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
use api_models::enums::PayoutConnectors;
#[cfg(feature = "olap")]
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl}; use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
#[cfg(feature = "olap")]
use common_utils::errors::ReportSwitchExt;
use common_utils::ext_traits::Encode; use common_utils::ext_traits::Encode;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
use diesel::{associations::HasTable, ExpressionMethods, QueryDsl}; use diesel::{associations::HasTable, ExpressionMethods, QueryDsl};
@ -11,7 +15,7 @@ use diesel::{associations::HasTable, ExpressionMethods, QueryDsl};
use diesel::{JoinOnDsl, NullableExpressionMethods}; use diesel::{JoinOnDsl, NullableExpressionMethods};
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
use diesel_models::{ use diesel_models::{
customers::Customer as DieselCustomer, query::generics::db_metrics, customers::Customer as DieselCustomer, enums as storage_enums, query::generics::db_metrics,
schema::payouts::dsl as po_dsl, schema::payouts::dsl as po_dsl,
}; };
use diesel_models::{ use diesel_models::{
@ -348,6 +352,39 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
.filter_payouts_by_time_range_constraints(merchant_id, time_range, storage_scheme) .filter_payouts_by_time_range_constraints(merchant_id, time_range, storage_scheme)
.await .await
} }
#[cfg(feature = "olap")]
async fn get_total_count_of_filtered_payouts(
&self,
merchant_id: &common_utils::id_type::MerchantId,
active_payout_ids: &[String],
connector: Option<Vec<PayoutConnectors>>,
currency: Option<Vec<storage_enums::Currency>>,
status: Option<Vec<storage_enums::PayoutStatus>>,
payout_method: Option<Vec<storage_enums::PayoutType>>,
) -> error_stack::Result<i64, StorageError> {
self.router_store
.get_total_count_of_filtered_payouts(
merchant_id,
active_payout_ids,
connector,
currency,
status,
payout_method,
)
.await
}
#[cfg(feature = "olap")]
async fn filter_active_payout_ids_by_constraints(
&self,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, StorageError> {
self.router_store
.filter_active_payout_ids_by_constraints(merchant_id, constraints)
.await
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -431,8 +468,6 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
filters: &PayoutFetchConstraints, filters: &PayoutFetchConstraints,
storage_scheme: MerchantStorageScheme, storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<Payouts>, StorageError> { ) -> error_stack::Result<Vec<Payouts>, StorageError> {
use common_utils::errors::ReportSwitchExt;
let conn = connection::pg_connection_read(self).await.switch()?; let conn = connection::pg_connection_read(self).await.switch()?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn); let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
@ -495,18 +530,6 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
query = query.offset(params.offset.into()); query = query.offset(params.offset.into());
query = match &params.currency {
Some(currency) => {
query.filter(po_dsl::destination_currency.eq_any(currency.clone()))
}
None => query,
};
query = match &params.status {
Some(status) => query.filter(po_dsl::status.eq_any(status.clone())),
None => query,
};
if let Some(currency) = &params.currency { if let Some(currency) = &params.currency {
query = query.filter(po_dsl::destination_currency.eq_any(currency.clone())); query = query.filter(po_dsl::destination_currency.eq_any(currency.clone()));
} }
@ -552,8 +575,6 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
storage_scheme: MerchantStorageScheme, storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<(Payouts, PayoutAttempt, Option<DieselCustomer>)>, StorageError> ) -> error_stack::Result<Vec<(Payouts, PayoutAttempt, Option<DieselCustomer>)>, StorageError>
{ {
use common_utils::errors::ReportSwitchExt;
let conn = connection::pg_connection_read(self).await.switch()?; let conn = connection::pg_connection_read(self).await.switch()?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn); let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPayouts::table() let mut query = DieselPayouts::table()
@ -561,7 +582,7 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
diesel_models::schema::payout_attempt::table diesel_models::schema::payout_attempt::table
.on(poa_dsl::payout_id.eq(po_dsl::payout_id)), .on(poa_dsl::payout_id.eq(po_dsl::payout_id)),
) )
.inner_join( .left_join(
diesel_models::schema::customers::table diesel_models::schema::customers::table
.on(cust_dsl::customer_id.nullable().eq(po_dsl::customer_id)), .on(cust_dsl::customer_id.nullable().eq(po_dsl::customer_id)),
) )
@ -704,6 +725,122 @@ impl<T: DatabaseStore> PayoutsInterface for crate::RouterStore<T> {
self.filter_payouts_by_constraints(merchant_id, &payout_filters, storage_scheme) self.filter_payouts_by_constraints(merchant_id, &payout_filters, storage_scheme)
.await .await
} }
#[cfg(feature = "olap")]
#[instrument(skip_all)]
async fn get_total_count_of_filtered_payouts(
&self,
merchant_id: &common_utils::id_type::MerchantId,
active_payout_ids: &[String],
connector: Option<Vec<PayoutConnectors>>,
currency: Option<Vec<storage_enums::Currency>>,
status: Option<Vec<storage_enums::PayoutStatus>>,
payout_type: Option<Vec<storage_enums::PayoutType>>,
) -> error_stack::Result<i64, StorageError> {
let conn = self
.db_store
.get_replica_pool()
.get()
.await
.change_context(StorageError::DatabaseConnectionError)?;
let connector_strings = connector.as_ref().map(|connectors| {
connectors
.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
});
DieselPayouts::get_total_count_of_payouts(
&conn,
merchant_id,
active_payout_ids,
connector_strings,
currency,
status,
payout_type,
)
.await
.map_err(|er| {
let new_err = diesel_error_to_data_error(er.current_context());
er.change_context(new_err)
})
}
#[cfg(feature = "olap")]
#[instrument(skip_all)]
async fn filter_active_payout_ids_by_constraints(
&self,
merchant_id: &common_utils::id_type::MerchantId,
constraints: &PayoutFetchConstraints,
) -> error_stack::Result<Vec<String>, StorageError> {
let conn = connection::pg_connection_read(self).await.switch()?;
let conn = async_bb8_diesel::Connection::as_async_conn(&conn);
let mut query = DieselPayouts::table()
.inner_join(
diesel_models::schema::payout_attempt::table
.on(poa_dsl::payout_id.eq(po_dsl::payout_id)),
)
.left_join(
diesel_models::schema::customers::table
.on(cust_dsl::customer_id.nullable().eq(po_dsl::customer_id)),
)
.select(po_dsl::payout_id)
.filter(po_dsl::merchant_id.eq(merchant_id.to_owned()))
.order(po_dsl::created_at.desc())
.into_boxed();
query = match constraints {
PayoutFetchConstraints::Single { payout_id } => {
query.filter(po_dsl::payout_id.eq(payout_id.to_owned()))
}
PayoutFetchConstraints::List(params) => {
if let Some(customer_id) = &params.customer_id {
query = query.filter(po_dsl::customer_id.eq(customer_id.clone()));
}
if let Some(profile_id) = &params.profile_id {
query = query.filter(po_dsl::profile_id.eq(profile_id.clone()));
}
query = match params.starting_at {
Some(starting_at) => query.filter(po_dsl::created_at.ge(starting_at)),
None => query,
};
query = match params.ending_at {
Some(ending_at) => query.filter(po_dsl::created_at.le(ending_at)),
None => query,
};
query = match &params.currency {
Some(currency) => {
query.filter(po_dsl::destination_currency.eq_any(currency.clone()))
}
None => query,
};
query = match &params.status {
Some(status) => query.filter(po_dsl::status.eq_any(status.clone())),
None => query,
};
query
}
};
logger::debug!(filter = %diesel::debug_query::<diesel::pg::Pg,_>(&query).to_string());
db_metrics::track_database_call::<<DieselPayouts as HasTable>::Table, _, _>(
query.get_results_async::<String>(conn),
db_metrics::DatabaseOperation::Filter,
)
.await
.map_err(|er| {
StorageError::DatabaseError(
error_stack::report!(diesel_models::errors::DatabaseError::from(er))
.attach_printable("Error filtering payout records"),
)
.into()
})
}
} }
impl DataModelExt for Payouts { impl DataModelExt for Payouts {