mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-01 19:42:27 +08:00
feat(payment_attempt): add kv for find last successful attempt (#2206)
This commit is contained in:
@ -14,7 +14,7 @@ use crate::{
|
|||||||
|
|
||||||
pub async fn get_or_populate_redis<T, F, Fut>(
|
pub async fn get_or_populate_redis<T, F, Fut>(
|
||||||
store: &dyn StorageInterface,
|
store: &dyn StorageInterface,
|
||||||
key: &str,
|
key: impl AsRef<str>,
|
||||||
fun: F,
|
fun: F,
|
||||||
) -> CustomResult<T, errors::StorageError>
|
) -> CustomResult<T, errors::StorageError>
|
||||||
where
|
where
|
||||||
@ -23,6 +23,7 @@ where
|
|||||||
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
|
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
|
||||||
{
|
{
|
||||||
let type_name = std::any::type_name::<T>();
|
let type_name = std::any::type_name::<T>();
|
||||||
|
let key = key.as_ref();
|
||||||
let redis = &store
|
let redis = &store
|
||||||
.get_redis_conn()
|
.get_redis_conn()
|
||||||
.change_context(errors::StorageError::RedisError(
|
.change_context(errors::StorageError::RedisError(
|
||||||
|
|||||||
@ -40,7 +40,7 @@ impl ReverseLookupInterface for Store {
|
|||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
.into_report()
|
.into_report()
|
||||||
};
|
};
|
||||||
cache::get_or_populate_redis(self, id, database_call).await
|
cache::get_or_populate_redis(self, format!("reverse_lookup_{id}"), database_call).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -50,7 +50,7 @@ impl<T: DatabaseStore> ReverseLookupInterface for RouterStore<T> {
|
|||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
get_or_populate_redis(self, id, database_call).await
|
get_or_populate_redis(self, format!("reverse_lookup_{id}"), database_call).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -26,6 +26,7 @@ use redis_interface::HsetnxReply;
|
|||||||
use router_env::{instrument, tracing};
|
use router_env::{instrument, tracing};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
diesel_error_to_data_error,
|
||||||
lookup::ReverseLookupInterface,
|
lookup::ReverseLookupInterface,
|
||||||
redis::kv_store::{PartitionKey, RedisConnInterface},
|
redis::kv_store::{PartitionKey, RedisConnInterface},
|
||||||
utils::{
|
utils::{
|
||||||
@ -49,7 +50,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
.insert(&conn)
|
.insert(&conn)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -67,7 +68,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
.update_with_attempt_id(&conn, payment_attempt.to_storage_model())
|
.update_with_attempt_id(&conn, payment_attempt.to_storage_model())
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -89,7 +90,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -109,7 +110,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -129,7 +130,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -153,7 +154,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -174,7 +175,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
DieselPaymentAttempt::get_filters_for_payments(&conn, intents.as_slice(), merchant_id)
|
DieselPaymentAttempt::get_filters_for_payments(&conn, intents.as_slice(), merchant_id)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(
|
.map(
|
||||||
@ -202,7 +203,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -218,7 +219,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
DieselPaymentAttempt::find_by_merchant_id_payment_id(&conn, merchant_id, payment_id)
|
DieselPaymentAttempt::find_by_merchant_id_payment_id(&conn, merchant_id, payment_id)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(|a| {
|
.map(|a| {
|
||||||
@ -239,7 +240,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
DieselPaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
|
DieselPaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
.map(PaymentAttempt::from_storage_model)
|
.map(PaymentAttempt::from_storage_model)
|
||||||
@ -275,7 +276,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -380,7 +381,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
|||||||
.insert(&conn)
|
.insert(&conn)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@ -570,13 +571,41 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
|
|||||||
merchant_id: &str,
|
merchant_id: &str,
|
||||||
storage_scheme: MerchantStorageScheme,
|
storage_scheme: MerchantStorageScheme,
|
||||||
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
|
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
|
||||||
self.router_store
|
let database_call = || {
|
||||||
.find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
|
self.router_store
|
||||||
payment_id,
|
.find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
|
||||||
merchant_id,
|
payment_id,
|
||||||
storage_scheme,
|
merchant_id,
|
||||||
)
|
storage_scheme,
|
||||||
.await
|
)
|
||||||
|
};
|
||||||
|
match storage_scheme {
|
||||||
|
MerchantStorageScheme::PostgresOnly => database_call().await,
|
||||||
|
MerchantStorageScheme::RedisKv => {
|
||||||
|
let key = format!("{merchant_id}_{payment_id}");
|
||||||
|
let pattern = "pa_*";
|
||||||
|
let redis_conn = self
|
||||||
|
.get_redis_conn()
|
||||||
|
.change_context(errors::StorageError::KVError)?;
|
||||||
|
|
||||||
|
let redis_fut = async {
|
||||||
|
redis_conn
|
||||||
|
.hscan_and_deserialize::<PaymentAttempt>(&key, pattern, None)
|
||||||
|
.await
|
||||||
|
.and_then(|mut payment_attempts| {
|
||||||
|
payment_attempts.sort_by(|a, b| b.modified_at.cmp(&a.modified_at));
|
||||||
|
payment_attempts
|
||||||
|
.iter()
|
||||||
|
.find(|&pa| pa.status == api_models::enums::AttemptStatus::Charged)
|
||||||
|
.cloned()
|
||||||
|
.ok_or(error_stack::report!(
|
||||||
|
redis_interface::errors::RedisError::NotFound
|
||||||
|
))
|
||||||
|
})
|
||||||
|
};
|
||||||
|
try_redis_get_else_try_database_get(redis_fut, database_call).await
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn find_payment_attempt_by_merchant_id_connector_txn_id(
|
async fn find_payment_attempt_by_merchant_id_connector_txn_id(
|
||||||
@ -1463,7 +1492,7 @@ async fn add_connector_txn_id_to_reverse_lookup<T: DatabaseStore>(
|
|||||||
.insert(&conn)
|
.insert(&conn)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
let new_err = crate::diesel_error_to_data_error(err.current_context());
|
let new_err = diesel_error_to_data_error(err.current_context());
|
||||||
err.change_context(new_err)
|
err.change_context(new_err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -1487,7 +1516,7 @@ async fn add_preprocessing_id_to_reverse_lookup<T: DatabaseStore>(
|
|||||||
.insert(&conn)
|
.insert(&conn)
|
||||||
.await
|
.await
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
let new_err = crate::diesel_error_to_data_error(er.current_context());
|
let new_err = diesel_error_to_data_error(er.current_context());
|
||||||
er.change_context(new_err)
|
er.change_context(new_err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -138,7 +138,7 @@ impl Cache {
|
|||||||
|
|
||||||
pub async fn get_or_populate_redis<T, F, Fut>(
|
pub async fn get_or_populate_redis<T, F, Fut>(
|
||||||
store: &(dyn RedisConnInterface + Send + Sync),
|
store: &(dyn RedisConnInterface + Send + Sync),
|
||||||
key: &str,
|
key: impl AsRef<str>,
|
||||||
fun: F,
|
fun: F,
|
||||||
) -> CustomResult<T, StorageError>
|
) -> CustomResult<T, StorageError>
|
||||||
where
|
where
|
||||||
@ -147,6 +147,7 @@ where
|
|||||||
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
|
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
|
||||||
{
|
{
|
||||||
let type_name = std::any::type_name::<T>();
|
let type_name = std::any::type_name::<T>();
|
||||||
|
let key = key.as_ref();
|
||||||
let redis = &store
|
let redis = &store
|
||||||
.get_redis_conn()
|
.get_redis_conn()
|
||||||
.map_err(|er| {
|
.map_err(|er| {
|
||||||
|
|||||||
Reference in New Issue
Block a user