From 68d31322547b0771cbe550f92768f8ab3464bd61 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Thu, 19 Feb 2026 05:51:53 +0100 Subject: [PATCH] chore: include open telemetry tracing on web socket connections --- src/components/ws/useAppflowyWebSocket.ts | 38 ++++++++++++++++++- src/utils/telemetry.ts | 46 ++++++++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/src/components/ws/useAppflowyWebSocket.ts b/src/components/ws/useAppflowyWebSocket.ts index 2c1962b7..71f3e8ca 100644 --- a/src/components/ws/useAppflowyWebSocket.ts +++ b/src/components/ws/useAppflowyWebSocket.ts @@ -1,11 +1,13 @@ +import { type Context, type Span } from '@opentelemetry/api'; import * as random from 'lib0/random'; -import { useCallback, useMemo, useState } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import useWebSocket from 'react-use-websocket'; import { getTokenParsed } from '@/application/session/token'; import { messages } from '@/proto/messages'; import { Log } from '@/utils/log'; import { getConfigValue } from '@/utils/runtime-config'; +import { startWsConnectionSpan, endWsConnectionSpan, getWsTraceContext } from '@/utils/telemetry'; const wsURL = getConfigValue('APPFLOWY_WS_BASE_URL', 'ws://localhost:8000/ws/v2'); @@ -107,6 +109,7 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType => }); const url = `${options.url}/${options.workspaceId}/?clientId=${options.clientId}&deviceId=${options.deviceId}&token=${options.token}&cv=0.10.0&cp=web`; const [reconnectAttempt, setReconnectAttempt] = useState(0); + const connectionSpanRef = useRef<{ span: Span; ctx: Context } | null>(null); const { lastMessage, sendMessage, readyState, getWebSocket } = useWebSocket(url, { share: true, heartbeat: { @@ -172,6 +175,14 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType => onOpen: () => { Log.info('✅ WebSocket connection opened'); setReconnectAttempt(0); + + // End any previous connection span (e.g. from a reconnect) + if (connectionSpanRef.current) { + endWsConnectionSpan(connectionSpanRef.current.span); + } + + connectionSpanRef.current = startWsConnectionSpan(options.workspaceId); + const websocket = getWebSocket() as WebSocket | null; if (websocket && websocket.binaryType !== 'arraybuffer') { @@ -181,10 +192,22 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType => onClose: (event) => { Log.info('❌ WebSocket connection closed', event); + + if (connectionSpanRef.current) { + const isError = event.code !== CloseCode.NormalClose; + + endWsConnectionSpan(connectionSpanRef.current.span, isError); + + connectionSpanRef.current = null; + } }, onError: (event) => { Log.error('❌ WebSocket error', { event, deviceId: options.deviceId }); + + if (connectionSpanRef.current) { + connectionSpanRef.current.span.recordException(new Error('WebSocket error')); + } }, onReconnectStop: (numAttempts) => { @@ -196,6 +219,10 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType => (message: messages.IMessage, keep = true): void => { Log.debug('sending sync message:', message); + if (connectionSpanRef.current) { + message.trace = getWsTraceContext(connectionSpanRef.current.ctx); + } + const protobufMessage = messages.Message.encode(message).finish(); sendMessage(protobufMessage, keep); @@ -212,6 +239,15 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType => [lastMessage] ); + useEffect(() => { + return () => { + if (connectionSpanRef.current) { + endWsConnectionSpan(connectionSpanRef.current.span); + connectionSpanRef.current = null; + } + }; + }, []); + return { lastMessage: lastProtobufMessage, sendMessage: sendProtobufMessage, diff --git a/src/utils/telemetry.ts b/src/utils/telemetry.ts index 783c59c8..8484dbdd 100644 --- a/src/utils/telemetry.ts +++ b/src/utils/telemetry.ts @@ -1,7 +1,9 @@ -import { context, propagation, Span, SpanStatusCode, trace } from '@opentelemetry/api'; +import { type Context, context, propagation, Span, SpanKind, SpanStatusCode, trace } from '@opentelemetry/api'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; import { WebTracerProvider } from '@opentelemetry/sdk-trace-web'; +import { messages } from '@/proto/messages'; + const TRACER_NAME = 'appflowy-web'; let initialized = false; @@ -50,3 +52,45 @@ export function endHttpSpan(span: Span, error?: boolean) { span.end(); } + +/** + * Creates a root span for a WebSocket connection session. + * Returns the span and its context so child spans can be parented under it. + */ +export function startWsConnectionSpan(workspaceId: string): { span: Span; ctx: Context } { + const tracer = getTracer(); + const span = tracer.startSpan(`WS connection ${workspaceId}`, { + kind: SpanKind.CLIENT, + attributes: { 'ws.workspace_id': workspaceId }, + }); + const ctx = trace.setSpan(context.active(), span); + + return { span, ctx }; +} + +/** + * Ends a WebSocket connection span, optionally marking it as an error. + */ +export function endWsConnectionSpan(span: Span, error?: boolean): void { + if (error) { + span.setStatus({ code: SpanStatusCode.ERROR }); + } + + span.end(); +} + +/** + * Extracts the W3C trace context from the connection-level context. + * The returned value is embedded in the protobuf `IMessage.trace` field so the + * server can create per-message spans as children of the connection span. + */ +export function getWsTraceContext(connectionCtx: Context): messages.ITraceContext { + const carrier: Record = {}; + + propagation.inject(connectionCtx, carrier); + + return { + traceparent: carrier['traceparent'] || null, + tracestate: carrier['tracestate'] || null, + }; +}