chore: include open telemetry tracing on web socket connections

This commit is contained in:
Bartosz Sypytkowski
2026-02-19 05:51:53 +01:00
parent 02bcf711b1
commit 68d3132254
2 changed files with 82 additions and 2 deletions

View File

@@ -1,11 +1,13 @@
import { type Context, type Span } from '@opentelemetry/api';
import * as random from 'lib0/random';
import { useCallback, useMemo, useState } from 'react';
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import useWebSocket from 'react-use-websocket';
import { getTokenParsed } from '@/application/session/token';
import { messages } from '@/proto/messages';
import { Log } from '@/utils/log';
import { getConfigValue } from '@/utils/runtime-config';
import { startWsConnectionSpan, endWsConnectionSpan, getWsTraceContext } from '@/utils/telemetry';
const wsURL = getConfigValue('APPFLOWY_WS_BASE_URL', 'ws://localhost:8000/ws/v2');
@@ -107,6 +109,7 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType =>
});
const url = `${options.url}/${options.workspaceId}/?clientId=${options.clientId}&deviceId=${options.deviceId}&token=${options.token}&cv=0.10.0&cp=web`;
const [reconnectAttempt, setReconnectAttempt] = useState(0);
const connectionSpanRef = useRef<{ span: Span; ctx: Context } | null>(null);
const { lastMessage, sendMessage, readyState, getWebSocket } = useWebSocket(url, {
share: true,
heartbeat: {
@@ -172,6 +175,14 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType =>
onOpen: () => {
Log.info('✅ WebSocket connection opened');
setReconnectAttempt(0);
// End any previous connection span (e.g. from a reconnect)
if (connectionSpanRef.current) {
endWsConnectionSpan(connectionSpanRef.current.span);
}
connectionSpanRef.current = startWsConnectionSpan(options.workspaceId);
const websocket = getWebSocket() as WebSocket | null;
if (websocket && websocket.binaryType !== 'arraybuffer') {
@@ -181,10 +192,22 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType =>
onClose: (event) => {
Log.info('❌ WebSocket connection closed', event);
if (connectionSpanRef.current) {
const isError = event.code !== CloseCode.NormalClose;
endWsConnectionSpan(connectionSpanRef.current.span, isError);
connectionSpanRef.current = null;
}
},
onError: (event) => {
Log.error('❌ WebSocket error', { event, deviceId: options.deviceId });
if (connectionSpanRef.current) {
connectionSpanRef.current.span.recordException(new Error('WebSocket error'));
}
},
onReconnectStop: (numAttempts) => {
@@ -196,6 +219,10 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType =>
(message: messages.IMessage, keep = true): void => {
Log.debug('sending sync message:', message);
if (connectionSpanRef.current) {
message.trace = getWsTraceContext(connectionSpanRef.current.ctx);
}
const protobufMessage = messages.Message.encode(message).finish();
sendMessage(protobufMessage, keep);
@@ -212,6 +239,15 @@ export const useAppflowyWebSocket = (options: Options): AppflowyWebSocketType =>
[lastMessage]
);
useEffect(() => {
return () => {
if (connectionSpanRef.current) {
endWsConnectionSpan(connectionSpanRef.current.span);
connectionSpanRef.current = null;
}
};
}, []);
return {
lastMessage: lastProtobufMessage,
sendMessage: sendProtobufMessage,

View File

@@ -1,7 +1,9 @@
import { context, propagation, Span, SpanStatusCode, trace } from '@opentelemetry/api';
import { type Context, context, propagation, Span, SpanKind, SpanStatusCode, trace } from '@opentelemetry/api';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { messages } from '@/proto/messages';
const TRACER_NAME = 'appflowy-web';
let initialized = false;
@@ -50,3 +52,45 @@ export function endHttpSpan(span: Span, error?: boolean) {
span.end();
}
/**
* Creates a root span for a WebSocket connection session.
* Returns the span and its context so child spans can be parented under it.
*/
export function startWsConnectionSpan(workspaceId: string): { span: Span; ctx: Context } {
const tracer = getTracer();
const span = tracer.startSpan(`WS connection ${workspaceId}`, {
kind: SpanKind.CLIENT,
attributes: { 'ws.workspace_id': workspaceId },
});
const ctx = trace.setSpan(context.active(), span);
return { span, ctx };
}
/**
* Ends a WebSocket connection span, optionally marking it as an error.
*/
export function endWsConnectionSpan(span: Span, error?: boolean): void {
if (error) {
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
}
/**
* Extracts the W3C trace context from the connection-level context.
* The returned value is embedded in the protobuf `IMessage.trace` field so the
* server can create per-message spans as children of the connection span.
*/
export function getWsTraceContext(connectionCtx: Context): messages.ITraceContext {
const carrier: Record<string, string> = {};
propagation.inject(connectionCtx, carrier);
return {
traceparent: carrier['traceparent'] || null,
tracestate: carrier['tracestate'] || null,
};
}