refactor(drainer): removed router dependency from drainer (#209)

This commit is contained in:
Abhishek
2022-12-23 15:11:06 +05:30
committed by GitHub
parent 7274fd70c6
commit c9276a30d7
11 changed files with 252 additions and 43 deletions

View File

@ -1,15 +1,17 @@
use std::{collections::HashMap, sync::Arc};
use error_stack::{IntoReport, ResultExt};
use error_stack::IntoReport;
use redis_interface as redis;
use router::services::Store;
use crate::errors;
use crate::{
errors::{self, DrainerError},
services,
};
pub type StreamEntries = Vec<(String, HashMap<String, String>)>;
pub type StreamReadResult = HashMap<String, StreamEntries>;
pub async fn is_stream_available(stream_index: u8, store: Arc<router::services::Store>) -> bool {
pub async fn is_stream_available(stream_index: u8, store: Arc<services::Store>) -> bool {
let stream_key_flag = get_stream_key_flag(store.clone(), stream_index);
match store
@ -35,9 +37,8 @@ pub async fn read_from_stream(
let entries = redis
.stream_read_entries(stream_name, stream_id, Some(max_read_count))
.await
.change_context(errors::DrainerError::StreamReadError(
stream_name.to_owned(),
))?;
.map_err(DrainerError::from)
.into_report()?;
Ok(entries)
}
@ -52,18 +53,16 @@ pub async fn trim_from_stream(
let trim_result = redis
.stream_trim_entries(stream_name, (trim_kind, trim_type, trim_id))
.await
.change_context(errors::DrainerError::StreamTrimFailed(
stream_name.to_owned(),
))?;
.map_err(DrainerError::from)
.into_report()?;
// Since xtrim deletes entires below given id excluding the given id.
// Hence, deleting the minimum entry id
redis
.stream_delete_entries(stream_name, minimum_entry_id)
.await
.change_context(errors::DrainerError::StreamTrimFailed(
stream_name.to_owned(),
))?;
.map_err(DrainerError::from)
.into_report()?;
// adding 1 because we are deleting the given id too
Ok(trim_result + 1)
@ -76,9 +75,8 @@ pub async fn make_stream_available(
redis
.delete_key(stream_name_flag)
.await
.change_context(errors::DrainerError::DeleteKeyFailed(
stream_name_flag.to_owned(),
))
.map_err(DrainerError::from)
.into_report()
}
pub fn parse_stream_entries<'a>(
@ -92,7 +90,11 @@ pub fn parse_stream_entries<'a>(
.last()
.map(|last_entry| (entries, last_entry.0.clone()))
})
.ok_or_else(|| errors::DrainerError::NoStreamEntry(stream_name.to_owned()))
.ok_or_else(|| {
errors::DrainerError::RedisError(error_stack::report!(
redis::errors::RedisError::NotFound
))
})
.into_report()
}
@ -104,10 +106,10 @@ pub fn increment_stream_index(index: u8, total_streams: u8) -> u8 {
}
}
pub(crate) fn get_stream_key_flag(store: Arc<router::services::Store>, stream_index: u8) -> String {
pub(crate) fn get_stream_key_flag(store: Arc<services::Store>, stream_index: u8) -> String {
format!("{}_in_use", get_drainer_stream_name(store, stream_index))
}
pub(crate) fn get_drainer_stream_name(store: Arc<Store>, stream_index: u8) -> String {
store.get_drainer_stream_name(format!("shard_{}", stream_index).as_str())
pub(crate) fn get_drainer_stream_name(store: Arc<services::Store>, stream_index: u8) -> String {
store.drainer_stream(format!("shard_{}", stream_index).as_str())
}