fix: make push to drainer generic and add application metrics for KV (#2563)

This commit is contained in:
Kartikeya Hegde
2023-10-17 11:21:18 +00:00
committed by GitHub
parent cecea8718a
commit 274a78343e
11 changed files with 258 additions and 334 deletions

View File

@ -11,7 +11,7 @@ use redis_interface::SetnxReply;
use crate::{
diesel_error_to_data_error,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{kv_wrapper, KvOperation},
utils::{self, try_redis_get_else_try_database_get},
DatabaseStore, KVRouterStore, RouterStore,
};
@ -84,31 +84,22 @@ impl<T: DatabaseStore> ReverseLookupInterface for KVRouterStore<T> {
pk_id: new.pk_id.clone(),
source: new.source.clone(),
};
let combination = &created_rev_lookup.pk_id;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::ReverseLookUp(new),
},
};
match kv_wrapper::<DieselReverseLookup, _, _>(
self,
KvOperation::SetNx(&created_rev_lookup),
KvOperation::SetNx(&created_rev_lookup, redis_entry),
format!("reverse_lookup_{}", &created_rev_lookup.lookup_id),
)
.await
.change_context(errors::StorageError::KVError)?
.try_into_setnx()
{
Ok(SetnxReply::KeySet) => {
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::ReverseLookUp(new),
},
};
self.push_to_drainer_stream::<DieselReverseLookup>(
redis_entry,
PartitionKey::MerchantIdPaymentIdCombination { combination },
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_rev_lookup)
}
Ok(SetnxReply::KeySet) => Ok(created_rev_lookup),
Ok(SetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
entity: "reverse_lookup",
key: Some(created_rev_lookup.lookup_id.clone()),

View File

@ -4,3 +4,7 @@ metrics_context!(CONTEXT);
global_meter!(GLOBAL_METER, "ROUTER_API");
counter_metric!(KV_MISS, GLOBAL_METER); // No. of KV misses
// Metrics for KV
counter_metric!(KV_OPERATION_SUCCESSFUL, GLOBAL_METER);
counter_metric!(KV_OPERATION_FAILED, GLOBAL_METER);

View File

@ -28,7 +28,7 @@ use router_env::{instrument, tracing};
use crate::{
diesel_error_to_data_error,
lookup::ReverseLookupInterface,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{kv_wrapper, KvOperation},
utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get},
DataModelExt, DatabaseStore, KVRouterStore, RouterStore,
};
@ -363,9 +363,21 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
let field = format!("pa_{}", created_attempt.attempt_id);
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::PaymentAttempt(
payment_attempt.to_storage_model(),
),
},
};
match kv_wrapper::<PaymentAttempt, _, _>(
self,
KvOperation::HSetNx(&field, &created_attempt),
KvOperation::HSetNx(
&field,
&created_attempt.clone().to_storage_model(),
redis_entry,
),
&key,
)
.await
@ -391,22 +403,6 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
self.insert_reverse_lookup(reverse_lookup, storage_scheme)
.await?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::PaymentAttempt(
payment_attempt.to_storage_model(),
),
},
};
self.push_to_drainer_stream::<DieselPaymentAttempt>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &created_attempt.merchant_id,
payment_id: &created_attempt.payment_id,
},
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_attempt)
}
Err(error) => Err(error.change_context(errors::StorageError::KVError)),
@ -444,9 +440,20 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
.change_context(errors::StorageError::KVError)?;
let field = format!("pa_{}", updated_attempt.attempt_id);
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::PaymentAttemptUpdate(
kv::PaymentAttemptUpdateMems {
orig: this.clone().to_storage_model(),
update_data: payment_attempt.to_storage_model(),
},
),
},
};
kv_wrapper::<(), _, _>(
self,
KvOperation::Hset::<PaymentAttempt>((&field, redis_value)),
KvOperation::Hset::<DieselPaymentAttempt>((&field, redis_value), redis_entry),
&key,
)
.await
@ -513,25 +520,6 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
(_, _) => {}
}
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::PaymentAttemptUpdate(
kv::PaymentAttemptUpdateMems {
orig: this.to_storage_model(),
update_data: payment_attempt.to_storage_model(),
},
),
},
};
self.push_to_drainer_stream::<DieselPaymentAttempt>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_attempt.merchant_id,
payment_id: &updated_attempt.payment_id,
},
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(updated_attempt)
}
}
@ -565,7 +553,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
try_redis_get_else_try_database_get(
async {
kv_wrapper(self, KvOperation::<PaymentAttempt>::HGet(&lookup.sk_id), key).await?.try_into_hget()
kv_wrapper(self, KvOperation::<DieselPaymentAttempt>::HGet(&lookup.sk_id), key).await?.try_into_hget()
},
|| async {self.router_store.find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(connector_transaction_id, payment_id, merchant_id, storage_scheme).await},
)
@ -597,7 +585,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
let redis_fut = async {
let kv_result = kv_wrapper::<PaymentAttempt, _, _>(
self,
KvOperation::<PaymentAttempt>::Scan(pattern),
KvOperation::<DieselPaymentAttempt>::Scan(pattern),
key,
)
.await?
@ -645,7 +633,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
async {
kv_wrapper(
self,
KvOperation::<PaymentAttempt>::HGet(&lookup.sk_id),
KvOperation::<DieselPaymentAttempt>::HGet(&lookup.sk_id),
key,
)
.await?
@ -690,7 +678,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
let field = format!("pa_{attempt_id}");
try_redis_get_else_try_database_get(
async {
kv_wrapper(self, KvOperation::<PaymentAttempt>::HGet(&field), key)
kv_wrapper(self, KvOperation::<DieselPaymentAttempt>::HGet(&field), key)
.await?
.try_into_hget()
},
@ -736,7 +724,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
async {
kv_wrapper(
self,
KvOperation::<PaymentAttempt>::HGet(&lookup.sk_id),
KvOperation::<DieselPaymentAttempt>::HGet(&lookup.sk_id),
key,
)
.await?
@ -784,7 +772,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
async {
kv_wrapper(
self,
KvOperation::<PaymentAttempt>::HGet(&lookup.sk_id),
KvOperation::<DieselPaymentAttempt>::HGet(&lookup.sk_id),
key,
)
.await?
@ -824,7 +812,7 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
MerchantStorageScheme::RedisKv => {
let key = format!("mid_{merchant_id}_pid_{payment_id}");
kv_wrapper(self, KvOperation::<PaymentAttempt>::Scan("pa_*"), key)
kv_wrapper(self, KvOperation::<DieselPaymentAttempt>::Scan("pa_*"), key)
.await
.change_context(errors::StorageError::KVError)?
.try_into_scan()

View File

@ -35,7 +35,7 @@ use router_env::{instrument, tracing};
use crate::{
diesel_error_to_data_error,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{kv_wrapper, KvOperation},
utils::{pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
};
@ -93,11 +93,19 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
payment_link_id: new.payment_link_id.clone(),
payment_confirm_source: new.payment_confirm_source,
};
let diesel_intent = created_intent.clone().to_storage_model();
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::PaymentIntent(new.to_storage_model()),
},
};
match kv_wrapper::<DieselPaymentIntent, _, _>(
self,
KvOperation::HSetNx(&field, &diesel_intent),
KvOperation::<DieselPaymentIntent>::HSetNx(
&field,
&created_intent.clone().to_storage_model(),
redis_entry,
),
&key,
)
.await
@ -109,23 +117,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
key: Some(key),
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Insert {
insertable: kv::Insertable::PaymentIntent(new.to_storage_model()),
},
};
self.push_to_drainer_stream::<DieselPaymentIntent>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
},
)
.await
.change_context(StorageError::KVError)?;
Ok(created_intent)
}
Ok(HsetnxReply::KeySet) => Ok(created_intent),
Err(error) => Err(error.change_context(StorageError::KVError)),
}
}
@ -157,16 +149,6 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
Encode::<DieselPaymentIntent>::encode_to_string_of_json(&diesel_intent)
.change_context(StorageError::SerializationFailed)?;
kv_wrapper::<(), _, _>(
self,
KvOperation::<DieselPaymentIntent>::Hset((&field, redis_value)),
&key,
)
.await
.change_context(StorageError::KVError)?
.try_into_hset()
.change_context(StorageError::KVError)?;
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::PaymentIntentUpdate(
@ -178,15 +160,16 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
},
};
self.push_to_drainer_stream::<DieselPaymentIntent>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_intent.merchant_id,
payment_id: &updated_intent.payment_id,
},
kv_wrapper::<(), _, _>(
self,
KvOperation::<DieselPaymentIntent>::Hset((&field, redis_value), redis_entry),
&key,
)
.await
.change_context(StorageError::KVError)?
.try_into_hset()
.change_context(StorageError::KVError)?;
Ok(updated_intent)
}
}

