feat(drainer): added drainer which reads from redis stream and executes queries on DB (#142)

Abhishek authored on 2022-12-16 15:38:03 +05:30, committed by GitHub
parent 3db49d0530, commit 3bad58b0d3
41 changed files with 648 additions and 655 deletions

crates/drainer/src/errors.rs
@@ -0,0 +1,19 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum DrainerError {
    #[error("Error parsing config: {0}")]
    ConfigParsingError(String),
    #[error("Error fetching stream length for stream: {0}")]
    StreamGetLengthError(String),
    #[error("Error reading from stream: {0}")]
    StreamReadError(String),
    #[error("Error trimming from stream: {0}")]
    StreamTrimFailed(String),
    #[error("No entries found for stream: {0}")]
    NoStreamEntry(String),
    #[error("Error making stream available: {0}")]
    DeleteKeyFailed(String),
}

pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
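
Not part of the diff: a minimal sketch of how these variants are meant to be raised, mirroring the `into_report().change_context(...)` pattern used in utils.rs below. The helper and its I/O error are hypothetical.

use error_stack::{IntoReport, ResultExt};

use crate::errors::{DrainerError, DrainerResult};

// Hypothetical helper: wrap a lower-level failure in a drainer-specific context.
fn stream_len(stream_name: &str) -> DrainerResult<usize> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "connection lost"))
        .into_report()
        .change_context(DrainerError::StreamGetLengthError(stream_name.to_owned()))
}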

crates/drainer/src/lib.rs
@@ -0,0 +1,107 @@
pub mod errors;
mod utils;

use std::sync::Arc;

use router::{connection::pg_connection, services::Store};
use storage_models::kv;

pub async fn start_drainer(
    store: Arc<Store>,
    number_of_streams: u8,
    max_read_count: u64,
) -> errors::DrainerResult<()> {
    let mut stream_index: u8 = 0;
    loop {
        // Round-robin over the stream shards, spawning a handler for any
        // stream that is not already being drained
        if utils::is_stream_available(stream_index, store.clone()).await {
            tokio::spawn(drainer_handler(store.clone(), stream_index, max_read_count));
        }
        stream_index = utils::increment_stream_index(stream_index, number_of_streams);
    }
}
async fn drainer_handler(
    store: Arc<Store>,
    stream_index: u8,
    max_read_count: u64,
) -> errors::DrainerResult<()> {
    let stream_name = utils::get_drainer_stream(store.clone(), stream_index);
    let drainer_result = drainer(store.clone(), max_read_count, stream_name.as_str()).await;

    if let Err(_e) = drainer_result {
        // TODO: log errors
    }

    let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index);
    // TODO: use the result for logging
    utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await
}
async fn drainer(
    store: Arc<Store>,
    max_read_count: u64,
    stream_name: &str,
) -> errors::DrainerResult<()> {
    let stream_read =
        utils::read_from_stream(stream_name, max_read_count, store.redis_conn.as_ref()).await?; // propagates `StreamReadError`
    // `parse_stream_entries` returns an error if no entries are found; handle it
    let (entries, last_entry_id) = utils::parse_stream_entries(&stream_read, stream_name)?;
    let read_count = entries.len();

    // TODO: handle errors when deserialization fails and when a DB error occurs
    for entry in entries {
        let typed_sql = entry.1.get("typed_sql").map_or(String::new(), Clone::clone);
        let result = serde_json::from_str::<kv::DBOperation>(&typed_sql);
        let db_op = match result {
            Ok(f) => f,
            Err(_err) => continue, // TODO: handle deserialization errors
        };

        let conn = pg_connection(&store.master_pool).await;

        match db_op {
            // TODO: handle errors
            kv::DBOperation::Insert { insertable } => match insertable {
                kv::Insertable::PaymentIntent(a) => {
                    macro_util::handle_resp!(a.insert(&conn).await, "ins", "pi")
                }
                kv::Insertable::PaymentAttempt(a) => {
                    macro_util::handle_resp!(a.insert(&conn).await, "ins", "pa")
                }
            },
            kv::DBOperation::Update { updatable } => match updatable {
                kv::Updateable::PaymentIntentUpdate(a) => {
                    macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pi")
                }
                kv::Updateable::PaymentAttemptUpdate(a) => {
                    macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pa")
                }
            },
            kv::DBOperation::Delete => todo!(),
        };
    }

    let entries_trimmed =
        utils::trim_from_stream(stream_name, last_entry_id.as_str(), &store.redis_conn).await?;
    if read_count != entries_trimmed {
        // TODO: log mismatch between entries read and entries trimmed
    }

    Ok(())
}
mod macro_util {
    macro_rules! handle_resp {
        ($result:expr, $op_type:expr, $table:expr) => {
            match $result {
                Ok(aa) => println!("Ok|{}|{}|{:?}|", $op_type, $table, aa),
                Err(err) => println!("Err|{}|{}|{:?}|", $op_type, $table, err),
            }
        };
    }
    pub(crate) use handle_resp;
}
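
Aside (not part of the diff): `handle_resp!` is a thin logging wrapper, so the first insert arm in `drainer` above expands to the equivalent of:

// Expansion of `macro_util::handle_resp!(a.insert(&conn).await, "ins", "pi")`:
match a.insert(&conn).await {
    Ok(aa) => println!("Ok|{}|{}|{:?}|", "ins", "pi", aa),
    Err(err) => println!("Err|{}|{}|{:?}|", "ins", "pi", err),
}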

crates/drainer/src/main.rs
@@ -0,0 +1,20 @@
use drainer::{errors::DrainerResult, start_drainer};
use router::configs::settings;
use structopt::StructOpt;

#[tokio::main]
async fn main() -> DrainerResult<()> {
    // Get configuration
    let cmd_line = settings::CmdLineConf::from_args();
    let conf = settings::Settings::with_config_path(cmd_line.config_path).unwrap();

    let store = router::services::Store::new(&conf, false).await;
    let store = std::sync::Arc::new(store);

    let number_of_drainers = conf.drainer.num_partitions;
    let max_read_count = conf.drainer.max_read_count;

    start_drainer(store, number_of_drainers, max_read_count).await?;

    Ok(())
}
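
For context (not part of the diff): the two fields read from `conf.drainer` imply a settings section along these lines. The actual definition lives in `router::configs::settings`; this mirror struct is only an illustrative sketch.

// Hypothetical mirror of the drainer settings consumed in main() above.
#[derive(Debug, serde::Deserialize)]
pub struct DrainerSettings {
    pub num_partitions: u8,  // passed to start_drainer as number_of_streams
    pub max_read_count: u64, // maximum entries fetched per stream read
}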

crates/drainer/src/utils.rs
@@ -0,0 +1,121 @@
use std::{collections::HashMap, sync::Arc};

use error_stack::{IntoReport, ResultExt};
use fred::types as fred;
use redis_interface as redis;
use router::services::Store;

use crate::errors;

pub type StreamEntries = Vec<(String, HashMap<String, String>)>;
pub type StreamReadResult = HashMap<String, StreamEntries>;

pub async fn is_stream_available(stream_index: u8, store: Arc<router::services::Store>) -> bool {
    let stream_key_flag = get_stream_key_flag(store.clone(), stream_index);

    match store
        .redis_conn
        .set_key_if_not_exist(stream_key_flag.as_str(), true)
        .await
    {
        Ok(resp) => resp == redis::types::SetnxReply::KeySet,
        Err(_e) => {
            // TODO: add metrics or logs
            false
        }
    }
}
pub async fn read_from_stream(
    stream_name: &str,
    max_read_count: u64,
    redis: &redis::RedisConnectionPool,
) -> errors::DrainerResult<StreamReadResult> {
    let stream_key = fred::MultipleKeys::from(stream_name);
    // Reading from ID "0-0" returns entries from the beginning of the stream
    let stream_id = "0-0";
    let entries = redis
        .stream_read_entries(stream_key, stream_id, Some(max_read_count))
        .await
        .change_context(errors::DrainerError::StreamReadError(
            stream_name.to_owned(),
        ))?;
    Ok(entries)
}
pub async fn trim_from_stream(
    stream_name: &str,
    minimum_entry_id: &str,
    redis: &redis::RedisConnectionPool,
) -> errors::DrainerResult<usize> {
    let trim_kind = fred::XCapKind::MinID;
    let trim_type = fred::XCapTrim::Exact;
    let trim_id = fred::StringOrNumber::String(minimum_entry_id.into());
    let xcap = fred::XCap::try_from((trim_kind, trim_type, trim_id))
        .into_report()
        .change_context(errors::DrainerError::StreamTrimFailed(
            stream_name.to_owned(),
        ))?;

    let trim_result = redis
        .stream_trim_entries(stream_name, xcap)
        .await
        .change_context(errors::DrainerError::StreamTrimFailed(
            stream_name.to_owned(),
        ))?;

    // XTRIM with MINID deletes entries with IDs below the given ID but keeps
    // the ID itself, so delete the minimum entry ID separately
    redis
        .stream_delete_entries(stream_name, minimum_entry_id)
        .await
        .change_context(errors::DrainerError::StreamTrimFailed(
            stream_name.to_owned(),
        ))?;

    // Add 1 because the given ID was deleted as well
    Ok(trim_result + 1)
}
pub async fn make_stream_available(
    stream_name_flag: &str,
    redis: &redis::RedisConnectionPool,
) -> errors::DrainerResult<()> {
    redis
        .delete_key(stream_name_flag)
        .await
        .change_context(errors::DrainerError::DeleteKeyFailed(
            stream_name_flag.to_owned(),
        ))
}

pub fn parse_stream_entries<'a>(
    read_result: &'a StreamReadResult,
    stream_name: &str,
) -> errors::DrainerResult<(&'a StreamEntries, String)> {
    read_result
        .get(stream_name)
        .and_then(|entries| {
            entries
                .last()
                .map(|last_entry| (entries, last_entry.0.clone()))
        })
        .ok_or_else(|| errors::DrainerError::NoStreamEntry(stream_name.to_owned()))
        .into_report()
}
pub fn increment_stream_index(index: u8, total_streams: u8) -> u8 {
    if index == total_streams - 1 {
        0
    } else {
        index + 1
    }
}

pub(crate) fn get_stream_key_flag(store: Arc<router::services::Store>, stream_index: u8) -> String {
    format!("{}_in_use", get_drainer_stream(store, stream_index))
}

pub(crate) fn get_drainer_stream(store: Arc<Store>, stream_index: u8) -> String {
    store.drainer_stream(format!("shard_{}", stream_index).as_str())
}
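
Not in the commit: a quick check of the round-robin helper, runnable as a unit test at the bottom of utils.rs.

#[cfg(test)]
mod tests {
    use super::increment_stream_index;

    #[test]
    fn stream_index_wraps_around() {
        // With 4 partitions the index cycles 0 -> 1 -> 2 -> 3 -> 0
        assert_eq!(increment_stream_index(0, 4), 1);
        assert_eq!(increment_stream_index(3, 4), 0);
    }
}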