diff --git a/Cargo.lock b/Cargo.lock index e2854b592d..9d13e896cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1149,7 +1149,6 @@ name = "drainer" version = "0.1.0" dependencies = [ "error-stack", - "fred", "redis_interface", "router", "serde_json", diff --git a/crates/drainer/Cargo.toml b/crates/drainer/Cargo.toml index 1ec8d37ad0..b5b149bd92 100644 --- a/crates/drainer/Cargo.toml +++ b/crates/drainer/Cargo.toml @@ -9,11 +9,11 @@ license = "Apache-2.0" [dependencies] error-stack = "0.2.4" -fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] } serde_json = "1.0.89" structopt = "0.3.26" thiserror = "1.0.37" tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread"] } + # First Party Crates redis_interface = { version = "0.1.0", path = "../redis_interface" } router = { version = "0.2.0", path = "../router", features = ["kv_store"] } diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index f52fbd1820..1f83444ea9 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -60,15 +60,18 @@ async fn drainer( }; let conn = pg_connection(&store.master_pool).await; - + let insert_op = "insert"; + let update_op = "update"; + let payment_intent = "payment_intent"; + let payment_attempt = "payment_attempt"; match db_op { // TODO: Handle errors kv::DBOperation::Insert { insertable } => match insertable { kv::Insertable::PaymentIntent(a) => { - macro_util::handle_resp!(a.insert(&conn).await, "ins", "pi") + macro_util::handle_resp!(a.insert(&conn).await, insert_op, payment_intent) } kv::Insertable::PaymentAttempt(a) => { - macro_util::handle_resp!(a.insert(&conn).await, "ins", "pa") + macro_util::handle_resp!(a.insert(&conn).await, insert_op, payment_attempt) } kv::Insertable::Refund(a) => { macro_util::handle_resp!(a.insert(&conn).await, "ins", "ref") @@ -76,10 +79,18 @@ async fn drainer( }, kv::DBOperation::Update { updatable } => match updatable { kv::Updateable::PaymentIntentUpdate(a) => { - macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pi") + macro_util::handle_resp!( + a.orig.update(&conn, a.update_data).await, + update_op, + payment_intent + ) } kv::Updateable::PaymentAttemptUpdate(a) => { - macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "pa") + macro_util::handle_resp!( + a.orig.update(&conn, a.update_data).await, + update_op, + payment_attempt + ) } kv::Updateable::RefundUpdate(a) => { macro_util::handle_resp!(a.orig.update(&conn, a.update_data).await, "up", "ref") diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index 44b76659b2..33c304eae0 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, sync::Arc}; use error_stack::{IntoReport, ResultExt}; -use fred::types as fred; use redis_interface as redis; use router::services::Store; @@ -31,11 +30,10 @@ pub async fn read_from_stream( max_read_count: u64, redis: &redis::RedisConnectionPool, ) -> errors::DrainerResult { - let stream_key = fred::MultipleKeys::from(stream_name); // "0-0" id gives first entry let stream_id = "0-0"; let entries = redis - .stream_read_entries(stream_key, stream_id, Some(max_read_count)) + .stream_read_entries(stream_name, stream_id, Some(max_read_count)) .await .change_context(errors::DrainerError::StreamReadError( stream_name.to_owned(), @@ -48,17 +46,11 @@ pub async fn trim_from_stream( minimum_entry_id: &str, redis: &redis::RedisConnectionPool, ) -> errors::DrainerResult { - let trim_kind = fred::XCapKind::MinID; - let trim_type = fred::XCapTrim::Exact; - let trim_id = fred::StringOrNumber::String(minimum_entry_id.into()); - let xcap = fred::XCap::try_from((trim_kind, trim_type, trim_id)) - .into_report() - .change_context(errors::DrainerError::StreamTrimFailed( - stream_name.to_owned(), - ))?; - + let trim_kind = redis::StreamCapKind::MinID; + let trim_type = redis::StreamCapTrim::Exact; + let trim_id = minimum_entry_id; let trim_result = redis - .stream_trim_entries(stream_name, xcap) + .stream_trim_entries(stream_name, (trim_kind, trim_type, trim_id)) .await .change_context(errors::DrainerError::StreamTrimFailed( stream_name.to_owned(), diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index d765fe4ce6..ba4b1b8455 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -142,3 +142,33 @@ impl fred::types::FromRedis for MsetnxReply { } } } + +#[derive(Debug)] +pub enum StreamCapKind { + MinID, + MaxLen, +} + +impl From for fred::types::XCapKind { + fn from(item: StreamCapKind) -> Self { + match item { + StreamCapKind::MaxLen => Self::MaxLen, + StreamCapKind::MinID => Self::MinID, + } + } +} + +#[derive(Debug)] +pub enum StreamCapTrim { + Exact, + AlmostExact, +} + +impl From for fred::types::XCapTrim { + fn from(item: StreamCapTrim) -> Self { + match item { + StreamCapTrim::Exact => Self::Exact, + StreamCapTrim::AlmostExact => Self::AlmostExact, + } + } +}