refactor: use generic function to push to drainer (#496)

This commit is contained in:
Kartikeya Hegde
2023-02-06 17:18:28 +05:30
committed by GitHub
parent 9381a37f8f
commit e4b525485e
4 changed files with 67 additions and 108 deletions

View File

@ -315,7 +315,7 @@ impl PaymentAttemptInterface for MockDb {
mod storage {
use common_utils::date_time;
use error_stack::{IntoReport, ResultExt};
use redis_interface::{HsetnxReply, RedisEntryId};
use redis_interface::HsetnxReply;
use super::PaymentAttemptInterface;
use crate::{
@ -324,7 +324,6 @@ mod storage {
db::reverse_lookup::ReverseLookupInterface,
services::Store,
types::storage::{enums, kv, payment_attempt::*, ReverseLookupNew},
utils::storage_partitioning::KvStorePartition,
};
#[async_trait::async_trait]
@ -421,23 +420,14 @@ mod storage {
insertable: kv::Insertable::PaymentAttempt(payment_attempt),
},
};
let stream_name = self.get_drainer_stream_name(&PaymentAttempt::shard_key(
self.push_to_drainer_stream::<PaymentAttempt>(
redis_entry,
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_attempt.merchant_id,
payment_id: &created_attempt.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
}
)
.await?;
Ok(created_attempt)
}
Err(error) => Err(error.change_context(errors::StorageError::KVError)),
@ -508,24 +498,14 @@ mod storage {
),
},
};
let stream_name = self.get_drainer_stream_name(&PaymentAttempt::shard_key(
self.push_to_drainer_stream::<PaymentAttempt>(
redis_entry,
crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_attempt.merchant_id,
payment_id: &updated_attempt.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
)
.await?;
Ok(updated_attempt)
}
}

View File

@ -41,7 +41,7 @@ pub trait PaymentIntentInterface {
mod storage {
use common_utils::date_time;
use error_stack::{IntoReport, ResultExt};
use redis_interface::{HsetnxReply, RedisEntryId};
use redis_interface::HsetnxReply;
use super::PaymentIntentInterface;
#[cfg(feature = "olap")]
@ -51,10 +51,7 @@ mod storage {
core::errors::{self, CustomResult},
services::Store,
types::storage::{enums, kv, payment_intent::*},
utils::{
self,
storage_partitioning::{self, KvStorePartition},
},
utils::{self, storage_partitioning},
};
#[async_trait::async_trait]
@ -113,24 +110,14 @@ mod storage {
insertable: kv::Insertable::PaymentIntent(new),
},
};
let stream_name =
self.get_drainer_stream_name(&PaymentIntent::shard_key(
storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
self.push_to_drainer_stream::<PaymentIntent>(
redis_entry,
storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &created_intent.merchant_id,
payment_id: &created_intent.payment_id,
},
)
.await?;
Ok(created_intent)
}
Err(error) => Err(error.change_context(errors::StorageError::KVError)),
@ -182,24 +169,14 @@ mod storage {
},
};
let stream_name = self.get_drainer_stream_name(&PaymentIntent::shard_key(
self.push_to_drainer_stream::<PaymentIntent>(
redis_entry,
storage_partitioning::PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_intent.merchant_id,
payment_id: &updated_intent.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
)
.await?;
Ok(updated_intent)
}
}

View File

@ -199,7 +199,7 @@ mod storage {
mod storage {
use common_utils::date_time;
use error_stack::{IntoReport, ResultExt};
use redis_interface::{HsetnxReply, RedisEntryId};
use redis_interface::HsetnxReply;
use super::RefundInterface;
use crate::{
@ -209,10 +209,7 @@ mod storage {
logger,
services::Store,
types::storage::{self as storage_types, enums, kv},
utils::{
self, db_utils,
storage_partitioning::{KvStorePartition, PartitionKey},
},
utils::{self, db_utils, storage_partitioning::PartitionKey},
};
#[async_trait::async_trait]
impl RefundInterface for Store {
@ -344,25 +341,15 @@ mod storage {
insertable: kv::Insertable::Refund(new),
},
};
self.push_to_drainer_stream::<storage_types::Refund>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &created_refund.merchant_id,
payment_id: &created_refund.payment_id,
},
)
.await?;
let stream_name =
self.get_drainer_stream_name(&storage_types::Refund::shard_key(
PartitionKey::MerchantIdPaymentId {
merchant_id: &created_refund.merchant_id,
payment_id: &created_refund.payment_id,
},
self.config.drainer_num_partitions,
));
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
Ok(created_refund)
}
Err(er) => Err(er).change_context(errors::StorageError::KVError),
@ -449,14 +436,6 @@ mod storage {
.await
.change_context(errors::StorageError::KVError)?;
let stream_name =
self.get_drainer_stream_name(&storage_types::Refund::shard_key(
PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_refund.merchant_id,
payment_id: &updated_refund.payment_id,
},
self.config.drainer_num_partitions,
));
let redis_entry = kv::TypedSql {
op: kv::DBOperation::Update {
updatable: kv::Updateable::RefundUpdate(kv::RefundUpdateMems {
@ -465,16 +444,14 @@ mod storage {
}),
},
};
self.redis_conn
.stream_append_entry(
&stream_name,
&RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(errors::StorageError::KVError)?,
)
.await
.change_context(errors::StorageError::KVError)?;
self.push_to_drainer_stream::<storage_types::Refund>(
redis_entry,
PartitionKey::MerchantIdPaymentId {
merchant_id: &updated_refund.merchant_id,
payment_id: &updated_refund.payment_id,
},
)
.await?;
Ok(updated_refund)
}
}

View File

@ -45,4 +45,29 @@ impl Store {
// Example: {shard_5}_drainer_stream
format!("{{{}}}_{}", shard_key, self.config.drainer_stream_name,)
}
#[cfg(feature = "kv_store")]
pub(crate) async fn push_to_drainer_stream<T>(
&self,
redis_entry: storage_models::kv::TypedSql,
partition_key: crate::utils::storage_partitioning::PartitionKey<'_>,
) -> crate::core::errors::CustomResult<(), crate::core::errors::StorageError>
where
T: crate::utils::storage_partitioning::KvStorePartition,
{
use error_stack::ResultExt;
let shard_key = T::shard_key(partition_key, self.config.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.redis_conn
.stream_append_entry(
&stream_name,
&redis_interface::RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.change_context(crate::core::errors::StorageError::KVError)?,
)
.await
.change_context(crate::core::errors::StorageError::KVError)
}
}