View File

@ -3,9 +3,10 @@ use std::{fmt::Debug, sync::Arc};
use common_utils::errors::CustomResult;
use redis_interface::errors::RedisError;
use router_derive::TryGetEnumVariant;
use router_env::logger;
use serde::de;
use crate::{consts, KVRouterStore};
use crate::{consts, metrics, store::kv::TypedSql, KVRouterStore};
pub trait KvStorePartition {
fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 {
@ -48,10 +49,11 @@ pub trait RedisConnInterface {
) -> error_stack::Result<Arc<redis_interface::RedisConnectionPool>, RedisError>;
}
/// An enum to represent what operation to do on
pub enum KvOperation<'a, S: serde::Serialize + Debug> {
Hset((&'a str, String)),
SetNx(S),
HSetNx(&'a str, S),
Hset((&'a str, String), TypedSql),
SetNx(&'a S, TypedSql),
HSetNx(&'a str, &'a S, TypedSql),
HGet(&'a str),
Get,
Scan(&'a str),
@ -68,6 +70,22 @@ pub enum KvResult<T: de::DeserializeOwned> {
Scan(Vec<T>),
}
impl<T> std::fmt::Display for KvOperation<'_, T>
where
T: serde::Serialize + Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
KvOperation::Hset(_, _) => f.write_str("Hset"),
KvOperation::SetNx(_, _) => f.write_str("Setnx"),
KvOperation::HSetNx(_, _, _) => f.write_str("HSetNx"),
KvOperation::HGet(_) => f.write_str("Hget"),
KvOperation::Get => f.write_str("Get"),
KvOperation::Scan(_) => f.write_str("Scan"),
}
}
}
pub async fn kv_wrapper<'a, T, D, S>(
store: &KVRouterStore<D>,
op: KvOperation<'a, S>,
@ -76,45 +94,101 @@ pub async fn kv_wrapper<'a, T, D, S>(
where
T: de::DeserializeOwned,
D: crate::database::store::DatabaseStore,
S: serde::Serialize + Debug,
S: serde::Serialize + Debug + KvStorePartition,
{
let redis_conn = store.get_redis_conn()?;
let key = key.as_ref();
let type_name = std::any::type_name::<T>();
let operation = op.to_string();
match op {
KvOperation::Hset(value) => {
redis_conn
.set_hash_fields(key, value, Some(consts::KV_TTL))
.await?;
Ok(KvResult::Hset(()))
let partition_key = PartitionKey::MerchantIdPaymentIdCombination { combination: key };
let result = async {
match op {
KvOperation::Hset(value, sql) => {
logger::debug!("Operation: {operation} value: {value:?}");
redis_conn
.set_hash_fields(key, value, Some(consts::KV_TTL))
.await?;
store
.push_to_drainer_stream::<S>(sql, partition_key)
.await?;
Ok(KvResult::Hset(()))
}
KvOperation::HGet(field) => {
let result = redis_conn
.get_hash_field_and_deserialize(key, field, type_name)
.await?;
Ok(KvResult::HGet(result))
}
KvOperation::Scan(pattern) => {
let result: Vec<T> = redis_conn.hscan_and_deserialize(key, pattern, None).await?;
Ok(KvResult::Scan(result))
}
KvOperation::HSetNx(field, value, sql) => {
logger::debug!("Operation: {operation} value: {value:?}");
let result = redis_conn
.serialize_and_set_hash_field_if_not_exist(
key,
field,
value,
Some(consts::KV_TTL),
)
.await?;
if matches!(result, redis_interface::HsetnxReply::KeySet) {
store
.push_to_drainer_stream::<S>(sql, partition_key)
.await?;
}
Ok(KvResult::HSetNx(result))
}
KvOperation::SetNx(value, sql) => {
logger::debug!("Operation: {operation} value: {value:?}");
let result = redis_conn
.serialize_and_set_key_if_not_exist(key, value, Some(consts::KV_TTL.into()))
.await?;
if matches!(result, redis_interface::SetnxReply::KeySet) {
store
.push_to_drainer_stream::<S>(sql, partition_key)
.await?;
}
Ok(KvResult::SetNx(result))
}
KvOperation::Get => {
let result = redis_conn.get_and_deserialize_key(key, type_name).await?;
Ok(KvResult::Get(result))
}
}
KvOperation::HGet(field) => {
let result = redis_conn
.get_hash_field_and_deserialize(key, field, type_name)
.await?;
Ok(KvResult::HGet(result))
}
KvOperation::Scan(pattern) => {
let result: Vec<T> = redis_conn.hscan_and_deserialize(key, pattern, None).await?;
Ok(KvResult::Scan(result))
}
KvOperation::HSetNx(field, value) => {
let result = redis_conn
.serialize_and_set_hash_field_if_not_exist(key, field, value, Some(consts::KV_TTL))
.await?;
Ok(KvResult::HSetNx(result))
}
KvOperation::SetNx(value) => {
let result = redis_conn
.serialize_and_set_key_if_not_exist(key, value, Some(consts::KV_TTL.into()))
.await?;
Ok(KvResult::SetNx(result))
}
KvOperation::Get => {
let result = redis_conn.get_and_deserialize_key(key, type_name).await?;
Ok(KvResult::Get(result))
}
}
};
result
.await
.map(|result| {
logger::debug!("KvOperation {operation} succeeded");
let keyvalue = router_env::opentelemetry::KeyValue::new("operation", operation.clone());
metrics::KV_OPERATION_SUCCESSFUL.add(&metrics::CONTEXT, 1, &[keyvalue]);
result
})
.map_err(|err| {
logger::error!("KvOperation for {operation} failed with {err:?}");
let keyvalue = router_env::opentelemetry::KeyValue::new("operation", operation);
metrics::KV_OPERATION_FAILED.add(&metrics::CONTEXT, 1, &[keyvalue]);
err
})
}