fix: batch fetch collab from redis stream (#1453)

* fix: batch fetch collab from redis stream
This commit is contained in:
Nathan.fooo
2025-06-16 12:42:06 +08:00
committed by GitHub
parent c94694580c
commit 9b83678487
10 changed files with 312 additions and 174 deletions

View File

@@ -132,7 +132,6 @@ pub trait CollabStorage: Send + Sync + 'static {
uid: &i64,
workspace_id: Uuid,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<Uuid, QueryCollabResult>;
/// Deletes a collaboration from the storage.

View File

@@ -21,10 +21,11 @@ use database_entity::dto::{
use futures_util::{stream, StreamExt};
use infra::thread_pool::ThreadPoolNoAbort;
use itertools::{Either, Itertools};
use rayon::prelude::*;
use sqlx::{PgPool, Transaction};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{error, instrument};
use tracing::{debug, error, instrument};
use uuid::Uuid;
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;
@@ -158,7 +159,8 @@ impl CollabCache {
Ok(())
}
pub async fn get_encode_collab(
#[instrument(level = "trace", skip_all)]
pub async fn get_snapshot_collab(
&self,
workspace_id: &Uuid,
query: QueryCollab,
@@ -221,9 +223,18 @@ impl CollabCache {
encoding: EncoderVersion,
) -> Result<(Rid, EncodedCollab), AppError> {
let object_id = query.object_id;
let (rid, mut encoded_collab) = match self.get_encode_collab(workspace_id, query).await {
Ok((rid, encoded_collab)) => (rid, Some(encoded_collab)),
Err(AppError::RecordNotFound(_)) => (Rid::default(), None),
let (rid, mut encoded_collab) = match self.get_snapshot_collab(workspace_id, query).await {
Ok((rid, encoded_collab)) => {
debug!("Snapshot Collab:{} found in cache", object_id);
(rid, Some(encoded_collab))
},
Err(AppError::RecordNotFound(_)) => {
debug!(
"Snapshot Collab:{} not found in cache, returning default state",
object_id
);
(Rid::default(), None)
},
Err(err) => return Err(err),
};
@@ -231,41 +242,20 @@ impl CollabCache {
if !self.dirty_collabs.contains(&object_id) {
// there are no pending updates for this collab, so we can return the cached value directly
tracing::trace!("no pending updates for collab: {}", object_id);
match encoded_collab {
return match encoded_collab {
Some(encoded_collab) if encoded_collab.doc_state.len() <= self.small_collab_size => {
return Ok((rid, encoded_collab));
Ok((rid, encoded_collab))
},
Some(encoded_collab) => {
// If the collab is large, we do not replay updates and return the snapshot only.
let options = CollabOptions::new(object_id.to_string(), default_client_id())
.with_data_source(match encoded_collab.version {
EncoderVersion::V1 => DataSource::DocStateV1(encoded_collab.doc_state.into()),
EncoderVersion::V2 => DataSource::DocStateV2(encoded_collab.doc_state.into()),
});
let collab = Collab::new_with_options(CollabOrigin::Server, options)
.map_err(|err| AppError::Internal(err.into()))?;
let tx = collab.transact();
let doc_state: Bytes = match encoded_collab.version {
EncoderVersion::V1 => tx.encode_diff_v1(&from),
EncoderVersion::V2 => tx.encode_diff_v2(&from),
}
.into();
return Ok((
rid,
EncodedCollab {
version: encoded_collab.version,
state_vector: encoded_collab.state_vector,
doc_state,
},
));
let diff_encoded = self.encode_diff_for_large_collab(object_id, encoded_collab, &from)?;
Ok((rid, diff_encoded))
},
None => {
return Err(AppError::RecordNotFound(format!(
"Collab not found for object_id: {}",
object_id
)));
},
}
None => Err(AppError::RecordNotFound(format!(
"Collab not found for object_id: {}",
object_id
))),
};
}
let updates = self
@@ -324,9 +314,11 @@ impl CollabCache {
.await
}
/// Batch get the encoded collab data from the cache.
/// returns a hashmap of the object_id to the encoded collab data.
pub async fn batch_get_encode_collab<T: Into<QueryCollab>>(
/// Batch get the encoded collab **SNAPSHOT** data from the cache.
/// This function only returns cached/stored data and does NOT apply pending updates.
/// Use batch_get_full_collab() if you need current state with updates applied.
/// Returns a hashmap of the object_id to the encoded collab data.
pub async fn batch_get_snapshot_collab<T: Into<QueryCollab>>(
&self,
workspace_id: &Uuid,
queries: Vec<T>,
@@ -367,6 +359,89 @@ impl CollabCache {
results
}
/// Batch get the encoded collab data from the cache (DEPRECATED).
/// This function is deprecated - use batch_get_snapshot_collab() or batch_get_full_collab() instead.
/// Returns a hashmap of the object_id to the encoded collab data.
#[deprecated(
note = "Use batch_get_snapshot_collab() for snapshots or batch_get_full_collab() for current state"
)]
pub async fn batch_get_encode_collab<T: Into<QueryCollab>>(
&self,
workspace_id: &Uuid,
queries: Vec<T>,
) -> HashMap<Uuid, QueryCollabResult> {
self.batch_get_snapshot_collab(workspace_id, queries).await
}
/// Batch get the FULL/CURRENT collab data (applying pending updates for dirty collabs).
/// This function is consistent with get_full_collab - it applies pending updates.
/// Returns a hashmap of the object_id to the encoded collab data.
#[instrument(level = "debug", skip_all)]
pub async fn batch_get_full_collab<T: Into<QueryCollab>>(
&self,
workspace_id: &Uuid,
queries: Vec<T>,
from: Option<StateVector>,
encoding: EncoderVersion,
) -> HashMap<Uuid, QueryCollabResult> {
let queries = queries.into_iter().map(Into::into).collect::<Vec<_>>();
let mut results = HashMap::new();
// For batch efficiency, separate dirty and clean collabs
let mut clean_queries = Vec::new();
let mut dirty_queries = Vec::new();
for query in queries {
if self.dirty_collabs.contains(&query.object_id) {
dirty_queries.push(query);
} else {
clean_queries.push(query);
}
}
debug!(
"Batch processing collabs: {} clean, {} dirty",
clean_queries.len(),
dirty_queries.len()
);
if !clean_queries.is_empty() {
let clean_results = self
.batch_get_snapshot_collab(workspace_id, clean_queries)
.await;
results.extend(clean_results);
}
let mut encoded_collab_by_object_id: HashMap<Uuid, EncodedCollab> =
HashMap::with_capacity(dirty_queries.len());
for query in dirty_queries {
let object_id = query.object_id;
if let Ok((_, encoded_collab)) = self
.get_full_collab(workspace_id, query, from.clone(), encoding.clone())
.await
{
encoded_collab_by_object_id.insert(object_id, encoded_collab);
}
}
if let Ok(entries) = self.thread_pool.install(|| {
encoded_collab_by_object_id
.into_par_iter()
.map(|(object_id, encoded_collab)| {
(
object_id,
QueryCollabResult::Success {
encode_collab_v1: encoded_collab.encode_to_bytes().unwrap(),
},
)
})
.collect::<HashMap<_, _>>()
}) {
results.extend(entries);
}
results
}
/// Insert the encoded collab data into the cache.
/// The data is inserted into both the memory and disk cache.
pub async fn insert_encode_collab_data(
@@ -539,6 +614,37 @@ impl CollabCache {
Ok(())
}
/// Encode diff for a large collab without full replay (optimization for clean large collabs).
///
/// Loads `encoded_collab.doc_state` into a temporary server-origin `Collab`, using the
/// data source that matches the snapshot's encoder version, and encodes the diff of that
/// document against `from` with the same encoder version.
///
/// The returned `EncodedCollab` carries the snapshot's original `version` and
/// `state_vector`; only `doc_state` is replaced by the diff.
///
/// # Errors
/// Returns `AppError::Internal` when the temporary `Collab` cannot be created.
fn encode_diff_for_large_collab(
  &self,
  object_id: Uuid,
  encoded_collab: EncodedCollab,
  from: &StateVector,
) -> Result<EncodedCollab, AppError> {
  let EncodedCollab {
    version,
    state_vector,
    doc_state: snapshot_state,
  } = encoded_collab;

  let base_options = CollabOptions::new(object_id.to_string(), default_client_id());
  let data_source = match version {
    EncoderVersion::V1 => DataSource::DocStateV1(snapshot_state.into()),
    EncoderVersion::V2 => DataSource::DocStateV2(snapshot_state.into()),
  };
  let options = base_options.with_data_source(data_source);

  let collab = Collab::new_with_options(CollabOrigin::Server, options)
    .map_err(|err| AppError::Internal(err.into()))?;

  // The diff is encoded with the same encoder version as the stored snapshot.
  let txn = collab.transact();
  let diff = match version {
    EncoderVersion::V1 => txn.encode_diff_v1(from),
    EncoderVersion::V2 => txn.encode_diff_v2(from),
  };
  let doc_state: Bytes = diff.into();

  Ok(EncodedCollab {
    version,
    state_vector,
    doc_state,
  })
}
}
#[inline]

View File

@@ -134,6 +134,7 @@ where
Ok(())
}
#[allow(dead_code)]
async fn batch_get_encode_collab_from_editing(
&self,
object_ids: Vec<Uuid>,
@@ -401,68 +402,14 @@ where
_uid: &i64,
workspace_id: Uuid,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<Uuid, QueryCollabResult> {
if queries.is_empty() {
return HashMap::new();
}
// Partition queries based on validation into valid queries and errors (with associated error messages).
let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) =
queries
.into_iter()
.partition_map(|params| match params.validate() {
Ok(_) => Either::Left(params),
Err(err) => Either::Right((
params.object_id,
QueryCollabResult::Failed {
error: err.to_string(),
},
)),
});
let cache_queries = if from_editing_collab {
let editing_queries = valid_queries.clone();
let editing_results = self
.batch_get_encode_collab_from_editing(editing_queries.iter().map(|q| q.object_id).collect())
.await;
let editing_query_collab_results: HashMap<Uuid, QueryCollabResult> =
tokio::task::spawn_blocking(move || {
let par_iter = editing_results.into_par_iter();
par_iter
.map(|(object_id, encoded_collab)| {
let encoding_result = encoded_collab.encode_to_bytes();
let query_collab_result = match encoding_result {
Ok(encoded_collab_bytes) => QueryCollabResult::Success {
encode_collab_v1: encoded_collab_bytes,
},
Err(err) => QueryCollabResult::Failed {
error: err.to_string(),
},
};
(object_id, query_collab_result)
})
.collect()
})
.await
.unwrap();
let editing_object_ids: Vec<_> = editing_query_collab_results.keys().cloned().collect();
results.extend(editing_query_collab_results);
valid_queries
.into_iter()
.filter(|q| !editing_object_ids.contains(&q.object_id))
.collect()
} else {
valid_queries
};
results.extend(
self
.cache
.batch_get_encode_collab(&workspace_id, cache_queries)
.await,
);
results
self
.cache
.batch_get_full_collab(&workspace_id, queries, None, EncoderVersion::V1)
.await
}
async fn delete_collab(&self, workspace_id: &Uuid, uid: &i64, object_id: &Uuid) -> AppResult<()> {

View File

@@ -81,7 +81,7 @@ impl CollabStore {
) -> AppResult<(Rid, Bytes)> {
match self
.collab_cache
.get_encode_collab(&workspace_id, QueryCollab::new(object_id, collab_type))
.get_snapshot_collab(&workspace_id, QueryCollab::new(object_id, collab_type))
.await
{
Ok((rid, collab)) => Ok((rid, collab.doc_state)),

View File

@@ -1963,7 +1963,7 @@ async fn batch_get_collab_handler(
let result = BatchQueryCollabResult(
state
.collab_access_control_storage
.batch_get_collab(&uid, workspace_id, payload.into_inner().0, false)
.batch_get_collab(&uid, workspace_id, payload.into_inner().0)
.await,
);
Ok(Json(AppResponse::Ok().with_data(result)))

View File

@@ -924,7 +924,7 @@ pub async fn list_database_row_details(
})
.collect();
let mut db_row_details = collab_storage
.batch_get_collab(&uid, workspace_uuid, query_collabs, true)
.batch_get_collab(&uid, workspace_uuid, query_collabs)
.await
.into_iter()
.flat_map(|(id, result)| match result {
@@ -994,7 +994,7 @@ pub async fn list_database_row_details(
})
.collect::<Vec<_>>();
let mut query_res = collab_storage
.batch_get_collab(&uid, workspace_uuid, query_db_docs, true)
.batch_get_collab(&uid, workspace_uuid, query_db_docs)
.await;
for row_detail in &mut db_row_details {
if let Err(err) = fill_in_db_row_doc(client_id, row_detail, &doc_id_by_row_id, &mut query_res)

View File

@@ -289,7 +289,7 @@ pub async fn batch_get_latest_collab_encoded(
})
.collect();
let query_collab_results = collab_storage
.batch_get_collab(&uid, workspace_id, queries, true)
.batch_get_collab(&uid, workspace_id, queries)
.await;
let encoded_collabs = tokio::task::spawn_blocking(move || {
let collabs: HashMap<_, EncodedCollab> = query_collab_results

View File

@@ -323,7 +323,7 @@ async fn duplicate_document(
})
.collect();
let query_results = collab_storage
.batch_get_collab(&uid, workspace_id, queries, true)
.batch_get_collab(&uid, workspace_id, queries)
.await;
let mut collab_params_list = vec![];
for (collab_id, query_result) in query_results {

View File

@@ -2254,7 +2254,7 @@ async fn get_page_collab_data_for_database(
})
.collect();
let row_query_collab_results = collab_access_control_storage
.batch_get_collab(&uid, *workspace_id, queries, true)
.batch_get_collab(&uid, *workspace_id, queries)
.await;
let row_data = tokio::task::spawn_blocking(move || {
let row_collabs: HashMap<_, _> = row_query_collab_results

View File

@@ -10,48 +10,131 @@ use tokio::time::{sleep, Duration};
/// Run servers and stress tests:
/// cargo run --package xtask -- --stress-test
///
/// Run without appflowy-worker:
/// cargo run --package xtask -- --no-worker
///
/// Run with optimizations (recommended for development):
/// cargo run --package xtask -- --ci
///
/// Run with full optimizations (production):
/// cargo run --package xtask -- --release
///
/// Run with profiling enabled:
/// cargo run --package xtask -- --profiling
///
/// Combine flags:
/// cargo run --package xtask -- --ci --stress-test --no-worker
///
/// Note: test start with 'stress_test' will be run as stress tests
#[tokio::main]
async fn main() -> Result<()> {
let is_stress_test = std::env::args().any(|arg| arg == "--stress-test");
let disable_log = std::env::args().any(|arg| arg == "--disable-log");
let no_worker = std::env::args().any(|arg| arg == "--no-worker");
let target_dir = "./target";
std::env::set_var("CARGO_TARGET_DIR", target_dir);
// Optimize Cargo for faster builds
std::env::set_var("CARGO_INCREMENTAL", "1");
// Only use sccache if it's available
if Command::new("sccache")
.arg("--version")
.output()
.await
.is_ok()
{
std::env::set_var("RUSTC_WRAPPER", "sccache");
println!("Using sccache for faster compilation");
}
// Add profile flags for optimized performance
let profile_flags = if std::env::args().any(|arg| arg == "--release") {
vec!["--profile", "release"] // Full optimization with LTO
} else if std::env::args().any(|arg| arg == "--ci") {
vec!["--profile", "ci"] // Faster compile, still optimized
} else if std::env::args().any(|arg| arg == "--profiling") {
vec!["--profile", "profiling"] // Release + debug info
} else {
vec![] // Default dev profile
};
let appflowy_cloud_bin_name = "appflowy_cloud";
let worker_bin_name = "appflowy_worker";
// Show which profile is being used
let profile_name = if profile_flags.contains(&"release") {
"release (full optimization + LTO)"
} else if profile_flags.contains(&"ci") {
"ci (fast compile + optimized)"
} else if profile_flags.contains(&"profiling") {
"profiling (release + debug info)"
} else {
"dev (fastest compile)"
};
println!("Using profile: {}", profile_name);
if no_worker {
println!("Worker disabled - running without appflowy-worker");
}
// Step 1: Kill existing processes
kill_existing_process(appflowy_cloud_bin_name).await?;
kill_existing_process(worker_bin_name).await?;
let kill_cloud_result = kill_existing_process(appflowy_cloud_bin_name);
let kill_worker_result = if no_worker {
// Still kill existing worker processes if they exist, even if we won't start a new one
kill_existing_process(worker_bin_name)
} else {
kill_existing_process(worker_bin_name)
};
// Step 2: Start servers sequentially
println!("Starting {} server...", appflowy_cloud_bin_name);
let mut appflowy_cloud_cmd = spawn_server(
"cargo",
&["run", "--features", "history, use_actix_cors"],
appflowy_cloud_bin_name,
disable_log,
None,
// Some(vec![("RUSTFLAGS", "--cfg tokio_unstable")]),
)?;
wait_for_readiness(appflowy_cloud_bin_name).await?;
let (kill_result1, kill_result2) = tokio::join!(kill_cloud_result, kill_worker_result);
kill_result1?;
kill_result2?;
println!("Starting {} server...", worker_bin_name);
let mut appflowy_worker_cmd = spawn_server(
"cargo",
&[
"run",
"--manifest-path",
"./services/appflowy-worker/Cargo.toml",
],
worker_bin_name,
disable_log,
None,
)?;
wait_for_readiness(worker_bin_name).await?;
// Step 2: Start servers
if no_worker {
println!("Starting appflowy-cloud only...");
} else {
println!("Starting servers in parallel...");
}
println!("All servers are up and running.");
// Prepare args with profile flags if specified
let mut cloud_args = vec!["run"];
cloud_args.extend(&profile_flags);
cloud_args.extend(&["--features", "history, use_actix_cors"]);
let appflowy_cloud_handle = tokio::spawn(async move {
let cmd = spawn_server("cargo", &cloud_args, appflowy_cloud_bin_name)?;
wait_for_readiness(appflowy_cloud_bin_name).await?;
Ok::<_, anyhow::Error>(cmd)
});
let appflowy_worker_handle = if no_worker {
None
} else {
let mut worker_args = vec!["run"];
worker_args.extend(&profile_flags);
worker_args.extend(&["--manifest-path", "./services/appflowy-worker/Cargo.toml"]);
Some(tokio::spawn(async move {
let cmd = spawn_server("cargo", &worker_args, worker_bin_name)?;
wait_for_readiness(worker_bin_name).await?;
Ok::<_, anyhow::Error>(cmd)
}))
};
// Wait for servers to be ready
let mut appflowy_cloud_cmd = appflowy_cloud_handle.await??;
let appflowy_worker_cmd = if let Some(handle) = appflowy_worker_handle {
Some(handle.await??)
} else {
None
};
if no_worker {
println!("AppFlowy Cloud is up and running (worker disabled).");
} else {
println!("All servers are up and running.");
}
// Step 3: Run stress tests if flag is set
let stress_test_cmd = if is_stress_test {
@@ -68,55 +151,58 @@ async fn main() -> Result<()> {
None
};
// Step 4: Monitor all processes
select! {
status = appflowy_cloud_cmd.wait() => {
handle_process_exit(status?, worker_bin_name)?;
},
status = appflowy_worker_cmd.wait() => {
handle_process_exit(status?, worker_bin_name)?;
},
status = async {
if let Some(mut stress_cmd) = stress_test_cmd {
stress_cmd.wait().await
} else {
futures::future::pending().await
}
} => {
if is_stress_test {
handle_process_exit(status?, "cargo test stress_test")?;
}
},
// Step 4: Monitor processes
match appflowy_worker_cmd {
Some(mut worker_cmd) => {
// Monitor both cloud and worker
select! {
status = appflowy_cloud_cmd.wait() => {
handle_process_exit(status?, appflowy_cloud_bin_name)?;
},
status = worker_cmd.wait() => {
handle_process_exit(status?, worker_bin_name)?;
},
status = async {
if let Some(mut stress_cmd) = stress_test_cmd {
stress_cmd.wait().await
} else {
futures::future::pending().await
}
} => {
if is_stress_test {
handle_process_exit(status?, "cargo test stress_test")?;
}
},
}
},
None => {
// Monitor only cloud (no worker)
select! {
status = appflowy_cloud_cmd.wait() => {
handle_process_exit(status?, appflowy_cloud_bin_name)?;
},
status = async {
if let Some(mut stress_cmd) = stress_test_cmd {
stress_cmd.wait().await
} else {
futures::future::pending().await
}
} => {
if is_stress_test {
handle_process_exit(status?, "cargo test stress_test")?;
}
},
}
},
}
Ok(())
}
fn spawn_server(
command: &str,
args: &[&str],
name: &str,
suppress_output: bool,
env_vars: Option<Vec<(&str, &str)>>,
) -> Result<tokio::process::Child> {
println!(
"Spawning {} process..., log enabled:{}",
name, suppress_output
);
fn spawn_server(command: &str, args: &[&str], name: &str) -> Result<tokio::process::Child> {
println!("Spawning {} process...", name,);
let mut cmd = Command::new(command);
cmd.args(args);
// Set environment variables if provided
if let Some(vars) = env_vars {
for (key, value) in vars {
cmd.env(key, value);
}
}
if suppress_output {
cmd.stdout(Stdio::null()).stderr(Stdio::null());
}
cmd
.spawn()
.context(format!("Failed to start {} process", name))