chore: mount new ws endpoint

This commit is contained in:
Bartosz Sypytkowski
2025-04-09 13:12:38 +02:00
committed by Nathan
parent 156ff53d9d
commit 6ec2eac998
10 changed files with 78 additions and 27 deletions

1
Cargo.lock generated
View File

@@ -1968,7 +1968,6 @@ dependencies = [
"rand 0.8.5",
"rayon",
"reqwest",
"rocksdb",
"scraper",
"semver",
"serde",

View File

@@ -55,7 +55,6 @@ tokio-tungstenite = { workspace = true, features = ["stream"] }
rand = "0.8.5"
smallvec = { workspace = true, features = ["serde", "const_generics", "const_new", "write"] }
collab-plugins = { workspace = true, features = [] }
rocksdb = { version = "0.22.0", default-features = false, features = ["zstd"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-retry = "0.3"

View File

@@ -292,7 +292,7 @@ impl WorkspaceController {
client_id: ClientID,
cancel: CancellationToken,
) -> anyhow::Result<Option<WsConn>> {
let url = format!("{}/ws/v2/{}", options.url, options.workspace_id);
let url = format!("{}/{}", options.url, options.workspace_id);
tracing::info!("establishing WebScoket connection to: {}", url);
let mut req = url.into_client_request()?;
let headers = req.headers_mut();

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.81.0"
channel = "1.86.0"

View File

@@ -1,3 +1,7 @@
mod server;
mod session;
mod workspace;
pub use server::*;
pub use session::*;
pub use workspace::*;

View File

@@ -25,8 +25,8 @@ use yrs::{Doc, ReadTxn, StateVector, Transact, TransactionMut, Update};
pub struct CollabStore {
collab_cache: Arc<CollabCache>,
update_streams: StreamRouter,
awareness_broadcast: AwarenessGossip,
update_streams: Arc<StreamRouter>,
awareness_broadcast: Arc<AwarenessGossip>,
connection_manager: ConnectionManager,
}
@@ -43,20 +43,18 @@ impl CollabStore {
/// Maximum number of concurrent snapshots that can be sent to S3 at the same time.
const MAX_CONCURRENT_SNAPSHOTS: usize = 200;
pub async fn new(
pub fn new(
collab_cache: Arc<CollabCache>,
client: Client,
metrics: Arc<CollabStreamMetrics>,
) -> anyhow::Result<Arc<Self>> {
let connection_manager = ConnectionManager::new(client.clone()).await?;
let update_streams = StreamRouter::new(&client, metrics.clone())?;
let awareness_broadcast = AwarenessGossip::new(&client).await?;
Ok(Arc::new(Self {
connection_manager: ConnectionManager,
update_streams: Arc<StreamRouter>,
awareness_broadcast: Arc<AwarenessGossip>,
) -> Arc<Self> {
Arc::new(Self {
collab_cache,
update_streams,
awareness_broadcast,
connection_manager,
}))
})
}
pub fn updates(&self) -> &StreamRouter {

View File

@@ -1,3 +1,7 @@
mod actors;
mod collab_store;
mod messages;
pub use actors::*;
pub use collab_store::*;
pub use messages::*;

View File

@@ -1,32 +1,33 @@
use std::collections::HashMap;
use std::time::Duration;
use crate::state::AppState;
use actix::Addr;
use actix_http::header::AUTHORIZATION;
use actix_web::web::{Data, Path, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse, Result, Scope};
use actix_web_actors::ws;
use secrecy::Secret;
use semver::Version;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, instrument, trace};
use app_error::AppError;
use appflowy_collaborate::actix_ws::client::rt_client::RealtimeClient;
use appflowy_collaborate::actix_ws::server::RealtimeServerActor;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use appflowy_collaborate::ws2::{SessionInfo, WsServer, WsSession};
use authentication::jwt::{authorization_from_token, UserUuid};
use collab_rt_entity::user::{AFUserChange, RealtimeUser, UserMessage};
use collab_rt_entity::RealtimeMessage;
use secrecy::Secret;
use semver::Version;
use shared_entity::response::AppResponseError;
use crate::biz::authentication::jwt::{authorization_from_token, UserUuid};
use crate::state::AppState;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, instrument, trace};
use uuid::Uuid;
pub fn ws_scope() -> Scope {
web::scope("/ws")
//.service(establish_ws_connection)
.service(web::resource("/v1").route(web::get().to(establish_ws_connection_v1)))
.service(web::resource("/v2/{workspace_id}").route(web::get().to(establish_ws_connection_v2)))
}
const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB
@@ -102,6 +103,41 @@ pub async fn establish_ws_connection_v1(
.await
}
#[instrument(skip_all, err)]
pub async fn establish_ws_connection_v2(
request: HttpRequest,
payload: Payload,
path: Path<Uuid>,
state: Data<AppState>,
jwt_secret: Data<Secret<String>>,
) -> Result<HttpResponse> {
let workspace_id = path.into_inner();
let ws_server = state.ws_server.clone();
let access_token = request.extract_param(AUTHORIZATION.as_str())?;
let device_id = request.extract_param("X-AF-DeviceID")?;
let client_id: u64 = request
.extract_param("X-AF-ClientID")?
.parse()
.map_err(|_| AppError::InvalidRequest("client-id header missing or invalid".into()))?;
let auth = authorization_from_token(access_token.as_str(), &jwt_secret)?;
let user_uuid = UserUuid::from_auth(auth)?;
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
let info = SessionInfo::new(client_id, uid, device_id);
tracing::debug!(
"accepting new session {} (client id: {}) for workspace: {}",
info.collab_origin(),
client_id,
workspace_id
);
ws::WsResponseBuilder::new(
WsSession::new(workspace_id, info, ws_server),
&request,
payload,
)
.frame_size(MAX_FRAME_SIZE * 2)
.start()
}
#[allow(clippy::too_many_arguments)]
#[inline]
async fn start_connect(

View File

@@ -12,7 +12,7 @@ use access_control::noops::collab::{
};
use access_control::noops::workspace::WorkspaceAccessControlImpl as NoOpsWorkspaceAccessControlImpl;
use access_control::workspace::WorkspaceAccessControl;
use actix::Supervisor;
use actix::{Actor, Supervisor};
use actix_identity::IdentityMiddleware;
use actix_session::storage::RedisSessionStore;
use actix_session::SessionMiddleware;
@@ -38,6 +38,7 @@ use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabStorageImpl;
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
use appflowy_collaborate::snapshot::SnapshotControl;
use appflowy_collaborate::ws2::{CollabStore, WsServer};
use appflowy_collaborate::CollaborationServer;
use collab_stream::awareness_gossip::AwarenessGossip;
use collab_stream::metrics::CollabStreamMetrics;
@@ -299,6 +300,13 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
snapshot_control,
rt_cmd_tx,
));
let collab_store = CollabStore::new(
collab_cache.clone().into(),
redis_conn_manager.clone(),
redis_stream_router.clone(),
awareness_gossip.clone(),
);
let ws_server = WsServer::new(collab_store).start();
let mailer = get_mailer(&config.mailer).await?;
@@ -350,6 +358,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
mailer,
ai_client: appflowy_ai_client,
indexer_scheduler,
ws_server,
})
}

View File

@@ -1,12 +1,12 @@
use std::sync::Arc;
use access_control::collab::{CollabAccessControl, RealtimeAccessControl};
use access_control::workspace::WorkspaceAccessControl;
use actix::Addr;
use anyhow::anyhow;
use dashmap::DashMap;
use gotrue_entity::gotrue_jwt::GoTrueServiceRoleClaims;
use secrecy::{ExposeSecret, Secret};
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use uuid::Uuid;
@@ -17,6 +17,7 @@ use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use appflowy_collaborate::metrics::CollabMetrics;
use appflowy_collaborate::ws2::WsServer;
use appflowy_collaborate::CollabRealtimeMetrics;
use collab_stream::awareness_gossip::AwarenessGossip;
use collab_stream::metrics::CollabStreamMetrics;
@@ -59,6 +60,7 @@ pub struct AppState {
pub mailer: AFCloudMailer,
pub ai_client: AppFlowyAIClient,
pub indexer_scheduler: Arc<IndexerScheduler>,
pub ws_server: Addr<WsServer>,
}
impl AppState {