feat(multitenancy): add support for multitenancy and handle the same in router, producer, consumer, drainer and analytics (#4630)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
Authored by Jagan on 2024-06-03 17:57:30 +05:30, committed by GitHub
parent a1788b8da9
commit 15d6c3e846
188 changed files with 2260 additions and 1414 deletions


@@ -1,4 +1,7 @@
use std::sync::{atomic, Arc};
use std::{
collections::HashMap,
sync::{atomic, Arc},
};
use router_env::tracing::Instrument;
use tokio::{
@@ -31,12 +34,12 @@ pub struct HandlerInner {
loop_interval: Duration,
active_tasks: Arc<atomic::AtomicU64>,
conf: DrainerSettings,
store: Arc<Store>,
stores: HashMap<String, Arc<Store>>,
running: Arc<atomic::AtomicBool>,
}
impl Handler {
pub fn from_conf(conf: DrainerSettings, store: Arc<Store>) -> Self {
pub fn from_conf(conf: DrainerSettings, stores: HashMap<String, Arc<Store>>) -> Self {
let shutdown_interval = Duration::from_millis(conf.shutdown_interval.into());
let loop_interval = Duration::from_millis(conf.loop_interval.into());
@@ -49,7 +52,7 @@ impl Handler {
loop_interval,
active_tasks,
conf,
store,
stores,
running,
};
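
The constructor change above means the caller now hands the drainer one Store per tenant, keyed by tenant id, instead of a single store. A minimal sketch of how such a map might be assembled before calling Handler::from_conf; build_stores, tenant_ids and the closure are hypothetical stand-ins, not the crate's actual API:

use std::{collections::HashMap, sync::Arc};

// Hypothetical helper: build one store per configured tenant and key it by
// tenant id, which is the shape `Handler::from_conf` now expects.
fn build_stores<S>(
    tenant_ids: &[String],
    build_store: impl Fn(&str) -> S,
) -> HashMap<String, Arc<S>> {
    tenant_ids
        .iter()
        .map(|tenant| (tenant.clone(), Arc::new(build_store(tenant))))
        .collect()
}

// Illustrative usage:
// let stores = build_stores(&tenant_ids, |tenant| make_store_for_tenant(&conf, tenant));
// let handler = Handler::from_conf(conf, stores);
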
@@ -68,21 +71,23 @@ impl Handler {
while self.running.load(atomic::Ordering::SeqCst) {
metrics::DRAINER_HEALTH.add(&metrics::CONTEXT, 1, &[]);
if self.store.is_stream_available(stream_index).await {
let _task_handle = tokio::spawn(
drainer_handler(
self.store.clone(),
stream_index,
self.conf.max_read_count,
self.active_tasks.clone(),
jobs_picked.clone(),
)
.in_current_span(),
);
for store in self.stores.values() {
if store.is_stream_available(stream_index).await {
let _task_handle = tokio::spawn(
drainer_handler(
store.clone(),
stream_index,
self.conf.max_read_count,
self.active_tasks.clone(),
jobs_picked.clone(),
)
.in_current_span(),
);
}
}
stream_index = utils::increment_stream_index(
(stream_index, jobs_picked.clone()),
self.store.config.drainer_num_partitions,
self.conf.num_partitions,
)
.await;
time::sleep(self.loop_interval).await;
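
In the loop above, every tenant's store is polled for the current partition before the index advances, and the partition count now comes from the drainer's own config rather than the store's. A reduced sketch of that control flow, with simplified types and without the jobs_picked bookkeeping or the spawned drainer_handler tasks of the real code:

use std::collections::HashMap;

// Hypothetical stand-in for a tenant store; the real type exposes an async
// `is_stream_available` and is drained by a spawned `drainer_handler`.
struct TenantStore {
    has_entries: bool,
}

fn drain_pass(
    stores: &HashMap<String, TenantStore>,
    stream_index: u8,
    num_partitions: u8,
) -> u8 {
    // One pass services the same partition index for every tenant, so no
    // tenant's stream is starved.
    for (tenant, store) in stores {
        if store.has_entries {
            println!("drain partition {stream_index} for tenant {tenant}");
        }
    }
    // Advance round-robin over the drainer's own `num_partitions`.
    (stream_index + 1) % num_partitions
}
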
@@ -116,18 +121,33 @@ impl Handler {
pub fn spawn_error_handlers(&self, tx: mpsc::Sender<()>) -> errors::DrainerResult<()> {
let (redis_error_tx, redis_error_rx) = oneshot::channel();
let redis_conn_clone = self.store.redis_conn.clone();
// Spawn a task to monitor if redis is down or not
let _task_handle = tokio::spawn(
async move { redis_conn_clone.on_error(redis_error_tx).await }.in_current_span(),
);
//Spawns a task to send shutdown signal if redis goes down
let _task_handle = tokio::spawn(redis_error_receiver(redis_error_rx, tx).in_current_span());
Ok(())
let redis_conn_clone = self
.stores
.values()
.next()
.map(|store| store.redis_conn.clone());
match redis_conn_clone {
None => {
logger::error!("No redis connection found");
Err(
errors::DrainerError::UnexpectedError("No redis connection found".to_string())
.into(),
)
}
Some(redis_conn_clone) => {
// Spawn a task to monitor if redis is down or not
let _task_handle = tokio::spawn(
async move { redis_conn_clone.on_error(redis_error_tx).await }
.in_current_span(),
);
//Spawns a task to send shutdown signal if redis goes down
let _task_handle =
tokio::spawn(redis_error_receiver(redis_error_rx, tx).in_current_span());
Ok(())
}
}
}
}
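
With several stores in play, the error handler above watches a single Redis connection: it takes whichever store the map yields first and fails fast when the map is empty. That selection pattern, reduced to a standalone form with simplified, illustrative types:

use std::collections::HashMap;

// Pick any one tenant's connection to monitor, or surface a hard error
// when no stores (and therefore no Redis connections) are configured.
fn pick_monitor_conn<C: Clone>(stores: &HashMap<String, C>) -> Result<C, String> {
    stores
        .values()
        .next()
        .cloned()
        .ok_or_else(|| "No redis connection found".to_string())
}
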
@@ -208,7 +228,10 @@ async fn drainer(
};
// parse_stream_entries returns error if no entries is found, handle it
let entries = utils::parse_stream_entries(&stream_read, stream_name)?;
let entries = utils::parse_stream_entries(
&stream_read,
store.redis_conn.add_prefix(stream_name).as_str(),
)?;
let read_count = entries.len();
metrics::JOBS_PICKED_PER_STREAM.add(
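
The last hunk resolves the stream name through store.redis_conn.add_prefix(stream_name) before parsing entries, so each tenant's drainer stream is read under its own key namespace. A hypothetical illustration of such prefixing; the real add_prefix lives on the Redis connection wrapper, and the "<prefix>:<key>" format shown here is an assumption:

// Hypothetical stand-in for a prefix-aware Redis connection wrapper.
struct PrefixedConn {
    key_prefix: String,
}

impl PrefixedConn {
    // Assumed format: "<prefix>:<key>" when a prefix is configured.
    fn add_prefix(&self, key: &str) -> String {
        if self.key_prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}:{}", self.key_prefix, key)
        }
    }
}

// let conn = PrefixedConn { key_prefix: "tenant_a".to_string() };
// assert_eq!(conn.add_prefix("drainer_stream_0"), "tenant_a:drainer_stream_0");
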