feat: Soft kill kv (#4582)

Co-authored-by: Akshay S <akshay.s@Akshay-Subramanian-D66TQ6D97K.local>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
akshay-97
2024-05-20 12:47:05 +05:30
committed by GitHub
parent 5e84855496
commit 3fa59d4bac
32 changed files with 462 additions and 118 deletions

View File

@ -151,6 +151,7 @@ pub struct KVRouterStore<T: DatabaseStore> {
drainer_num_partitions: u8,
ttl_for_kv: u32,
pub request_id: Option<String>,
soft_kill_mode: bool,
}
#[async_trait::async_trait]
@ -159,14 +160,16 @@ where
RouterStore<T>: DatabaseStore,
T: DatabaseStore,
{
type Config = (RouterStore<T>, String, u8, u32);
type Config = (RouterStore<T>, String, u8, u32, Option<bool>);
async fn new(config: Self::Config, _test_transaction: bool) -> StorageResult<Self> {
let (router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv) = config;
let (router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, soft_kill_mode) =
config;
Ok(Self::from_store(
router_store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
soft_kill_mode,
))
}
fn get_master_pool(&self) -> &PgPool {
@ -191,6 +194,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
drainer_stream_name: String,
drainer_num_partitions: u8,
ttl_for_kv: u32,
soft_kill: Option<bool>,
) -> Self {
let request_id = store.request_id.clone();
@ -200,6 +204,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
drainer_num_partitions,
ttl_for_kv,
request_id,
soft_kill_mode: soft_kill.unwrap_or(false),
}
}

View File

@ -12,7 +12,7 @@ use redis_interface::SetnxReply;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, try_redis_get_else_try_database_get},
DatabaseStore, KVRouterStore, RouterStore,
};
@ -71,6 +71,8 @@ impl<T: DatabaseStore> ReverseLookupInterface for KVRouterStore<T> {
new: DieselReverseLookupNew,
storage_scheme: storage_enums::MerchantStorageScheme,
) -> CustomResult<DieselReverseLookup, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselReverseLookup>(self, storage_scheme, Op::Insert).await;
match storage_scheme {
storage_enums::MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -124,6 +126,8 @@ impl<T: DatabaseStore> ReverseLookupInterface for KVRouterStore<T> {
.get_lookup_by_lookup_id(id, storage_scheme)
.await
};
let storage_scheme =
decide_storage_scheme::<_, DieselReverseLookup>(self, storage_scheme, Op::Find).await;
match storage_scheme {
storage_enums::MerchantStorageScheme::PostgresOnly => database_call().await,
storage_enums::MerchantStorageScheme::RedisKv => {

View File

@ -10,3 +10,4 @@ counter_metric!(KV_OPERATION_SUCCESSFUL, GLOBAL_METER);
counter_metric!(KV_OPERATION_FAILED, GLOBAL_METER);
counter_metric!(KV_PUSHED_TO_DRAINER, GLOBAL_METER);
counter_metric!(KV_FAILED_TO_PUSH_TO_DRAINER, GLOBAL_METER);
counter_metric!(KV_SOFT_KILL_ACTIVE_UPDATE, GLOBAL_METER);

View File

@ -31,7 +31,7 @@ use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
lookup::ReverseLookupInterface,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get},
DataModelExt, DatabaseStore, KVRouterStore, RouterStore,
};
@ -333,6 +333,9 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
payment_attempt: PaymentAttemptNew,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Insert)
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -470,6 +473,17 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
payment_attempt: PaymentAttemptUpdate,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let key = PartitionKey::MerchantIdPaymentId {
merchant_id: &this.merchant_id,
payment_id: &this.payment_id,
};
let field = format!("pa_{}", this.attempt_id);
let storage_scheme = decide_storage_scheme::<_, DieselPaymentAttempt>(
self,
storage_scheme,
Op::Update(key.clone(), &field, Some(&this.updated_by)),
)
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -477,10 +491,6 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
.await
}
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::MerchantIdPaymentId {
merchant_id: &this.merchant_id,
payment_id: &this.payment_id,
};
let key_str = key.to_string();
let old_connector_transaction_id = &this.connector_transaction_id;
let old_preprocessing_id = &this.preprocessing_step_id;
@ -493,7 +503,6 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
// Check for database presence as well Maybe use a read replica here ?
let redis_value = serde_json::to_string(&updated_attempt)
.change_context(errors::StorageError::KVError)?;
let field = format!("pa_{}", updated_attempt.attempt_id);
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
@ -588,6 +597,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
merchant_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -645,6 +656,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
storage_scheme,
)
};
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
@ -697,6 +710,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
storage_scheme,
)
};
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
@ -744,6 +759,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
connector_txn_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -804,6 +821,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
attempt_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -850,6 +869,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
merchant_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -909,6 +930,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
merchant_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -968,6 +991,8 @@ impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
payment_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Vec<PaymentAttempt>, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store

