fix: replace xtrim with xdel to support older redis version (#8515)

Co-authored-by: Akshay S <akshay.s@juspay.in>
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
akshay-97
2025-07-29 16:02:02 +05:30
committed by GitHub
parent f6cdddcb98
commit 3d085abf38
4 changed files with 37 additions and 1 deletions

View File

@ -277,9 +277,15 @@ async fn drainer(
_ => break, _ => break,
}, },
} }
if store.use_legacy_version() {
store
.delete_from_stream(stream_name, &last_processed_id)
.await?;
}
} }
if !last_processed_id.is_empty() { if !(last_processed_id.is_empty() || store.use_legacy_version()) {
let entries_trimmed = store let entries_trimmed = store
.trim_from_stream(stream_name, &last_processed_id) .trim_from_stream(stream_name, &last_processed_id)
.await?; .await?;

View File

@ -18,3 +18,4 @@ histogram_metric_f64!(REDIS_STREAM_READ_TIME, DRAINER_METER); // Time in (ms) mi
histogram_metric_f64!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric_f64!(REDIS_STREAM_TRIM_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric_f64!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds histogram_metric_f64!(CLEANUP_TIME, DRAINER_METER); // Time in (ms) milliseconds
histogram_metric_u64!(DRAINER_DELAY_SECONDS, DRAINER_METER); // Time in (s) seconds histogram_metric_u64!(DRAINER_DELAY_SECONDS, DRAINER_METER); // Time in (s) seconds
histogram_metric_f64!(REDIS_STREAM_DEL_TIME, DRAINER_METER); // Time in (ms) milliseconds

View File

@ -22,6 +22,7 @@ pub struct Store {
pub struct StoreConfig { pub struct StoreConfig {
pub drainer_stream_name: String, pub drainer_stream_name: String,
pub drainer_num_partitions: u8, pub drainer_num_partitions: u8,
pub use_legacy_version: bool,
} }
impl Store { impl Store {
@ -45,10 +46,15 @@ impl Store {
config: StoreConfig { config: StoreConfig {
drainer_stream_name: config.drainer.stream_name.clone(), drainer_stream_name: config.drainer.stream_name.clone(),
drainer_num_partitions: config.drainer.num_partitions, drainer_num_partitions: config.drainer.num_partitions,
use_legacy_version: config.redis.use_legacy_version,
}, },
request_id: None, request_id: None,
} }
} }
pub fn use_legacy_version(&self) -> bool {
self.config.use_legacy_version
}
} }
pub fn log_and_return_error_response<T>(error: Report<T>) -> HttpResponse pub fn log_and_return_error_response<T>(error: Report<T>) -> HttpResponse

View File

@ -110,4 +110,27 @@ impl Store {
// adding 1 because we are deleting the given id too // adding 1 because we are deleting the given id too
Ok(trim_result? + 1) Ok(trim_result? + 1)
} }
pub async fn delete_from_stream(
&self,
stream_name: &str,
entry_id: &str,
) -> errors::DrainerResult<()> {
let (_trim_result, execution_time) =
common_utils::date_time::time_it::<errors::DrainerResult<_>, _, _>(|| async {
self.redis_conn
.stream_delete_entries(&stream_name.into(), entry_id)
.await
.map_err(errors::DrainerError::from)?;
Ok(())
})
.await;
metrics::REDIS_STREAM_DEL_TIME.record(
execution_time,
router_env::metric_attributes!(("stream", stream_name.to_owned())),
);
Ok(())
}
} }