feat: make drainer logs queryable with request_id and global_id (#2771)

This commit is contained in:
Kartikeya Hegde
2023-11-06 13:45:15 +05:30
committed by GitHub
parent d335879f92
commit ff73aba8e7
7 changed files with 70 additions and 8 deletions

View File

@ -31,6 +31,7 @@ pub struct RouterStore<T: DatabaseStore> {
db_store: T,
cache_store: RedisStore,
master_encryption_key: StrongSecret<Vec<u8>>,
pub request_id: Option<String>,
}
#[async_trait::async_trait]
@ -103,6 +104,7 @@ impl<T: DatabaseStore> RouterStore<T> {
db_store,
cache_store,
master_encryption_key: encryption_key,
request_id: None,
})
}
@ -128,6 +130,7 @@ impl<T: DatabaseStore> RouterStore<T> {
db_store,
cache_store,
master_encryption_key: encryption_key,
request_id: None,
})
}
}
@ -138,6 +141,7 @@ pub struct KVRouterStore<T: DatabaseStore> {
drainer_stream_name: String,
drainer_num_partitions: u8,
ttl_for_kv: u32,
pub request_id: Option<String>,
}
#[async_trait::async_trait]
@ -179,11 +183,14 @@ impl<T: DatabaseStore> KVRouterStore<T> {
drainer_num_partitions: u8,
ttl_for_kv: u32,
) -> Self {
let request_id = store.request_id.clone();
Self {
router_store: store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
request_id,
}
}
@ -203,6 +210,9 @@ impl<T: DatabaseStore> KVRouterStore<T> {
where
R: crate::redis::kv_store::KvStorePartition,
{
let global_id = format!("{}", partition_key);
let request_id = self.request_id.clone().unwrap_or_default();
let shard_key = R::shard_key(partition_key, self.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.router_store
@ -212,7 +222,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
&stream_name,
&redis_interface::RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.to_field_value_pairs(request_id, global_id)
.change_context(RedisError::JsonSerializationFailed)?,
)
.await