feat(events): forward the tenant configuration as part of the kafka message (#5224)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Abhishek Kanojia
2024-07-25 18:39:28 +05:30
committed by GitHub
parent 5eccffac9d
commit 623cf4c841
13 changed files with 100 additions and 56 deletions

View File

@ -699,7 +699,7 @@ sdk_eligible_payment_methods = "card"
[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }
global_tenant = { schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default" } # schema -> Postgres db schema, redis_key_prefix -> redis key distinguisher, base_url -> url of the tenant

View File

@ -293,7 +293,7 @@ region = "kms_region" # The AWS region used by the KMS SDK for decrypting data.
[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }
global_tenant = { schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default" }

View File

@ -698,7 +698,7 @@ sdk_eligible_payment_methods = "card"
[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }
global_tenant = { schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"}

View File

@ -526,7 +526,7 @@ sdk_eligible_payment_methods = "card"
[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }
global_tenant = { schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default" }

View File

@ -854,7 +854,7 @@ impl AnalyticsProvider {
pub async fn from_conf(
config: &AnalyticsConfig,
tenant: &dyn storage_impl::config::ClickHouseConfig,
tenant: &dyn storage_impl::config::TenantConfig,
) -> Self {
match config {
AnalyticsConfig::Sqlx { sqlx } => {

View File

@ -43,14 +43,12 @@ where
let start_instant = Instant::now();
logger::info!(tag = ?Tag::BeginRequest, payload = ?payload);
let req_state = state.get_req_state();
let server_wrap_util_res = metrics::request::record_request_time_metric(
api::server_wrap_util(
&flow,
state.clone().into(),
request.headers(),
req_state,
request,
payload,
func,

View File

@ -176,9 +176,6 @@ impl storage_impl::config::TenantConfig for Tenant {
fn get_redis_key_prefix(&self) -> &str {
self.redis_key_prefix.as_str()
}
}
impl storage_impl::config::ClickHouseConfig for Tenant {
fn get_clickhouse_database(&self) -> &str {
self.clickhouse_database.as_str()
}
@ -188,6 +185,7 @@ impl storage_impl::config::ClickHouseConfig for Tenant {
pub struct GlobalTenant {
pub schema: String,
pub redis_key_prefix: String,
pub clickhouse_database: String,
}
impl storage_impl::config::TenantConfig for GlobalTenant {
@ -197,6 +195,9 @@ impl storage_impl::config::TenantConfig for GlobalTenant {
fn get_redis_key_prefix(&self) -> &str {
self.redis_key_prefix.as_str()
}
fn get_clickhouse_database(&self) -> &str {
self.clickhouse_database.as_str()
}
}
#[derive(Debug, Deserialize, Clone, Default)]

View File

@ -26,7 +26,7 @@ use scheduler::{
SchedulerInterface,
};
use serde::Serialize;
use storage_impl::redis::kv_store::RedisConnInterface;
use storage_impl::{config::TenantConfig, redis::kv_store::RedisConnInterface};
use time::PrimitiveDateTime;
use super::{
@ -89,7 +89,13 @@ pub struct KafkaStore {
}
impl KafkaStore {
pub async fn new(store: Store, kafka_producer: KafkaProducer, tenant_id: TenantID) -> Self {
pub async fn new(
store: Store,
mut kafka_producer: KafkaProducer,
tenant_id: TenantID,
tenant_config: &dyn TenantConfig,
) -> Self {
kafka_producer.set_tenancy(tenant_config);
Self {
kafka_producer,
diesel_store: store,

View File

@ -6,7 +6,7 @@ use hyperswitch_domain_models::errors::{StorageError, StorageResult};
use masking::ErasedMaskSerialize;
use router_env::logger;
use serde::{Deserialize, Serialize};
use storage_impl::errors::ApplicationError;
use storage_impl::{config::TenantConfig, errors::ApplicationError};
use time::PrimitiveDateTime;
use crate::{
@ -90,6 +90,11 @@ impl EventsHandler {
Self::Logs(logger) => logger.log_event(event),
};
}
pub fn add_tenant(&mut self, tenant_config: &dyn TenantConfig) {
if let Self::Kafka(kafka_producer) = self {
kafka_producer.set_tenancy(tenant_config);
}
}
}
impl MessagingInterface for EventsHandler {

View File

@ -340,6 +340,7 @@ impl AppState {
.expect("Failed to create store"),
kafka_client.clone(),
TenantID(tenant.get_schema().to_string()),
tenant,
)
.await,
),
@ -373,22 +374,19 @@ impl AppState {
.await
}
pub fn get_req_state(&self) -> ReqState {
ReqState {
event_context: events::EventContext::new(self.event_handler.clone()),
}
}
pub fn get_session_state<E, F>(self: Arc<Self>, tenant: &str, err: F) -> Result<SessionState, E>
where
F: FnOnce() -> E + Copy,
{
let tenant_conf = self.conf.multitenancy.get_tenant(tenant).ok_or_else(err)?;
let mut event_handler = self.event_handler.clone();
event_handler.add_tenant(tenant_conf);
Ok(SessionState {
store: self.stores.get(tenant).ok_or_else(err)?.clone(),
global_store: self.global_store.clone(),
conf: Arc::clone(&self.conf),
api_client: self.api_client.clone(),
event_handler: self.event_handler.clone(),
event_handler,
#[cfg(feature = "olap")]
pool: self.pools.get(tenant).ok_or_else(err)?.clone(),
file_storage_client: self.file_storage_client.clone(),

View File

@ -692,22 +692,13 @@ pub enum AuthFlow {
#[allow(clippy::too_many_arguments)]
#[instrument(
skip(
request,
payload,
state,
func,
api_auth,
request_state,
incoming_request_header
),
skip(request, payload, state, func, api_auth, incoming_request_header),
fields(merchant_id)
)]
pub async fn server_wrap_util<'a, 'b, U, T, Q, F, Fut, E, OErr>(
flow: &'a impl router_env::types::FlowMetric,
state: web::Data<AppState>,
incoming_request_header: &HeaderMap,
mut request_state: ReqState,
request: &'a HttpRequest,
payload: T,
func: F,
@ -729,12 +720,6 @@ where
.attach_printable("Unable to extract request id from request")
.change_context(errors::ApiErrorResponse::InternalServerError.switch())?;
request_state.event_context.record_info(request_id);
request_state
.event_context
.record_info(("flow".to_string(), flow.to_string()));
// request_state.event_context.record_info(request.clone());
let mut app_state = state.get_ref().clone();
let start_instant = Instant::now();
@ -767,9 +752,6 @@ where
}
})??
};
request_state
.event_context
.record_info(("tenant_id".to_string(), tenant_id.to_string()));
// let tenant_id = "public".to_string();
let mut session_state =
Arc::new(app_state.clone()).get_session_state(tenant_id.as_str(), || {
@ -779,6 +761,16 @@ where
.switch()
})?;
session_state.add_request_id(request_id);
let mut request_state = session_state.get_req_state();
request_state.event_context.record_info(request_id);
request_state
.event_context
.record_info(("flow".to_string(), flow.to_string()));
request_state
.event_context
.record_info(("tenant_id".to_string(), tenant_id.to_string()));
// Currently auth failures are not recorded as API events
let (auth_out, auth_type) = api_auth
@ -904,7 +896,6 @@ where
ApplicationResponse<Q>: Debug,
E: ErrorSwitch<api_models::errors::types::ApiErrorResponse> + error_stack::Context,
{
let req_state = state.get_req_state();
let request_method = request.method().as_str();
let url_path = request.path();
@ -939,7 +930,6 @@ where
&flow,
state.clone(),
incoming_request_header,
req_state,
request,
payload,
func,

View File

@ -9,6 +9,8 @@ use rdkafka::{
message::{Header, OwnedHeaders},
producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer},
};
use serde_json::Value;
use storage_impl::config::TenantConfig;
#[cfg(feature = "payouts")]
pub mod payout;
use diesel_models::fraud_check::FraudCheck;
@ -70,21 +72,24 @@ struct KafkaEvent<'a, T: KafkaMessage> {
event: &'a T,
sign_flag: i32,
tenant_id: TenantID,
clickhouse_database: Option<String>,
}
impl<'a, T: KafkaMessage> KafkaEvent<'a, T> {
fn new(event: &'a T, tenant_id: TenantID) -> Self {
fn new(event: &'a T, tenant_id: TenantID, clickhouse_database: Option<String>) -> Self {
Self {
event,
sign_flag: 1,
tenant_id,
clickhouse_database,
}
}
fn old(event: &'a T, tenant_id: TenantID) -> Self {
fn old(event: &'a T, tenant_id: TenantID, clickhouse_database: Option<String>) -> Self {
Self {
event,
sign_flag: -1,
tenant_id,
clickhouse_database,
}
}
}
@ -263,6 +268,7 @@ pub struct KafkaProducer {
payout_analytics_topic: String,
consolidated_events_topic: String,
authentication_analytics_topic: String,
ckh_database_name: Option<String>,
}
struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);
@ -285,6 +291,10 @@ pub enum KafkaError {
#[allow(unused)]
impl KafkaProducer {
pub fn set_tenancy(&mut self, tenant_config: &dyn TenantConfig) {
self.ckh_database_name = Some(tenant_config.get_clickhouse_database().to_string());
}
pub async fn create(conf: &KafkaSettings) -> MQResult<Self> {
Ok(Self {
producer: Arc::new(RdKafkaProducer(
@ -307,6 +317,7 @@ impl KafkaProducer {
payout_analytics_topic: conf.payout_analytics_topic.clone(),
consolidated_events_topic: conf.consolidated_events_topic.clone(),
authentication_analytics_topic: conf.authentication_analytics_topic.clone(),
ckh_database_name: None,
})
}
@ -342,6 +353,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaFraudCheck::from_storage(&negative_event),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative fraud check event {negative_event:?}")
@ -351,6 +363,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::new(
&KafkaFraudCheck::from_storage(attempt),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add positive fraud check event {attempt:?}")
@ -375,6 +388,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaPaymentAttempt::from_storage(&negative_event),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative attempt event {negative_event:?}")
@ -384,6 +398,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::new(
&KafkaPaymentAttempt::from_storage(attempt),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))?;
@ -402,6 +417,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaPaymentAttempt::from_storage(delete_old_attempt),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative attempt event {delete_old_attempt:?}")
@ -418,6 +434,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaAuthentication::from_storage(&negative_event),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative authentication event {negative_event:?}")
@ -427,6 +444,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::new(
&KafkaAuthentication::from_storage(authentication),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add positive authentication event {authentication:?}")
@ -451,6 +469,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaPaymentIntent::from_storage(&negative_event),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative intent event {negative_event:?}")
@ -460,6 +479,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::new(
&KafkaPaymentIntent::from_storage(intent),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))?;
@ -478,6 +498,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaPaymentIntent::from_storage(delete_old_intent),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative intent event {delete_old_intent:?}")
@ -494,6 +515,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaRefund::from_storage(&negative_event),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative refund event {negative_event:?}")
@ -503,6 +525,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::new(
&KafkaRefund::from_storage(refund),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))?;
@ -521,6 +544,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaRefund::from_storage(delete_old_refund),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative refund event {delete_old_refund:?}")
@ -537,6 +561,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::old(
&KafkaDispute::from_storage(&negative_event),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative dispute event {negative_event:?}")
@ -546,6 +571,7 @@ impl KafkaProducer {
self.log_event(&KafkaEvent::new(
&KafkaDispute::from_storage(dispute),
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))?;
@ -564,12 +590,20 @@ impl KafkaProducer {
tenant_id: TenantID,
) -> MQResult<()> {
if let Some(negative_event) = old_payout {
self.log_event(&KafkaEvent::old(&negative_event, tenant_id.clone()))
self.log_event(&KafkaEvent::old(
&negative_event,
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {negative_event:?}")
})?;
};
self.log_event(&KafkaEvent::new(payout, tenant_id.clone()))
self.log_event(&KafkaEvent::new(
payout,
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive payout event {payout:?}"))
}
@ -579,7 +613,11 @@ impl KafkaProducer {
delete_old_payout: &KafkaPayout<'_>,
tenant_id: TenantID,
) -> MQResult<()> {
self.log_event(&KafkaEvent::old(delete_old_payout, tenant_id.clone()))
self.log_event(&KafkaEvent::old(
delete_old_payout,
tenant_id.clone(),
self.ckh_database_name.clone(),
))
.attach_printable_lazy(|| {
format!("Failed to add negative payout event {delete_old_payout:?}")
})
@ -631,7 +669,14 @@ impl MessagingInterface for KafkaProducer {
let topic = self.get_topic(data.get_message_class());
let json_data = data
.masked_serialize()
.and_then(|i| serde_json::to_vec(&i))
.and_then(|mut value| {
if let Value::Object(ref mut map) = value {
if let Some(db_name) = self.ckh_database_name.clone() {
map.insert("clickhouse_database".to_string(), Value::String(db_name));
}
}
serde_json::to_vec(&value)
})
.change_context(EventsError::SerializationError)?;
let mut headers = OwnedHeaders::new();
for (k, v) in metadata.iter() {
@ -640,6 +685,10 @@ impl MessagingInterface for KafkaProducer {
value: Some(v),
});
}
headers = headers.insert(Header {
key: "clickhouse_database",
value: self.ckh_database_name.as_ref(),
});
self.producer
.0
.send(

View File

@ -36,9 +36,6 @@ impl DbConnectionParams for Database {
pub trait TenantConfig: Send + Sync {
fn get_schema(&self) -> &str;
fn get_redis_key_prefix(&self) -> &str;
}
pub trait ClickHouseConfig: TenantConfig + Send + Sync {
fn get_clickhouse_database(&self) -> &str;
}