View File

@ -43,7 +43,7 @@ use crate::connection;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
};
@ -55,6 +55,15 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
new: PaymentIntentNew,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentIntent, StorageError> {
let merchant_id = new.merchant_id.clone();
let payment_id = new.payment_id.clone();
let field = format!("pi_{}", new.payment_id);
let key = PartitionKey::MerchantIdPaymentId {
merchant_id: &merchant_id,
payment_id: &payment_id,
};
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentIntent>(self, storage_scheme, Op::Insert).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -63,14 +72,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
}
MerchantStorageScheme::RedisKv => {
let merchant_id = new.merchant_id.clone();
let payment_id = new.payment_id.clone();
let key = PartitionKey::MerchantIdPaymentId {
merchant_id: &merchant_id,
payment_id: &payment_id,
};
let key_str = key.to_string();
let field = format!("pi_{}", new.payment_id);
let created_intent = PaymentIntent {
id: 0i32,
payment_id: new.payment_id.clone(),
@ -155,6 +157,19 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
payment_intent_update: PaymentIntentUpdate,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PaymentIntent, StorageError> {
let merchant_id = this.merchant_id.clone();
let payment_id = this.payment_id.clone();
let key = PartitionKey::MerchantIdPaymentId {
merchant_id: &merchant_id,
payment_id: &payment_id,
};
let field = format!("pi_{}", this.payment_id);
let storage_scheme = decide_storage_scheme::<_, DieselPaymentIntent>(
self,
storage_scheme,
Op::Update(key.clone(), &field, Some(&this.updated_by)),
)
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -162,14 +177,7 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
.await
}
MerchantStorageScheme::RedisKv => {
let merchant_id = this.merchant_id.clone();
let payment_id = this.payment_id.clone();
let key = PartitionKey::MerchantIdPaymentId {
merchant_id: &merchant_id,
payment_id: &payment_id,
};
let key_str = key.to_string();
let field = format!("pi_{}", this.payment_id);
let diesel_intent_update = payment_intent_update.to_storage_model();
let origin_diesel_intent = this.to_storage_model();
@ -225,6 +233,8 @@ impl<T: DatabaseStore> PaymentIntentInterface for KVRouterStore<T> {
er.change_context(new_err)
})
};
let storage_scheme =
decide_storage_scheme::<_, DieselPaymentIntent>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,

View File

@ -29,7 +29,7 @@ use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
lookup::ReverseLookupInterface,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
};
@ -43,6 +43,8 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
payouts: &Payouts,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPayoutAttempt>(self, storage_scheme, Op::Insert).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -136,6 +138,17 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
payouts: &Payouts,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let key = PartitionKey::MerchantIdPayoutAttemptId {
merchant_id: &this.merchant_id,
payout_attempt_id: &this.payout_id,
};
let field = format!("poa_{}", this.payout_attempt_id);
let storage_scheme = decide_storage_scheme::<_, DieselPayoutAttempt>(
self,
storage_scheme,
Op::Update(key.clone(), &field, None),
)
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -143,12 +156,7 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
.await
}
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::MerchantIdPayoutAttemptId {
merchant_id: &this.merchant_id,
payout_attempt_id: &this.payout_id,
};
let key_str = key.to_string();
let field = format!("poa_{}", this.payout_attempt_id);
let diesel_payout_update = payout_update.to_storage_model();
let origin_diesel_payout = this.clone().to_storage_model();
@ -195,6 +203,8 @@ impl<T: DatabaseStore> PayoutAttemptInterface for KVRouterStore<T> {
payout_attempt_id: &str,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<PayoutAttempt, errors::StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPayoutAttempt>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store

View File

@ -38,7 +38,7 @@ use crate::connection;
use crate::{
diesel_error_to_data_error,
errors::RedisErrorExt,
redis::kv_store::{kv_wrapper, KvOperation, PartitionKey},
redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
utils::{self, pg_connection_read, pg_connection_write},
DataModelExt, DatabaseStore, KVRouterStore,
};
@ -51,6 +51,8 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
new: PayoutsNew,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let storage_scheme =
decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Insert).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store.insert_payout(new, storage_scheme).await
@ -128,6 +130,17 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
payout_attempt: &PayoutAttempt,
storage_scheme: MerchantStorageScheme,
) -> error_stack::Result<Payouts, StorageError> {
let key = PartitionKey::MerchantIdPayoutId {
merchant_id: &this.merchant_id,
payout_id: &this.payout_id,
};
let field = format!("po_{}", this.payout_id);
let storage_scheme = decide_storage_scheme::<_, DieselPayouts>(
self,
storage_scheme,
Op::Update(key.clone(), &field, None),
)
.await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
self.router_store
@ -135,12 +148,7 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
.await
}
MerchantStorageScheme::RedisKv => {
let key = PartitionKey::MerchantIdPayoutId {
merchant_id: &this.merchant_id,
payout_id: &this.payout_id,
};
let key_str = key.to_string();
let field = format!("po_{}", this.payout_id);
let diesel_payout_update = payout_update.to_storage_model();
let origin_diesel_payout = this.clone().to_storage_model();
@ -194,6 +202,8 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
er.change_context(new_err)
})
};
let storage_scheme =
decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => database_call().await,
MerchantStorageScheme::RedisKv => {
@ -236,6 +246,8 @@ impl<T: DatabaseStore> PayoutsInterface for KVRouterStore<T> {
er.change_context(new_err)
})
};
let storage_scheme =
decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Find).await;
match storage_scheme {
MerchantStorageScheme::PostgresOnly => {
let maybe_payouts = database_call().await?;

View File

@ -1,6 +1,7 @@
use std::{fmt::Debug, sync::Arc};
use common_utils::errors::CustomResult;
use diesel_models::enums::MerchantStorageScheme;
use error_stack::report;
use redis_interface::errors::RedisError;
use router_derive::TryGetEnumVariant;
@ -20,6 +21,7 @@ pub trait KvStorePartition {
}
#[allow(unused)]
#[derive(Clone)]
pub enum PartitionKey<'a> {
MerchantIdPaymentId {
merchant_id: &'a str,
@ -235,3 +237,67 @@ where
err
})
}
pub enum Op<'a> {
Insert,
Update(PartitionKey<'a>, &'a str, Option<&'a str>),
Find,
}
impl<'a> std::fmt::Display for Op<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Op::Insert => f.write_str("insert"),
Op::Find => f.write_str("find"),
Op::Update(p_key, _, _) => f.write_str(&format!("update_{}", p_key)),
}
}
}
pub async fn decide_storage_scheme<'a, T, D>(
store: &KVRouterStore<T>,
storage_scheme: MerchantStorageScheme,
operation: Op<'a>,
) -> MerchantStorageScheme
where
D: de::DeserializeOwned
+ serde::Serialize
+ Debug
+ KvStorePartition
+ UniqueConstraints
+ Sync,
T: crate::database::store::DatabaseStore,
{
if store.soft_kill_mode {
let ops = operation.to_string();
let updated_scheme = match operation {
Op::Insert => MerchantStorageScheme::PostgresOnly,
Op::Find => MerchantStorageScheme::RedisKv,
Op::Update(partition_key, field, Some("redis_kv")) => {
match kv_wrapper::<D, _, _>(store, KvOperation::<D>::HGet(field), partition_key)
.await
{
Ok(_) => {
metrics::KV_SOFT_KILL_ACTIVE_UPDATE.add(&metrics::CONTEXT, 1, &[]);
MerchantStorageScheme::RedisKv
}
Err(_) => MerchantStorageScheme::PostgresOnly,
}
}
Op::Update(_, _, None) => MerchantStorageScheme::PostgresOnly,
Op::Update(_, _, Some("postgres_only")) => MerchantStorageScheme::PostgresOnly,
_ => {
logger::debug!("soft_kill_mode - using default storage scheme");
storage_scheme
}
};
let type_name = std::any::type_name::<D>();
logger::info!(soft_kill_mode = "decide_storage_scheme", decided_scheme = %updated_scheme, configured_scheme = %storage_scheme,entity = %type_name, operation = %ops);
updated_scheme
} else {
storage_scheme
}
}