Live: allow publishing over Centrifuge subscription (#102325)

* Live: allow publishing over Centrifuge subscription

Currently when publishing over a Grafana Live channel,
the data is sent over the HTTP API. This works fine when
there is only a single Grafana instance running, but
when there are multiple instances, the data will only hit
one instance, which is often not desired: sometimes you need
to guarantee that the data appears on the same instance that
the frontend is connected to.

An example of this is in the Grafana LLM app when running the
MCP server. The MCP protocol is stateful; users subscribe to
a channel to get a long-lived stream of server-sent events,
then send subsequent requests to the server to get further
results. If there are multiple Grafana instances running then
the requests are likely to land on an instance other than the
one that the user is connected to.

This commit adds a new option to the `GrafanaLiveSrv` interface
that allows the user to publish data over the Centrifuge
subscription instead of the HTTP API. This is not the default and
should rarely be used, but is required to fulfil certain use cases.

* Address nits from code review

Co-authored-by: kay delaney <45561153+kaydelaney@users.noreply.github.com>

---------

Co-authored-by: kay delaney <45561153+kaydelaney@users.noreply.github.com>
This commit is contained in:
Ben Sully
2025-03-20 16:50:52 +00:00
committed by GitHub
parent 24ebacb10b
commit 9fce4311e9
6 changed files with 40 additions and 4 deletions

View File

@ -39,6 +39,20 @@ export interface LiveQueryDataOptions {
body: unknown; // processed queries, same as sent to `/api/query/ds`
}
/**
* @alpha -- experimental
*/
export interface LivePublishOptions {
/**
* Publish the data over the websocket instead of the HTTP API.
*
* This is not recommended for most use cases.
*
* @experimental
*/
useSocket?: boolean;
}
/**
* @alpha -- experimental
*/
@ -79,7 +93,7 @@ export interface GrafanaLiveSrv {
*
* @alpha -- experimental
*/
publish(address: LiveChannelAddress, data: unknown): Promise<unknown>;
publish(address: LiveChannelAddress, data: unknown, options?: LivePublishOptions): Promise<unknown>;
}
let singletonInstance: GrafanaLiveSrv;

View File

@ -175,6 +175,8 @@ export class CentrifugeLiveChannel<T = any> {
});
}
publish = (data: unknown) => this.subscription?.publish(data);
/**
* This will close and terminate all streams for this channel
*/

View File

@ -20,6 +20,7 @@ import { FetchResponse } from '@grafana/runtime/src/services/backendSrv';
import {
GrafanaLiveSrv,
LiveDataStreamOptions,
LivePublishOptions,
LiveQueryDataOptions,
StreamingFrameAction,
StreamingFrameOptions,
@ -42,7 +43,7 @@ export type CentrifugeSrvDeps = {
export type StreamingDataQueryResponse = Omit<DataQueryResponse, 'data'> & { data: [StreamingResponseData] };
export type CentrifugeSrv = Omit<GrafanaLiveSrv, 'publish' | 'getDataStream' | 'getQueryData'> & {
export type CentrifugeSrv = Omit<GrafanaLiveSrv, 'getDataStream' | 'getQueryData'> & {
getDataStream: (options: LiveDataStreamOptions) => Observable<StreamingDataQueryResponse>;
getQueryData: (
options: LiveQueryDataOptions
@ -244,6 +245,13 @@ export class CentrifugeService implements CentrifugeSrv {
getPresence: CentrifugeSrv['getPresence'] = (address) => {
return this.getChannel(address).getPresence();
};
/**
* Publish into a channel.
*/
publish = async (address: LiveChannelAddress, data: unknown, options?: LivePublishOptions) => {
return this.getChannel(address).publish(data);
};
}
// This is used to give a unique key for each stream. The actual value does not matter

View File

@ -3,7 +3,7 @@ import './transferHandlers';
import * as comlink from 'comlink';
import { LiveChannelAddress } from '@grafana/data';
import { LiveDataStreamOptions, LiveQueryDataOptions } from '@grafana/runtime';
import { LiveDataStreamOptions, LivePublishOptions, LiveQueryDataOptions } from '@grafana/runtime';
import { remoteObservableAsObservable } from './remoteObservable';
import { CentrifugeService, CentrifugeSrvDeps } from './service';
@ -42,6 +42,9 @@ const getPresence = async (address: LiveChannelAddress) => {
return await centrifuge.getPresence(address);
};
const publish = (address: LiveChannelAddress, data: unknown, options?: LivePublishOptions) =>
centrifuge.publish(address, data, options);
const workObj = {
initialize,
getConnectionState,
@ -49,6 +52,7 @@ const workObj = {
getStream,
getQueryData,
getPresence,
publish,
};
export type RemoteCentrifugeService = typeof workObj;

View File

@ -47,4 +47,8 @@ export class CentrifugeServiceWorkerProxy implements CentrifugeSrv {
this.centrifugeWorker.getStream(address) as Promise<comlink.Remote<Observable<LiveChannelEvent<T>>>>
);
};
publish: CentrifugeSrv['publish'] = (address, data, options) => {
return this.centrifugeWorker.publish(address, data, options);
};
}

View File

@ -93,7 +93,11 @@ export class GrafanaLiveService implements GrafanaLiveSrv {
*
* @alpha -- experimental
*/
publish: GrafanaLiveSrv['publish'] = async (address, data) => {
publish: GrafanaLiveSrv['publish'] = async (address, data, options) => {
if (options?.useSocket) {
return this.deps.centrifugeSrv.publish(address, data);
}
return this.deps.backendSrv.post(`api/live/publish`, {
channel: toLiveChannelId(address), // orgId is from user
data,